Line data Source code
1 : /*-------------------------------------------------------------------------
2 : * conflict.c
3 : * Support routines for logging conflicts.
4 : *
5 : * Copyright (c) 2024-2025, PostgreSQL Global Development Group
6 : *
7 : * IDENTIFICATION
8 : * src/backend/replication/logical/conflict.c
9 : *
10 : * This file contains the code for logging conflicts on the subscriber during
11 : * logical replication.
12 : *-------------------------------------------------------------------------
13 : */
14 :
15 : #include "postgres.h"
16 :
17 : #include "access/commit_ts.h"
18 : #include "access/tableam.h"
19 : #include "executor/executor.h"
20 : #include "pgstat.h"
21 : #include "replication/conflict.h"
22 : #include "replication/worker_internal.h"
23 : #include "storage/lmgr.h"
24 : #include "utils/lsyscache.h"
25 :
26 : static const char *const ConflictTypeNames[] = {
27 : [CT_INSERT_EXISTS] = "insert_exists",
28 : [CT_UPDATE_ORIGIN_DIFFERS] = "update_origin_differs",
29 : [CT_UPDATE_EXISTS] = "update_exists",
30 : [CT_UPDATE_MISSING] = "update_missing",
31 : [CT_DELETE_ORIGIN_DIFFERS] = "delete_origin_differs",
32 : [CT_DELETE_MISSING] = "delete_missing",
33 : [CT_MULTIPLE_UNIQUE_CONFLICTS] = "multiple_unique_conflicts"
34 : };
35 :
36 : static int errcode_apply_conflict(ConflictType type);
37 : static void errdetail_apply_conflict(EState *estate,
38 : ResultRelInfo *relinfo,
39 : ConflictType type,
40 : TupleTableSlot *searchslot,
41 : TupleTableSlot *localslot,
42 : TupleTableSlot *remoteslot,
43 : Oid indexoid, TransactionId localxmin,
44 : RepOriginId localorigin,
45 : TimestampTz localts, StringInfo err_msg);
46 : static char *build_tuple_value_details(EState *estate, ResultRelInfo *relinfo,
47 : ConflictType type,
48 : TupleTableSlot *searchslot,
49 : TupleTableSlot *localslot,
50 : TupleTableSlot *remoteslot,
51 : Oid indexoid);
52 : static char *build_index_value_desc(EState *estate, Relation localrel,
53 : TupleTableSlot *slot, Oid indexoid);
54 :
55 : /*
56 : * Get the xmin and commit timestamp data (origin and timestamp) associated
57 : * with the provided local tuple.
58 : *
59 : * Return true if the commit timestamp data was found, false otherwise.
60 : */
61 : bool
62 144496 : GetTupleTransactionInfo(TupleTableSlot *localslot, TransactionId *xmin,
63 : RepOriginId *localorigin, TimestampTz *localts)
64 : {
65 : Datum xminDatum;
66 : bool isnull;
67 :
68 144496 : xminDatum = slot_getsysattr(localslot, MinTransactionIdAttributeNumber,
69 : &isnull);
70 144496 : *xmin = DatumGetTransactionId(xminDatum);
71 : Assert(!isnull);
72 :
73 : /*
74 : * The commit timestamp data is not available if track_commit_timestamp is
75 : * disabled.
76 : */
77 144496 : if (!track_commit_timestamp)
78 : {
79 144478 : *localorigin = InvalidRepOriginId;
80 144478 : *localts = 0;
81 144478 : return false;
82 : }
83 :
84 18 : return TransactionIdGetCommitTsData(*xmin, localts, localorigin);
85 : }
86 :
87 : /*
88 : * This function is used to report a conflict while applying replication
89 : * changes.
90 : *
91 : * 'searchslot' should contain the tuple used to search the local tuple to be
92 : * updated or deleted.
93 : *
94 : * 'remoteslot' should contain the remote new tuple, if any.
95 : *
96 : * conflicttuples is a list of local tuples that caused the conflict and the
97 : * conflict related information. See ConflictTupleInfo.
98 : *
99 : * The caller must ensure that all the indexes passed in ConflictTupleInfo are
100 : * locked so that we can fetch and display the conflicting key values.
101 : */
102 : void
103 60 : ReportApplyConflict(EState *estate, ResultRelInfo *relinfo, int elevel,
104 : ConflictType type, TupleTableSlot *searchslot,
105 : TupleTableSlot *remoteslot, List *conflicttuples)
106 : {
107 60 : Relation localrel = relinfo->ri_RelationDesc;
108 : StringInfoData err_detail;
109 :
110 60 : initStringInfo(&err_detail);
111 :
112 : /* Form errdetail message by combining conflicting tuples information. */
113 190 : foreach_ptr(ConflictTupleInfo, conflicttuple, conflicttuples)
114 70 : errdetail_apply_conflict(estate, relinfo, type, searchslot,
115 : conflicttuple->slot, remoteslot,
116 : conflicttuple->indexoid,
117 : conflicttuple->xmin,
118 70 : conflicttuple->origin,
119 : conflicttuple->ts,
120 : &err_detail);
121 :
122 60 : pgstat_report_subscription_conflict(MySubscription->oid, type);
123 :
124 60 : ereport(elevel,
125 : errcode_apply_conflict(type),
126 : errmsg("conflict detected on relation \"%s.%s\": conflict=%s",
127 : get_namespace_name(RelationGetNamespace(localrel)),
128 : RelationGetRelationName(localrel),
129 : ConflictTypeNames[type]),
130 : errdetail_internal("%s", err_detail.data));
131 40 : }
132 :
133 : /*
134 : * Find all unique indexes to check for a conflict and store them into
135 : * ResultRelInfo.
136 : */
137 : void
138 216362 : InitConflictIndexes(ResultRelInfo *relInfo)
139 : {
140 216362 : List *uniqueIndexes = NIL;
141 :
142 392022 : for (int i = 0; i < relInfo->ri_NumIndices; i++)
143 : {
144 175660 : Relation indexRelation = relInfo->ri_IndexRelationDescs[i];
145 :
146 175660 : if (indexRelation == NULL)
147 0 : continue;
148 :
149 : /* Detect conflict only for unique indexes */
150 175660 : if (!relInfo->ri_IndexRelationInfo[i]->ii_Unique)
151 58 : continue;
152 :
153 : /* Don't support conflict detection for deferrable index */
154 175602 : if (!indexRelation->rd_index->indimmediate)
155 0 : continue;
156 :
157 175602 : uniqueIndexes = lappend_oid(uniqueIndexes,
158 : RelationGetRelid(indexRelation));
159 : }
160 :
161 216362 : relInfo->ri_onConflictArbiterIndexes = uniqueIndexes;
162 216362 : }
163 :
164 : /*
165 : * Add SQLSTATE error code to the current conflict report.
166 : */
167 : static int
168 60 : errcode_apply_conflict(ConflictType type)
169 : {
170 60 : switch (type)
171 : {
172 20 : case CT_INSERT_EXISTS:
173 : case CT_UPDATE_EXISTS:
174 : case CT_MULTIPLE_UNIQUE_CONFLICTS:
175 20 : return errcode(ERRCODE_UNIQUE_VIOLATION);
176 40 : case CT_UPDATE_ORIGIN_DIFFERS:
177 : case CT_UPDATE_MISSING:
178 : case CT_DELETE_ORIGIN_DIFFERS:
179 : case CT_DELETE_MISSING:
180 40 : return errcode(ERRCODE_T_R_SERIALIZATION_FAILURE);
181 : }
182 :
183 : Assert(false);
184 0 : return 0; /* silence compiler warning */
185 : }
186 :
187 : /*
188 : * Add an errdetail() line showing conflict detail.
189 : *
190 : * The DETAIL line comprises of two parts:
191 : * 1. Explanation of the conflict type, including the origin and commit
192 : * timestamp of the existing local tuple.
193 : * 2. Display of conflicting key, existing local tuple, remote new tuple, and
194 : * replica identity columns, if any. The remote old tuple is excluded as its
195 : * information is covered in the replica identity columns.
196 : */
197 : static void
198 70 : errdetail_apply_conflict(EState *estate, ResultRelInfo *relinfo,
199 : ConflictType type, TupleTableSlot *searchslot,
200 : TupleTableSlot *localslot, TupleTableSlot *remoteslot,
201 : Oid indexoid, TransactionId localxmin,
202 : RepOriginId localorigin, TimestampTz localts,
203 : StringInfo err_msg)
204 : {
205 : StringInfoData err_detail;
206 : char *val_desc;
207 : char *origin_name;
208 :
209 70 : initStringInfo(&err_detail);
210 :
211 : /* First, construct a detailed message describing the type of conflict */
212 70 : switch (type)
213 : {
214 30 : case CT_INSERT_EXISTS:
215 : case CT_UPDATE_EXISTS:
216 : case CT_MULTIPLE_UNIQUE_CONFLICTS:
217 : Assert(OidIsValid(indexoid) &&
218 : CheckRelationOidLockedByMe(indexoid, RowExclusiveLock, true));
219 :
220 30 : if (localts)
221 : {
222 6 : if (localorigin == InvalidRepOriginId)
223 0 : appendStringInfo(&err_detail, _("Key already exists in unique index \"%s\", modified locally in transaction %u at %s."),
224 : get_rel_name(indexoid),
225 : localxmin, timestamptz_to_str(localts));
226 6 : else if (replorigin_by_oid(localorigin, true, &origin_name))
227 2 : appendStringInfo(&err_detail, _("Key already exists in unique index \"%s\", modified by origin \"%s\" in transaction %u at %s."),
228 : get_rel_name(indexoid), origin_name,
229 : localxmin, timestamptz_to_str(localts));
230 :
231 : /*
232 : * The origin that modified this row has been removed. This
233 : * can happen if the origin was created by a different apply
234 : * worker and its associated subscription and origin were
235 : * dropped after updating the row, or if the origin was
236 : * manually dropped by the user.
237 : */
238 : else
239 4 : appendStringInfo(&err_detail, _("Key already exists in unique index \"%s\", modified by a non-existent origin in transaction %u at %s."),
240 : get_rel_name(indexoid),
241 : localxmin, timestamptz_to_str(localts));
242 : }
243 : else
244 24 : appendStringInfo(&err_detail, _("Key already exists in unique index \"%s\", modified in transaction %u."),
245 : get_rel_name(indexoid), localxmin);
246 :
247 30 : break;
248 :
249 6 : case CT_UPDATE_ORIGIN_DIFFERS:
250 6 : if (localorigin == InvalidRepOriginId)
251 2 : appendStringInfo(&err_detail, _("Updating the row that was modified locally in transaction %u at %s."),
252 : localxmin, timestamptz_to_str(localts));
253 4 : else if (replorigin_by_oid(localorigin, true, &origin_name))
254 2 : appendStringInfo(&err_detail, _("Updating the row that was modified by a different origin \"%s\" in transaction %u at %s."),
255 : origin_name, localxmin, timestamptz_to_str(localts));
256 :
257 : /* The origin that modified this row has been removed. */
258 : else
259 2 : appendStringInfo(&err_detail, _("Updating the row that was modified by a non-existent origin in transaction %u at %s."),
260 : localxmin, timestamptz_to_str(localts));
261 :
262 6 : break;
263 :
264 12 : case CT_UPDATE_MISSING:
265 12 : appendStringInfoString(&err_detail, _("Could not find the row to be updated."));
266 12 : break;
267 :
268 4 : case CT_DELETE_ORIGIN_DIFFERS:
269 4 : if (localorigin == InvalidRepOriginId)
270 2 : appendStringInfo(&err_detail, _("Deleting the row that was modified locally in transaction %u at %s."),
271 : localxmin, timestamptz_to_str(localts));
272 2 : else if (replorigin_by_oid(localorigin, true, &origin_name))
273 2 : appendStringInfo(&err_detail, _("Deleting the row that was modified by a different origin \"%s\" in transaction %u at %s."),
274 : origin_name, localxmin, timestamptz_to_str(localts));
275 :
276 : /* The origin that modified this row has been removed. */
277 : else
278 0 : appendStringInfo(&err_detail, _("Deleting the row that was modified by a non-existent origin in transaction %u at %s."),
279 : localxmin, timestamptz_to_str(localts));
280 :
281 4 : break;
282 :
283 18 : case CT_DELETE_MISSING:
284 18 : appendStringInfoString(&err_detail, _("Could not find the row to be deleted."));
285 18 : break;
286 : }
287 :
288 70 : Assert(err_detail.len > 0);
289 :
290 70 : val_desc = build_tuple_value_details(estate, relinfo, type, searchslot,
291 : localslot, remoteslot, indexoid);
292 :
293 : /*
294 : * Next, append the key values, existing local tuple, remote tuple and
295 : * replica identity columns after the message.
296 : */
297 70 : if (val_desc)
298 70 : appendStringInfo(&err_detail, "\n%s", val_desc);
299 :
300 : /*
301 : * Insert a blank line to visually separate the new detail line from the
302 : * existing ones.
303 : */
304 70 : if (err_msg->len > 0)
305 10 : appendStringInfoChar(err_msg, '\n');
306 :
307 70 : appendStringInfoString(err_msg, err_detail.data);
308 70 : }
309 :
310 : /*
311 : * Helper function to build the additional details for conflicting key,
312 : * existing local tuple, remote tuple, and replica identity columns.
313 : *
314 : * If the return value is NULL, it indicates that the current user lacks
315 : * permissions to view the columns involved.
316 : */
317 : static char *
318 70 : build_tuple_value_details(EState *estate, ResultRelInfo *relinfo,
319 : ConflictType type,
320 : TupleTableSlot *searchslot,
321 : TupleTableSlot *localslot,
322 : TupleTableSlot *remoteslot,
323 : Oid indexoid)
324 : {
325 70 : Relation localrel = relinfo->ri_RelationDesc;
326 70 : Oid relid = RelationGetRelid(localrel);
327 70 : TupleDesc tupdesc = RelationGetDescr(localrel);
328 : StringInfoData tuple_value;
329 70 : char *desc = NULL;
330 :
331 : Assert(searchslot || localslot || remoteslot);
332 :
333 70 : initStringInfo(&tuple_value);
334 :
335 : /*
336 : * Report the conflicting key values in the case of a unique constraint
337 : * violation.
338 : */
339 70 : if (type == CT_INSERT_EXISTS || type == CT_UPDATE_EXISTS ||
340 : type == CT_MULTIPLE_UNIQUE_CONFLICTS)
341 : {
342 : Assert(OidIsValid(indexoid) && localslot);
343 :
344 30 : desc = build_index_value_desc(estate, localrel, localslot, indexoid);
345 :
346 30 : if (desc)
347 30 : appendStringInfo(&tuple_value, _("Key %s"), desc);
348 : }
349 :
350 70 : if (localslot)
351 : {
352 : /*
353 : * The 'modifiedCols' only applies to the new tuple, hence we pass
354 : * NULL for the existing local tuple.
355 : */
356 40 : desc = ExecBuildSlotValueDescription(relid, localslot, tupdesc,
357 : NULL, 64);
358 :
359 40 : if (desc)
360 : {
361 40 : if (tuple_value.len > 0)
362 : {
363 30 : appendStringInfoString(&tuple_value, "; ");
364 30 : appendStringInfo(&tuple_value, _("existing local tuple %s"),
365 : desc);
366 : }
367 : else
368 : {
369 10 : appendStringInfo(&tuple_value, _("Existing local tuple %s"),
370 : desc);
371 : }
372 : }
373 : }
374 :
375 70 : if (remoteslot)
376 : {
377 : Bitmapset *modifiedCols;
378 :
379 : /*
380 : * Although logical replication doesn't maintain the bitmap for the
381 : * columns being inserted, we still use it to create 'modifiedCols'
382 : * for consistency with other calls to ExecBuildSlotValueDescription.
383 : *
384 : * Note that generated columns are formed locally on the subscriber.
385 : */
386 48 : modifiedCols = bms_union(ExecGetInsertedCols(relinfo, estate),
387 48 : ExecGetUpdatedCols(relinfo, estate));
388 48 : desc = ExecBuildSlotValueDescription(relid, remoteslot, tupdesc,
389 : modifiedCols, 64);
390 :
391 48 : if (desc)
392 : {
393 48 : if (tuple_value.len > 0)
394 : {
395 36 : appendStringInfoString(&tuple_value, "; ");
396 36 : appendStringInfo(&tuple_value, _("remote tuple %s"), desc);
397 : }
398 : else
399 : {
400 12 : appendStringInfo(&tuple_value, _("Remote tuple %s"), desc);
401 : }
402 : }
403 : }
404 :
405 70 : if (searchslot)
406 : {
407 : /*
408 : * Note that while index other than replica identity may be used (see
409 : * IsIndexUsableForReplicaIdentityFull for details) to find the tuple
410 : * when applying update or delete, such an index scan may not result
411 : * in a unique tuple and we still compare the complete tuple in such
412 : * cases, thus such indexes are not used here.
413 : */
414 48 : Oid replica_index = GetRelationIdentityOrPK(localrel);
415 :
416 : Assert(type != CT_INSERT_EXISTS);
417 :
418 : /*
419 : * If the table has a valid replica identity index, build the index
420 : * key value string. Otherwise, construct the full tuple value for
421 : * REPLICA IDENTITY FULL cases.
422 : */
423 48 : if (OidIsValid(replica_index))
424 44 : desc = build_index_value_desc(estate, localrel, searchslot, replica_index);
425 : else
426 4 : desc = ExecBuildSlotValueDescription(relid, searchslot, tupdesc, NULL, 64);
427 :
428 48 : if (desc)
429 : {
430 48 : if (tuple_value.len > 0)
431 : {
432 30 : appendStringInfoString(&tuple_value, "; ");
433 60 : appendStringInfo(&tuple_value, OidIsValid(replica_index)
434 26 : ? _("replica identity %s")
435 4 : : _("replica identity full %s"), desc);
436 : }
437 : else
438 : {
439 36 : appendStringInfo(&tuple_value, OidIsValid(replica_index)
440 18 : ? _("Replica identity %s")
441 0 : : _("Replica identity full %s"), desc);
442 : }
443 : }
444 : }
445 :
446 70 : if (tuple_value.len == 0)
447 0 : return NULL;
448 :
449 70 : appendStringInfoChar(&tuple_value, '.');
450 70 : return tuple_value.data;
451 : }
452 :
453 : /*
454 : * Helper functions to construct a string describing the contents of an index
455 : * entry. See BuildIndexValueDescription for details.
456 : *
457 : * The caller must ensure that the index with the OID 'indexoid' is locked so
458 : * that we can fetch and display the conflicting key value.
459 : */
460 : static char *
461 74 : build_index_value_desc(EState *estate, Relation localrel, TupleTableSlot *slot,
462 : Oid indexoid)
463 : {
464 : char *index_value;
465 : Relation indexDesc;
466 : Datum values[INDEX_MAX_KEYS];
467 : bool isnull[INDEX_MAX_KEYS];
468 74 : TupleTableSlot *tableslot = slot;
469 :
470 74 : if (!tableslot)
471 0 : return NULL;
472 :
473 : Assert(CheckRelationOidLockedByMe(indexoid, RowExclusiveLock, true));
474 :
475 74 : indexDesc = index_open(indexoid, NoLock);
476 :
477 : /*
478 : * If the slot is a virtual slot, copy it into a heap tuple slot as
479 : * FormIndexDatum only works with heap tuple slots.
480 : */
481 74 : if (TTS_IS_VIRTUAL(slot))
482 : {
483 26 : tableslot = table_slot_create(localrel, &estate->es_tupleTable);
484 26 : tableslot = ExecCopySlot(tableslot, slot);
485 : }
486 :
487 : /*
488 : * Initialize ecxt_scantuple for potential use in FormIndexDatum when
489 : * index expressions are present.
490 : */
491 74 : GetPerTupleExprContext(estate)->ecxt_scantuple = tableslot;
492 :
493 : /*
494 : * The values/nulls arrays passed to BuildIndexValueDescription should be
495 : * the results of FormIndexDatum, which are the "raw" input to the index
496 : * AM.
497 : */
498 74 : FormIndexDatum(BuildIndexInfo(indexDesc), tableslot, estate, values, isnull);
499 :
500 74 : index_value = BuildIndexValueDescription(indexDesc, values, isnull);
501 :
502 74 : index_close(indexDesc, NoLock);
503 :
504 74 : return index_value;
505 : }
|