LCOV - code coverage report
Current view: top level - src/backend/replication/logical - conflict.c (source / functions) Hit Total Coverage
Test: PostgreSQL 18devel Lines: 126 134 94.0 %
Date: 2025-04-19 19:15:24 Functions: 7 7 100.0 %
Legend: Lines: hit not hit

          Line data    Source code
       1             : /*-------------------------------------------------------------------------
       2             :  * conflict.c
       3             :  *     Support routines for logging conflicts.
       4             :  *
       5             :  * Copyright (c) 2024-2025, PostgreSQL Global Development Group
       6             :  *
       7             :  * IDENTIFICATION
       8             :  *    src/backend/replication/logical/conflict.c
       9             :  *
      10             :  * This file contains the code for logging conflicts on the subscriber during
      11             :  * logical replication.
      12             :  *-------------------------------------------------------------------------
      13             :  */
      14             : 
      15             : #include "postgres.h"
      16             : 
      17             : #include "access/commit_ts.h"
      18             : #include "access/tableam.h"
      19             : #include "executor/executor.h"
      20             : #include "pgstat.h"
      21             : #include "replication/conflict.h"
      22             : #include "replication/worker_internal.h"
      23             : #include "storage/lmgr.h"
      24             : #include "utils/lsyscache.h"
      25             : 
      26             : static const char *const ConflictTypeNames[] = {
      27             :     [CT_INSERT_EXISTS] = "insert_exists",
      28             :     [CT_UPDATE_ORIGIN_DIFFERS] = "update_origin_differs",
      29             :     [CT_UPDATE_EXISTS] = "update_exists",
      30             :     [CT_UPDATE_MISSING] = "update_missing",
      31             :     [CT_DELETE_ORIGIN_DIFFERS] = "delete_origin_differs",
      32             :     [CT_DELETE_MISSING] = "delete_missing",
      33             :     [CT_MULTIPLE_UNIQUE_CONFLICTS] = "multiple_unique_conflicts"
      34             : };
      35             : 
      36             : static int  errcode_apply_conflict(ConflictType type);
      37             : static void errdetail_apply_conflict(EState *estate,
      38             :                                      ResultRelInfo *relinfo,
      39             :                                      ConflictType type,
      40             :                                      TupleTableSlot *searchslot,
      41             :                                      TupleTableSlot *localslot,
      42             :                                      TupleTableSlot *remoteslot,
      43             :                                      Oid indexoid, TransactionId localxmin,
      44             :                                      RepOriginId localorigin,
      45             :                                      TimestampTz localts, StringInfo err_msg);
      46             : static char *build_tuple_value_details(EState *estate, ResultRelInfo *relinfo,
      47             :                                        ConflictType type,
      48             :                                        TupleTableSlot *searchslot,
      49             :                                        TupleTableSlot *localslot,
      50             :                                        TupleTableSlot *remoteslot,
      51             :                                        Oid indexoid);
      52             : static char *build_index_value_desc(EState *estate, Relation localrel,
      53             :                                     TupleTableSlot *slot, Oid indexoid);
      54             : 
      55             : /*
      56             :  * Get the xmin and commit timestamp data (origin and timestamp) associated
      57             :  * with the provided local tuple.
      58             :  *
      59             :  * Return true if the commit timestamp data was found, false otherwise.
      60             :  */
      61             : bool
      62      144496 : GetTupleTransactionInfo(TupleTableSlot *localslot, TransactionId *xmin,
      63             :                         RepOriginId *localorigin, TimestampTz *localts)
      64             : {
      65             :     Datum       xminDatum;
      66             :     bool        isnull;
      67             : 
      68      144496 :     xminDatum = slot_getsysattr(localslot, MinTransactionIdAttributeNumber,
      69             :                                 &isnull);
      70      144496 :     *xmin = DatumGetTransactionId(xminDatum);
      71             :     Assert(!isnull);
      72             : 
      73             :     /*
      74             :      * The commit timestamp data is not available if track_commit_timestamp is
      75             :      * disabled.
      76             :      */
      77      144496 :     if (!track_commit_timestamp)
      78             :     {
      79      144478 :         *localorigin = InvalidRepOriginId;
      80      144478 :         *localts = 0;
      81      144478 :         return false;
      82             :     }
      83             : 
      84          18 :     return TransactionIdGetCommitTsData(*xmin, localts, localorigin);
      85             : }
      86             : 
      87             : /*
      88             :  * This function is used to report a conflict while applying replication
      89             :  * changes.
      90             :  *
      91             :  * 'searchslot' should contain the tuple used to search the local tuple to be
      92             :  * updated or deleted.
      93             :  *
      94             :  * 'remoteslot' should contain the remote new tuple, if any.
      95             :  *
      96             :  * conflicttuples is a list of local tuples that caused the conflict and the
      97             :  * conflict related information. See ConflictTupleInfo.
      98             :  *
      99             :  * The caller must ensure that all the indexes passed in ConflictTupleInfo are
     100             :  * locked so that we can fetch and display the conflicting key values.
     101             :  */
     102             : void
     103          60 : ReportApplyConflict(EState *estate, ResultRelInfo *relinfo, int elevel,
     104             :                     ConflictType type, TupleTableSlot *searchslot,
     105             :                     TupleTableSlot *remoteslot, List *conflicttuples)
     106             : {
     107          60 :     Relation    localrel = relinfo->ri_RelationDesc;
     108             :     StringInfoData err_detail;
     109             : 
     110          60 :     initStringInfo(&err_detail);
     111             : 
     112             :     /* Form errdetail message by combining conflicting tuples information. */
     113         190 :     foreach_ptr(ConflictTupleInfo, conflicttuple, conflicttuples)
     114          70 :         errdetail_apply_conflict(estate, relinfo, type, searchslot,
     115             :                                  conflicttuple->slot, remoteslot,
     116             :                                  conflicttuple->indexoid,
     117             :                                  conflicttuple->xmin,
     118          70 :                                  conflicttuple->origin,
     119             :                                  conflicttuple->ts,
     120             :                                  &err_detail);
     121             : 
     122          60 :     pgstat_report_subscription_conflict(MySubscription->oid, type);
     123             : 
     124          60 :     ereport(elevel,
     125             :             errcode_apply_conflict(type),
     126             :             errmsg("conflict detected on relation \"%s.%s\": conflict=%s",
     127             :                    get_namespace_name(RelationGetNamespace(localrel)),
     128             :                    RelationGetRelationName(localrel),
     129             :                    ConflictTypeNames[type]),
     130             :             errdetail_internal("%s", err_detail.data));
     131          40 : }
     132             : 
     133             : /*
     134             :  * Find all unique indexes to check for a conflict and store them into
     135             :  * ResultRelInfo.
     136             :  */
     137             : void
     138      216362 : InitConflictIndexes(ResultRelInfo *relInfo)
     139             : {
     140      216362 :     List       *uniqueIndexes = NIL;
     141             : 
     142      392022 :     for (int i = 0; i < relInfo->ri_NumIndices; i++)
     143             :     {
     144      175660 :         Relation    indexRelation = relInfo->ri_IndexRelationDescs[i];
     145             : 
     146      175660 :         if (indexRelation == NULL)
     147           0 :             continue;
     148             : 
     149             :         /* Detect conflict only for unique indexes */
     150      175660 :         if (!relInfo->ri_IndexRelationInfo[i]->ii_Unique)
     151          58 :             continue;
     152             : 
     153             :         /* Don't support conflict detection for deferrable index */
     154      175602 :         if (!indexRelation->rd_index->indimmediate)
     155           0 :             continue;
     156             : 
     157      175602 :         uniqueIndexes = lappend_oid(uniqueIndexes,
     158             :                                     RelationGetRelid(indexRelation));
     159             :     }
     160             : 
     161      216362 :     relInfo->ri_onConflictArbiterIndexes = uniqueIndexes;
     162      216362 : }
     163             : 
     164             : /*
     165             :  * Add SQLSTATE error code to the current conflict report.
     166             :  */
     167             : static int
     168          60 : errcode_apply_conflict(ConflictType type)
     169             : {
     170          60 :     switch (type)
     171             :     {
     172          20 :         case CT_INSERT_EXISTS:
     173             :         case CT_UPDATE_EXISTS:
     174             :         case CT_MULTIPLE_UNIQUE_CONFLICTS:
     175          20 :             return errcode(ERRCODE_UNIQUE_VIOLATION);
     176          40 :         case CT_UPDATE_ORIGIN_DIFFERS:
     177             :         case CT_UPDATE_MISSING:
     178             :         case CT_DELETE_ORIGIN_DIFFERS:
     179             :         case CT_DELETE_MISSING:
     180          40 :             return errcode(ERRCODE_T_R_SERIALIZATION_FAILURE);
     181             :     }
     182             : 
     183             :     Assert(false);
     184           0 :     return 0;                   /* silence compiler warning */
     185             : }
     186             : 
     187             : /*
     188             :  * Add an errdetail() line showing conflict detail.
     189             :  *
     190             :  * The DETAIL line comprises of two parts:
     191             :  * 1. Explanation of the conflict type, including the origin and commit
     192             :  *    timestamp of the existing local tuple.
     193             :  * 2. Display of conflicting key, existing local tuple, remote new tuple, and
     194             :  *    replica identity columns, if any. The remote old tuple is excluded as its
     195             :  *    information is covered in the replica identity columns.
     196             :  */
     197             : static void
     198          70 : errdetail_apply_conflict(EState *estate, ResultRelInfo *relinfo,
     199             :                          ConflictType type, TupleTableSlot *searchslot,
     200             :                          TupleTableSlot *localslot, TupleTableSlot *remoteslot,
     201             :                          Oid indexoid, TransactionId localxmin,
     202             :                          RepOriginId localorigin, TimestampTz localts,
     203             :                          StringInfo err_msg)
     204             : {
     205             :     StringInfoData err_detail;
     206             :     char       *val_desc;
     207             :     char       *origin_name;
     208             : 
     209          70 :     initStringInfo(&err_detail);
     210             : 
     211             :     /* First, construct a detailed message describing the type of conflict */
     212          70 :     switch (type)
     213             :     {
     214          30 :         case CT_INSERT_EXISTS:
     215             :         case CT_UPDATE_EXISTS:
     216             :         case CT_MULTIPLE_UNIQUE_CONFLICTS:
     217             :             Assert(OidIsValid(indexoid) &&
     218             :                    CheckRelationOidLockedByMe(indexoid, RowExclusiveLock, true));
     219             : 
     220          30 :             if (localts)
     221             :             {
     222           6 :                 if (localorigin == InvalidRepOriginId)
     223           0 :                     appendStringInfo(&err_detail, _("Key already exists in unique index \"%s\", modified locally in transaction %u at %s."),
     224             :                                      get_rel_name(indexoid),
     225             :                                      localxmin, timestamptz_to_str(localts));
     226           6 :                 else if (replorigin_by_oid(localorigin, true, &origin_name))
     227           2 :                     appendStringInfo(&err_detail, _("Key already exists in unique index \"%s\", modified by origin \"%s\" in transaction %u at %s."),
     228             :                                      get_rel_name(indexoid), origin_name,
     229             :                                      localxmin, timestamptz_to_str(localts));
     230             : 
     231             :                 /*
     232             :                  * The origin that modified this row has been removed. This
     233             :                  * can happen if the origin was created by a different apply
     234             :                  * worker and its associated subscription and origin were
     235             :                  * dropped after updating the row, or if the origin was
     236             :                  * manually dropped by the user.
     237             :                  */
     238             :                 else
     239           4 :                     appendStringInfo(&err_detail, _("Key already exists in unique index \"%s\", modified by a non-existent origin in transaction %u at %s."),
     240             :                                      get_rel_name(indexoid),
     241             :                                      localxmin, timestamptz_to_str(localts));
     242             :             }
     243             :             else
     244          24 :                 appendStringInfo(&err_detail, _("Key already exists in unique index \"%s\", modified in transaction %u."),
     245             :                                  get_rel_name(indexoid), localxmin);
     246             : 
     247          30 :             break;
     248             : 
     249           6 :         case CT_UPDATE_ORIGIN_DIFFERS:
     250           6 :             if (localorigin == InvalidRepOriginId)
     251           2 :                 appendStringInfo(&err_detail, _("Updating the row that was modified locally in transaction %u at %s."),
     252             :                                  localxmin, timestamptz_to_str(localts));
     253           4 :             else if (replorigin_by_oid(localorigin, true, &origin_name))
     254           2 :                 appendStringInfo(&err_detail, _("Updating the row that was modified by a different origin \"%s\" in transaction %u at %s."),
     255             :                                  origin_name, localxmin, timestamptz_to_str(localts));
     256             : 
     257             :             /* The origin that modified this row has been removed. */
     258             :             else
     259           2 :                 appendStringInfo(&err_detail, _("Updating the row that was modified by a non-existent origin in transaction %u at %s."),
     260             :                                  localxmin, timestamptz_to_str(localts));
     261             : 
     262           6 :             break;
     263             : 
     264          12 :         case CT_UPDATE_MISSING:
     265          12 :             appendStringInfoString(&err_detail, _("Could not find the row to be updated."));
     266          12 :             break;
     267             : 
     268           4 :         case CT_DELETE_ORIGIN_DIFFERS:
     269           4 :             if (localorigin == InvalidRepOriginId)
     270           2 :                 appendStringInfo(&err_detail, _("Deleting the row that was modified locally in transaction %u at %s."),
     271             :                                  localxmin, timestamptz_to_str(localts));
     272           2 :             else if (replorigin_by_oid(localorigin, true, &origin_name))
     273           2 :                 appendStringInfo(&err_detail, _("Deleting the row that was modified by a different origin \"%s\" in transaction %u at %s."),
     274             :                                  origin_name, localxmin, timestamptz_to_str(localts));
     275             : 
     276             :             /* The origin that modified this row has been removed. */
     277             :             else
     278           0 :                 appendStringInfo(&err_detail, _("Deleting the row that was modified by a non-existent origin in transaction %u at %s."),
     279             :                                  localxmin, timestamptz_to_str(localts));
     280             : 
     281           4 :             break;
     282             : 
     283          18 :         case CT_DELETE_MISSING:
     284          18 :             appendStringInfoString(&err_detail, _("Could not find the row to be deleted."));
     285          18 :             break;
     286             :     }
     287             : 
     288          70 :     Assert(err_detail.len > 0);
     289             : 
     290          70 :     val_desc = build_tuple_value_details(estate, relinfo, type, searchslot,
     291             :                                          localslot, remoteslot, indexoid);
     292             : 
     293             :     /*
     294             :      * Next, append the key values, existing local tuple, remote tuple and
     295             :      * replica identity columns after the message.
     296             :      */
     297          70 :     if (val_desc)
     298          70 :         appendStringInfo(&err_detail, "\n%s", val_desc);
     299             : 
     300             :     /*
     301             :      * Insert a blank line to visually separate the new detail line from the
     302             :      * existing ones.
     303             :      */
     304          70 :     if (err_msg->len > 0)
     305          10 :         appendStringInfoChar(err_msg, '\n');
     306             : 
     307          70 :     appendStringInfoString(err_msg, err_detail.data);
     308          70 : }
     309             : 
     310             : /*
     311             :  * Helper function to build the additional details for conflicting key,
     312             :  * existing local tuple, remote tuple, and replica identity columns.
     313             :  *
     314             :  * If the return value is NULL, it indicates that the current user lacks
     315             :  * permissions to view the columns involved.
     316             :  */
     317             : static char *
     318          70 : build_tuple_value_details(EState *estate, ResultRelInfo *relinfo,
     319             :                           ConflictType type,
     320             :                           TupleTableSlot *searchslot,
     321             :                           TupleTableSlot *localslot,
     322             :                           TupleTableSlot *remoteslot,
     323             :                           Oid indexoid)
     324             : {
     325          70 :     Relation    localrel = relinfo->ri_RelationDesc;
     326          70 :     Oid         relid = RelationGetRelid(localrel);
     327          70 :     TupleDesc   tupdesc = RelationGetDescr(localrel);
     328             :     StringInfoData tuple_value;
     329          70 :     char       *desc = NULL;
     330             : 
     331             :     Assert(searchslot || localslot || remoteslot);
     332             : 
     333          70 :     initStringInfo(&tuple_value);
     334             : 
     335             :     /*
     336             :      * Report the conflicting key values in the case of a unique constraint
     337             :      * violation.
     338             :      */
     339          70 :     if (type == CT_INSERT_EXISTS || type == CT_UPDATE_EXISTS ||
     340             :         type == CT_MULTIPLE_UNIQUE_CONFLICTS)
     341             :     {
     342             :         Assert(OidIsValid(indexoid) && localslot);
     343             : 
     344          30 :         desc = build_index_value_desc(estate, localrel, localslot, indexoid);
     345             : 
     346          30 :         if (desc)
     347          30 :             appendStringInfo(&tuple_value, _("Key %s"), desc);
     348             :     }
     349             : 
     350          70 :     if (localslot)
     351             :     {
     352             :         /*
     353             :          * The 'modifiedCols' only applies to the new tuple, hence we pass
     354             :          * NULL for the existing local tuple.
     355             :          */
     356          40 :         desc = ExecBuildSlotValueDescription(relid, localslot, tupdesc,
     357             :                                              NULL, 64);
     358             : 
     359          40 :         if (desc)
     360             :         {
     361          40 :             if (tuple_value.len > 0)
     362             :             {
     363          30 :                 appendStringInfoString(&tuple_value, "; ");
     364          30 :                 appendStringInfo(&tuple_value, _("existing local tuple %s"),
     365             :                                  desc);
     366             :             }
     367             :             else
     368             :             {
     369          10 :                 appendStringInfo(&tuple_value, _("Existing local tuple %s"),
     370             :                                  desc);
     371             :             }
     372             :         }
     373             :     }
     374             : 
     375          70 :     if (remoteslot)
     376             :     {
     377             :         Bitmapset  *modifiedCols;
     378             : 
     379             :         /*
     380             :          * Although logical replication doesn't maintain the bitmap for the
     381             :          * columns being inserted, we still use it to create 'modifiedCols'
     382             :          * for consistency with other calls to ExecBuildSlotValueDescription.
     383             :          *
     384             :          * Note that generated columns are formed locally on the subscriber.
     385             :          */
     386          48 :         modifiedCols = bms_union(ExecGetInsertedCols(relinfo, estate),
     387          48 :                                  ExecGetUpdatedCols(relinfo, estate));
     388          48 :         desc = ExecBuildSlotValueDescription(relid, remoteslot, tupdesc,
     389             :                                              modifiedCols, 64);
     390             : 
     391          48 :         if (desc)
     392             :         {
     393          48 :             if (tuple_value.len > 0)
     394             :             {
     395          36 :                 appendStringInfoString(&tuple_value, "; ");
     396          36 :                 appendStringInfo(&tuple_value, _("remote tuple %s"), desc);
     397             :             }
     398             :             else
     399             :             {
     400          12 :                 appendStringInfo(&tuple_value, _("Remote tuple %s"), desc);
     401             :             }
     402             :         }
     403             :     }
     404             : 
     405          70 :     if (searchslot)
     406             :     {
     407             :         /*
     408             :          * Note that while index other than replica identity may be used (see
     409             :          * IsIndexUsableForReplicaIdentityFull for details) to find the tuple
     410             :          * when applying update or delete, such an index scan may not result
     411             :          * in a unique tuple and we still compare the complete tuple in such
     412             :          * cases, thus such indexes are not used here.
     413             :          */
     414          48 :         Oid         replica_index = GetRelationIdentityOrPK(localrel);
     415             : 
     416             :         Assert(type != CT_INSERT_EXISTS);
     417             : 
     418             :         /*
     419             :          * If the table has a valid replica identity index, build the index
     420             :          * key value string. Otherwise, construct the full tuple value for
     421             :          * REPLICA IDENTITY FULL cases.
     422             :          */
     423          48 :         if (OidIsValid(replica_index))
     424          44 :             desc = build_index_value_desc(estate, localrel, searchslot, replica_index);
     425             :         else
     426           4 :             desc = ExecBuildSlotValueDescription(relid, searchslot, tupdesc, NULL, 64);
     427             : 
     428          48 :         if (desc)
     429             :         {
     430          48 :             if (tuple_value.len > 0)
     431             :             {
     432          30 :                 appendStringInfoString(&tuple_value, "; ");
     433          60 :                 appendStringInfo(&tuple_value, OidIsValid(replica_index)
     434          26 :                                  ? _("replica identity %s")
     435           4 :                                  : _("replica identity full %s"), desc);
     436             :             }
     437             :             else
     438             :             {
     439          36 :                 appendStringInfo(&tuple_value, OidIsValid(replica_index)
     440          18 :                                  ? _("Replica identity %s")
     441           0 :                                  : _("Replica identity full %s"), desc);
     442             :             }
     443             :         }
     444             :     }
     445             : 
     446          70 :     if (tuple_value.len == 0)
     447           0 :         return NULL;
     448             : 
     449          70 :     appendStringInfoChar(&tuple_value, '.');
     450          70 :     return tuple_value.data;
     451             : }
     452             : 
     453             : /*
     454             :  * Helper functions to construct a string describing the contents of an index
     455             :  * entry. See BuildIndexValueDescription for details.
     456             :  *
     457             :  * The caller must ensure that the index with the OID 'indexoid' is locked so
     458             :  * that we can fetch and display the conflicting key value.
     459             :  */
     460             : static char *
     461          74 : build_index_value_desc(EState *estate, Relation localrel, TupleTableSlot *slot,
     462             :                        Oid indexoid)
     463             : {
     464             :     char       *index_value;
     465             :     Relation    indexDesc;
     466             :     Datum       values[INDEX_MAX_KEYS];
     467             :     bool        isnull[INDEX_MAX_KEYS];
     468          74 :     TupleTableSlot *tableslot = slot;
     469             : 
     470          74 :     if (!tableslot)
     471           0 :         return NULL;
     472             : 
     473             :     Assert(CheckRelationOidLockedByMe(indexoid, RowExclusiveLock, true));
     474             : 
     475          74 :     indexDesc = index_open(indexoid, NoLock);
     476             : 
     477             :     /*
     478             :      * If the slot is a virtual slot, copy it into a heap tuple slot as
     479             :      * FormIndexDatum only works with heap tuple slots.
     480             :      */
     481          74 :     if (TTS_IS_VIRTUAL(slot))
     482             :     {
     483          26 :         tableslot = table_slot_create(localrel, &estate->es_tupleTable);
     484          26 :         tableslot = ExecCopySlot(tableslot, slot);
     485             :     }
     486             : 
     487             :     /*
     488             :      * Initialize ecxt_scantuple for potential use in FormIndexDatum when
     489             :      * index expressions are present.
     490             :      */
     491          74 :     GetPerTupleExprContext(estate)->ecxt_scantuple = tableslot;
     492             : 
     493             :     /*
     494             :      * The values/nulls arrays passed to BuildIndexValueDescription should be
     495             :      * the results of FormIndexDatum, which are the "raw" input to the index
     496             :      * AM.
     497             :      */
     498          74 :     FormIndexDatum(BuildIndexInfo(indexDesc), tableslot, estate, values, isnull);
     499             : 
     500          74 :     index_value = BuildIndexValueDescription(indexDesc, values, isnull);
     501             : 
     502          74 :     index_close(indexDesc, NoLock);
     503             : 
     504          74 :     return index_value;
     505             : }

Generated by: LCOV version 1.14