LCOV - code coverage report
Current view: top level - src/backend/replication/logical - conflict.c (source / functions) Hit Total Coverage
Test: PostgreSQL 19devel Lines: 130 142 91.5 %
Date: 2025-09-10 21:18:40 Functions: 7 7 100.0 %
Legend: Lines: hit not hit

          Line data    Source code
       1             : /*-------------------------------------------------------------------------
       2             :  * conflict.c
       3             :  *     Support routines for logging conflicts.
       4             :  *
       5             :  * Copyright (c) 2024-2025, PostgreSQL Global Development Group
       6             :  *
       7             :  * IDENTIFICATION
       8             :  *    src/backend/replication/logical/conflict.c
       9             :  *
      10             :  * This file contains the code for logging conflicts on the subscriber during
      11             :  * logical replication.
      12             :  *-------------------------------------------------------------------------
      13             :  */
      14             : 
      15             : #include "postgres.h"
      16             : 
      17             : #include "access/commit_ts.h"
      18             : #include "access/tableam.h"
      19             : #include "executor/executor.h"
      20             : #include "pgstat.h"
      21             : #include "replication/conflict.h"
      22             : #include "replication/worker_internal.h"
      23             : #include "storage/lmgr.h"
      24             : #include "utils/lsyscache.h"
      25             : /* Names of the conflict types, indexed by ConflictType; ReportApplyConflict prints them as "conflict=%s". */
      26             : static const char *const ConflictTypeNames[] = {
      27             :     [CT_INSERT_EXISTS] = "insert_exists",
      28             :     [CT_UPDATE_ORIGIN_DIFFERS] = "update_origin_differs",
      29             :     [CT_UPDATE_EXISTS] = "update_exists",
      30             :     [CT_UPDATE_MISSING] = "update_missing",
      31             :     [CT_DELETE_ORIGIN_DIFFERS] = "delete_origin_differs",
      32             :     [CT_UPDATE_DELETED] = "update_deleted",
      33             :     [CT_DELETE_MISSING] = "delete_missing",
      34             :     [CT_MULTIPLE_UNIQUE_CONFLICTS] = "multiple_unique_conflicts"
      35             : };
      36             : /* Forward declarations of the file-local (static) helpers defined further below. */
      37             : static int  errcode_apply_conflict(ConflictType type);
      38             : static void errdetail_apply_conflict(EState *estate,
      39             :                                      ResultRelInfo *relinfo,
      40             :                                      ConflictType type,
      41             :                                      TupleTableSlot *searchslot,
      42             :                                      TupleTableSlot *localslot,
      43             :                                      TupleTableSlot *remoteslot,
      44             :                                      Oid indexoid, TransactionId localxmin,
      45             :                                      RepOriginId localorigin,
      46             :                                      TimestampTz localts, StringInfo err_msg);
      47             : static char *build_tuple_value_details(EState *estate, ResultRelInfo *relinfo,
      48             :                                        ConflictType type,
      49             :                                        TupleTableSlot *searchslot,
      50             :                                        TupleTableSlot *localslot,
      51             :                                        TupleTableSlot *remoteslot,
      52             :                                        Oid indexoid);
      53             : static char *build_index_value_desc(EState *estate, Relation localrel,
      54             :                                     TupleTableSlot *slot, Oid indexoid);
      55             : 
      56             : /*
      57             :  * Get the xmin and commit timestamp data (origin and timestamp) associated
      58             :  * with the provided local row, returned via 'xmin', 'localorigin' and 'localts'.
      59             :  *
      60             :  * Return true if the commit timestamp data was found, false otherwise.
      61             :  */
      62             : bool
      63      144592 : GetTupleTransactionInfo(TupleTableSlot *localslot, TransactionId *xmin,
      64             :                         RepOriginId *localorigin, TimestampTz *localts)
      65             : {
      66             :     Datum       xminDatum;
      67             :     bool        isnull;
      68             : 
      69      144592 :     xminDatum = slot_getsysattr(localslot, MinTransactionIdAttributeNumber,
      70             :                                 &isnull);
      71      144592 :     *xmin = DatumGetTransactionId(xminDatum);
      72             :     Assert(!isnull);            /* the xmin system attribute must not be null here */
      73             : 
      74             :     /*
      75             :      * The commit timestamp data is not available if track_commit_timestamp is
      76             :      * disabled; report an invalid origin and a zero timestamp in that case.
      77             :      */
      78      144592 :     if (!track_commit_timestamp)
      79             :     {
      80      144484 :         *localorigin = InvalidRepOriginId;
      81      144484 :         *localts = 0;
      82      144484 :         return false;
      83             :     }
      84             :     /* Look up the commit timestamp and origin recorded for the row's xmin. */
      85         108 :     return TransactionIdGetCommitTsData(*xmin, localts, localorigin);
      86             : }
      87             : 
      88             : /*
      89             :  * Report a conflict of type 'type' on the relation in 'relinfo', found while
      90             :  * applying replication changes, via ereport() at level 'elevel'.
      91             :  *
      92             :  * 'searchslot' should contain the tuple used to search the local row to be
      93             :  * updated or deleted.
      94             :  *
      95             :  * 'remoteslot' should contain the remote new tuple, if any.
      96             :  *
      97             :  * 'conflicttuples' is a list of ConflictTupleInfo, one per local row that
      98             :  * caused the conflict; each entry adds one part to the DETAIL message.
      99             :  *
     100             :  * The caller must ensure that all the indexes passed in ConflictTupleInfo are
     101             :  * locked so that we can fetch and display the conflicting key values.
     102             :  */
     103             : void
     104         116 : ReportApplyConflict(EState *estate, ResultRelInfo *relinfo, int elevel,
     105             :                     ConflictType type, TupleTableSlot *searchslot,
     106             :                     TupleTableSlot *remoteslot, List *conflicttuples)
     107             : {
     108         116 :     Relation    localrel = relinfo->ri_RelationDesc;
     109             :     StringInfoData err_detail;
     110             : 
     111         116 :     initStringInfo(&err_detail);
     112             : 
     113             :     /* Form errdetail message by combining conflicting tuples information. */
     114         404 :     foreach_ptr(ConflictTupleInfo, conflicttuple, conflicttuples)
     115         172 :         errdetail_apply_conflict(estate, relinfo, type, searchslot,
     116             :                                  conflicttuple->slot, remoteslot,
     117             :                                  conflicttuple->indexoid,
     118             :                                  conflicttuple->xmin,
     119         172 :                                  conflicttuple->origin,
     120             :                                  conflicttuple->ts,
     121             :                                  &err_detail);
     122             :     /* Report the conflict to pgstat before ereport(), which may not return depending on elevel. */
     123         116 :     pgstat_report_subscription_conflict(MySubscription->oid, type);
     124             :     /* errdetail_internal(): the detail text was already passed through _() when it was built. */
     125         116 :     ereport(elevel,
     126             :             errcode_apply_conflict(type),
     127             :             errmsg("conflict detected on relation \"%s.%s\": conflict=%s",
     128             :                    get_namespace_name(RelationGetNamespace(localrel)),
     129             :                    RelationGetRelationName(localrel),
     130             :                    ConflictTypeNames[type]),
     131             :             errdetail_internal("%s", err_detail.data));
     132          52 : }
     133             : 
     134             : /*
     135             :  * Find all unique, immediate (non-deferrable) indexes of the relation to check
     136             :  * for a conflict, and store their OIDs in relInfo->ri_onConflictArbiterIndexes.
     137             :  */
     138             : void
     139      215952 : InitConflictIndexes(ResultRelInfo *relInfo)
     140             : {
     141      215952 :     List       *uniqueIndexes = NIL;
     142             : 
     143      391246 :     for (int i = 0; i < relInfo->ri_NumIndices; i++)
     144             :     {
     145      175294 :         Relation    indexRelation = relInfo->ri_IndexRelationDescs[i];
     146             :         /* Skip entries that have no index relation. */
     147      175294 :         if (indexRelation == NULL)
     148           0 :             continue;
     149             : 
     150             :         /* Detect conflict only for unique indexes */
     151      175294 :         if (!relInfo->ri_IndexRelationInfo[i]->ii_Unique)
     152          58 :             continue;
     153             : 
     154             :         /* Don't support conflict detection for deferrable index */
     155      175236 :         if (!indexRelation->rd_index->indimmediate)
     156           0 :             continue;
     157             : 
     158      175236 :         uniqueIndexes = lappend_oid(uniqueIndexes,
     159             :                                     RelationGetRelid(indexRelation));
     160             :     }
     161             :     /* NIL is stored if no index qualified. */
     162      215952 :     relInfo->ri_onConflictArbiterIndexes = uniqueIndexes;
     163      215952 : }
     164             : 
     165             : /*
     166             :  * Add the SQLSTATE error code matching conflict 'type' to the current report.
     167             :  */
     168             : static int
     169         116 : errcode_apply_conflict(ConflictType type)
     170             : {
     171         116 :     switch (type)
     172             :     {
     173          64 :         case CT_INSERT_EXISTS:
     174             :         case CT_UPDATE_EXISTS:
     175             :         case CT_MULTIPLE_UNIQUE_CONFLICTS:  /* a conflicting key already exists */
     176          64 :             return errcode(ERRCODE_UNIQUE_VIOLATION);
     177          52 :         case CT_UPDATE_ORIGIN_DIFFERS:
     178             :         case CT_UPDATE_MISSING:
     179             :         case CT_DELETE_ORIGIN_DIFFERS:
     180             :         case CT_UPDATE_DELETED:
     181             :         case CT_DELETE_MISSING:
     182          52 :             return errcode(ERRCODE_T_R_SERIALIZATION_FAILURE);
     183             :     }
     184             :     /* Reached only if 'type' is not one of the ConflictType values handled above. */
     185             :     Assert(false);
     186           0 :     return 0;                   /* silence compiler warning */
     187             : }
     188             : 
     189             : /*
     190             :  * Append a DETAIL line describing one conflicting local row to 'err_msg'.
     191             :  *
     192             :  * The DETAIL line consists of two parts:
     193             :  * 1. Explanation of the conflict type, including the origin and commit
     194             :  *    timestamp of the existing local row.
     195             :  * 2. Display of conflicting key, existing local row, remote new row, and
     196             :  *    replica identity columns, if any. The remote old row is excluded as its
     197             :  *    information is covered in the replica identity columns.
     198             :  */
     199             : static void
     200         172 : errdetail_apply_conflict(EState *estate, ResultRelInfo *relinfo,
     201             :                          ConflictType type, TupleTableSlot *searchslot,
     202             :                          TupleTableSlot *localslot, TupleTableSlot *remoteslot,
     203             :                          Oid indexoid, TransactionId localxmin,
     204             :                          RepOriginId localorigin, TimestampTz localts,
     205             :                          StringInfo err_msg)
     206             : {
     207             :     StringInfoData err_detail;
     208             :     char       *val_desc;
     209             :     char       *origin_name;
     210             : 
     211         172 :     initStringInfo(&err_detail);
     212             : 
     213             :     /* First, construct a detailed message describing the type of conflict */
     214         172 :     switch (type)
     215             :     {
     216         120 :         case CT_INSERT_EXISTS:
     217             :         case CT_UPDATE_EXISTS:
     218             :         case CT_MULTIPLE_UNIQUE_CONFLICTS:
     219             :             Assert(OidIsValid(indexoid) &&
     220             :                    CheckRelationOidLockedByMe(indexoid, RowExclusiveLock, true));
     221             :             /* A zero localts presumably means no commit timestamp data was found (cf. GetTupleTransactionInfo). */
     222         120 :             if (localts)
     223             :             {
     224           6 :                 if (localorigin == InvalidRepOriginId)
     225           0 :                     appendStringInfo(&err_detail, _("Key already exists in unique index \"%s\", modified locally in transaction %u at %s."),
     226             :                                      get_rel_name(indexoid),
     227             :                                      localxmin, timestamptz_to_str(localts));
     228           6 :                 else if (replorigin_by_oid(localorigin, true, &origin_name))
     229           2 :                     appendStringInfo(&err_detail, _("Key already exists in unique index \"%s\", modified by origin \"%s\" in transaction %u at %s."),
     230             :                                      get_rel_name(indexoid), origin_name,
     231             :                                      localxmin, timestamptz_to_str(localts));
     232             : 
     233             :                 /*
     234             :                  * The origin that modified this row has been removed. This
     235             :                  * can happen if the origin was created by a different apply
     236             :                  * worker and its associated subscription and origin were
     237             :                  * dropped after updating the row, or if the origin was
     238             :                  * manually dropped by the user.
     239             :                  */
     240             :                 else
     241           4 :                     appendStringInfo(&err_detail, _("Key already exists in unique index \"%s\", modified by a non-existent origin in transaction %u at %s."),
     242             :                                      get_rel_name(indexoid),
     243             :                                      localxmin, timestamptz_to_str(localts));
     244             :             }
     245             :             else
     246         114 :                 appendStringInfo(&err_detail, _("Key already exists in unique index \"%s\", modified in transaction %u."),
     247             :                                  get_rel_name(indexoid), localxmin);
     248             : 
     249         120 :             break;
     250             :         /* NOTE(review): localts is used below without a zero check; presumably this type is only raised when commit timestamp data exists - confirm. */
     251           6 :         case CT_UPDATE_ORIGIN_DIFFERS:
     252           6 :             if (localorigin == InvalidRepOriginId)
     253           2 :                 appendStringInfo(&err_detail, _("Updating the row that was modified locally in transaction %u at %s."),
     254             :                                  localxmin, timestamptz_to_str(localts));
     255           4 :             else if (replorigin_by_oid(localorigin, true, &origin_name))
     256           2 :                 appendStringInfo(&err_detail, _("Updating the row that was modified by a different origin \"%s\" in transaction %u at %s."),
     257             :                                  origin_name, localxmin, timestamptz_to_str(localts));
     258             : 
     259             :             /* The origin that modified this row has been removed. */
     260             :             else
     261           2 :                 appendStringInfo(&err_detail, _("Updating the row that was modified by a non-existent origin in transaction %u at %s."),
     262             :                                  localxmin, timestamptz_to_str(localts));
     263             : 
     264           6 :             break;
     265             : 
     266           6 :         case CT_UPDATE_DELETED:
     267           6 :             if (localts)
     268             :             {
     269           6 :                 if (localorigin == InvalidRepOriginId)
     270           6 :                     appendStringInfo(&err_detail, _("The row to be updated was deleted locally in transaction %u at %s."),
     271             :                                      localxmin, timestamptz_to_str(localts));
     272           0 :                 else if (replorigin_by_oid(localorigin, true, &origin_name))
     273           0 :                     appendStringInfo(&err_detail, _("The row to be updated was deleted by a different origin \"%s\" in transaction %u at %s."),
     274             :                                      origin_name, localxmin, timestamptz_to_str(localts));
     275             : 
     276             :                 /* The origin that modified this row has been removed. */
     277             :                 else
     278           0 :                     appendStringInfo(&err_detail, _("The row to be updated was deleted by a non-existent origin in transaction %u at %s."),
     279             :                                      localxmin, timestamptz_to_str(localts));
     280             :             }
     281             :             else
     282           0 :                 appendStringInfo(&err_detail, _("The row to be updated was deleted."));
     283             :             /* NOTE(review): the call above has no format arguments; appendStringInfoString() would match the *_MISSING cases. */
     284           6 :             break;
     285             : 
     286          12 :         case CT_UPDATE_MISSING:
     287          12 :             appendStringInfoString(&err_detail, _("Could not find the row to be updated."));
     288          12 :             break;
     289             :         /* NOTE(review): localts is likewise used without a zero check in this case - confirm. */
     290          10 :         case CT_DELETE_ORIGIN_DIFFERS:
     291          10 :             if (localorigin == InvalidRepOriginId)
     292           8 :                 appendStringInfo(&err_detail, _("Deleting the row that was modified locally in transaction %u at %s."),
     293             :                                  localxmin, timestamptz_to_str(localts));
     294           2 :             else if (replorigin_by_oid(localorigin, true, &origin_name))
     295           2 :                 appendStringInfo(&err_detail, _("Deleting the row that was modified by a different origin \"%s\" in transaction %u at %s."),
     296             :                                  origin_name, localxmin, timestamptz_to_str(localts));
     297             : 
     298             :             /* The origin that modified this row has been removed. */
     299             :             else
     300           0 :                 appendStringInfo(&err_detail, _("Deleting the row that was modified by a non-existent origin in transaction %u at %s."),
     301             :                                  localxmin, timestamptz_to_str(localts));
     302             : 
     303          10 :             break;
     304             : 
     305          18 :         case CT_DELETE_MISSING:
     306          18 :             appendStringInfoString(&err_detail, _("Could not find the row to be deleted."));
     307          18 :             break;
     308             :     }
     309             : 
     310             :     Assert(err_detail.len > 0); /* every conflict type above must have produced a message */
     311             : 
     312         172 :     val_desc = build_tuple_value_details(estate, relinfo, type, searchslot,
     313             :                                          localslot, remoteslot, indexoid);
     314             : 
     315             :     /*
     316             :      * Next, append the key values, existing local row, remote row, and
     317             :      * replica identity columns after the message.
     318             :      */
     319         172 :     if (val_desc)
     320         172 :         appendStringInfo(&err_detail, "\n%s", val_desc);
     321             : 
     322             :     /*
     323             :      * Start a new line to separate the new detail line from the existing
     324             :      * ones, if err_msg already has content.
     325             :      */
     326         172 :     if (err_msg->len > 0)
     327          56 :         appendStringInfoChar(err_msg, '\n');
     328             : 
     329         172 :     appendStringInfoString(err_msg, err_detail.data);
     330         172 : }
     331             : 
     332             : /*
     333             :  * Helper function to build the additional details for conflicting key,
     334             :  * existing local row, remote row, and replica identity columns, joined by "; ".
     335             :  *
     336             :  * If the return value is NULL, it indicates that the current user lacks
     337             :  * permissions to view the columns involved.
     338             :  */
     339             : static char *
     340         172 : build_tuple_value_details(EState *estate, ResultRelInfo *relinfo,
     341             :                           ConflictType type,
     342             :                           TupleTableSlot *searchslot,
     343             :                           TupleTableSlot *localslot,
     344             :                           TupleTableSlot *remoteslot,
     345             :                           Oid indexoid)
     346             : {
     347         172 :     Relation    localrel = relinfo->ri_RelationDesc;
     348         172 :     Oid         relid = RelationGetRelid(localrel);
     349         172 :     TupleDesc   tupdesc = RelationGetDescr(localrel);
     350             :     StringInfoData tuple_value;
     351         172 :     char       *desc = NULL;
     352             : 
     353             :     Assert(searchslot || localslot || remoteslot);
     354             : 
     355         172 :     initStringInfo(&tuple_value);
     356             : 
     357             :     /*
     358             :      * Report the conflicting key values in the case of a unique constraint
     359             :      * violation.
     360             :      */
     361         172 :     if (type == CT_INSERT_EXISTS || type == CT_UPDATE_EXISTS ||
     362             :         type == CT_MULTIPLE_UNIQUE_CONFLICTS)
     363             :     {
     364             :         Assert(OidIsValid(indexoid) && localslot);
     365             : 
     366         120 :         desc = build_index_value_desc(estate, localrel, localslot, indexoid);
     367             : 
     368         120 :         if (desc)
     369         120 :             appendStringInfo(&tuple_value, _("Key %s"), desc);
     370             :     }
     371             : 
     372         172 :     if (localslot)
     373             :     {
     374             :         /*
     375             :          * The 'modifiedCols' only applies to the new tuple, hence we pass
     376             :          * NULL for the existing local row.
     377             :          */
     378         136 :         desc = ExecBuildSlotValueDescription(relid, localslot, tupdesc,
     379             :                                              NULL, 64); /* 64: presumably the maximum length shown per field - confirm */
     380             :         /* Each part is capitalized only when it is the first one in the message. */
     381         136 :         if (desc)
     382             :         {
     383         136 :             if (tuple_value.len > 0)
     384             :             {
     385         120 :                 appendStringInfoString(&tuple_value, "; ");
     386         120 :                 appendStringInfo(&tuple_value, _("existing local row %s"),
     387             :                                  desc);
     388             :             }
     389             :             else
     390             :             {
     391          16 :                 appendStringInfo(&tuple_value, _("Existing local row %s"),
     392             :                                  desc);
     393             :             }
     394             :         }
     395             :     }
     396             : 
     397         172 :     if (remoteslot)
     398             :     {
     399             :         Bitmapset  *modifiedCols;
     400             : 
     401             :         /*
     402             :          * Although logical replication doesn't maintain the bitmap for the
     403             :          * columns being inserted, we still use it to create 'modifiedCols'
     404             :          * for consistency with other calls to ExecBuildSlotValueDescription.
     405             :          *
     406             :          * Note that generated columns are formed locally on the subscriber.
     407             :          */
     408         144 :         modifiedCols = bms_union(ExecGetInsertedCols(relinfo, estate),
     409         144 :                                  ExecGetUpdatedCols(relinfo, estate));
     410         144 :         desc = ExecBuildSlotValueDescription(relid, remoteslot, tupdesc,
     411             :                                              modifiedCols, 64);
     412             : 
     413         144 :         if (desc)
     414             :         {
     415         144 :             if (tuple_value.len > 0)
     416             :             {
     417         126 :                 appendStringInfoString(&tuple_value, "; ");
     418         126 :                 appendStringInfo(&tuple_value, _("remote row %s"), desc);
     419             :             }
     420             :             else
     421             :             {
     422          18 :                 appendStringInfo(&tuple_value, _("Remote row %s"), desc);
     423             :             }
     424             :         }
     425             :     }
     426             : 
     427         172 :     if (searchslot)
     428             :     {
     429             :         /*
     430             :          * Note that while an index other than the replica identity may be used
     431             :          * (see IsIndexUsableForReplicaIdentityFull for details) to find the tuple
     432             :          * when applying update or delete, such an index scan may not result
     433             :          * in a unique tuple and we still compare the complete tuple in such
     434             :          * cases, thus such indexes are not used here.
     435             :          */
     436          60 :         Oid         replica_index = GetRelationIdentityOrPK(localrel);
     437             : 
     438             :         Assert(type != CT_INSERT_EXISTS);   /* a search tuple exists only for update or delete */
     439             : 
     440             :         /*
     441             :          * If the table has a valid replica identity index, build the index
     442             :          * key value string. Otherwise, construct the full tuple value for
     443             :          * REPLICA IDENTITY FULL cases.
     444             :          */
     445          60 :         if (OidIsValid(replica_index))
     446          52 :             desc = build_index_value_desc(estate, localrel, searchslot, replica_index);
     447             :         else
     448           8 :             desc = ExecBuildSlotValueDescription(relid, searchslot, tupdesc, NULL, 64);
     449             : 
     450          60 :         if (desc)
     451             :         {
     452          60 :             if (tuple_value.len > 0)
     453             :             {
     454          42 :                 appendStringInfoString(&tuple_value, "; ");
     455          84 :                 appendStringInfo(&tuple_value, OidIsValid(replica_index)
     456          34 :                                  ? _("replica identity %s")
     457           8 :                                  : _("replica identity full %s"), desc);
     458             :             }
     459             :             else
     460             :             {
     461          36 :                 appendStringInfo(&tuple_value, OidIsValid(replica_index)
     462          18 :                                  ? _("Replica identity %s")
     463           0 :                                  : _("Replica identity full %s"), desc);
     464             :             }
     465             :         }
     466             :     }
     467             :     /* Nothing could be described: every description built above was NULL. */
     468         172 :     if (tuple_value.len == 0)
     469           0 :         return NULL;
     470             : 
     471         172 :     appendStringInfoChar(&tuple_value, '.');
     472         172 :     return tuple_value.data;
     473             : }
     474             : 
     475             : /*
     476             :  * Helper function to construct a string describing the contents of an index
     477             :  * entry. See BuildIndexValueDescription for details. Returns NULL if 'slot' is NULL.
     478             :  *
     479             :  * The caller must ensure that the index with the OID 'indexoid' is locked so
     480             :  * that we can fetch and display the conflicting key value.
     481             :  */
     482             : static char *
     483         172 : build_index_value_desc(EState *estate, Relation localrel, TupleTableSlot *slot,
     484             :                        Oid indexoid)
     485             : {
     486             :     char       *index_value;
     487             :     Relation    indexDesc;
     488             :     Datum       values[INDEX_MAX_KEYS];
     489             :     bool        isnull[INDEX_MAX_KEYS];
     490         172 :     TupleTableSlot *tableslot = slot;
     491             : 
     492         172 :     if (!tableslot)
     493           0 :         return NULL;
     494             : 
     495             :     Assert(CheckRelationOidLockedByMe(indexoid, RowExclusiveLock, true));
     496             :     /* NoLock: the caller already holds the index lock (see the Assert above). */
     497         172 :     indexDesc = index_open(indexoid, NoLock);
     498             : 
     499             :     /*
     500             :      * If the slot is a virtual slot, copy it into a heap tuple slot as
     501             :      * FormIndexDatum only works with heap tuple slots.
     502             :      */
     503         172 :     if (TTS_IS_VIRTUAL(slot))
     504             :     {
     505          34 :         tableslot = table_slot_create(localrel, &estate->es_tupleTable);
     506          34 :         tableslot = ExecCopySlot(tableslot, slot);
     507             :     }
     508             : 
     509             :     /*
     510             :      * Initialize ecxt_scantuple for potential use in FormIndexDatum when
     511             :      * index expressions are present.
     512             :      */
     513         172 :     GetPerTupleExprContext(estate)->ecxt_scantuple = tableslot;
     514             : 
     515             :     /*
     516             :      * The values/nulls arrays passed to BuildIndexValueDescription should be
     517             :      * the results of FormIndexDatum, which are the "raw" input to the index
     518             :      * AM.
     519             :      */
     520         172 :     FormIndexDatum(BuildIndexInfo(indexDesc), tableslot, estate, values, isnull);
     521             : 
     522         172 :     index_value = BuildIndexValueDescription(indexDesc, values, isnull);
     523             :     /* Closing with NoLock leaves the caller's lock on the index in place. */
     524         172 :     index_close(indexDesc, NoLock);
     525             : 
     526         172 :     return index_value;
     527             : }

Generated by: LCOV version 1.16