Line data Source code
1 : /*-------------------------------------------------------------------------
2 : * conflict.c
3 : * Support routines for logging conflicts.
4 : *
5 : * Copyright (c) 2024-2026, PostgreSQL Global Development Group
6 : *
7 : * IDENTIFICATION
8 : * src/backend/replication/logical/conflict.c
9 : *
10 : * This file contains the code for logging conflicts on the subscriber during
11 : * logical replication.
12 : *-------------------------------------------------------------------------
13 : */
14 :
15 : #include "postgres.h"
16 :
17 : #include "access/commit_ts.h"
18 : #include "access/tableam.h"
19 : #include "executor/executor.h"
20 : #include "pgstat.h"
21 : #include "replication/conflict.h"
22 : #include "replication/worker_internal.h"
23 : #include "storage/lmgr.h"
24 : #include "utils/lsyscache.h"
25 :
26 : static const char *const ConflictTypeNames[] = {
27 : [CT_INSERT_EXISTS] = "insert_exists",
28 : [CT_UPDATE_ORIGIN_DIFFERS] = "update_origin_differs",
29 : [CT_UPDATE_EXISTS] = "update_exists",
30 : [CT_UPDATE_MISSING] = "update_missing",
31 : [CT_DELETE_ORIGIN_DIFFERS] = "delete_origin_differs",
32 : [CT_UPDATE_DELETED] = "update_deleted",
33 : [CT_DELETE_MISSING] = "delete_missing",
34 : [CT_MULTIPLE_UNIQUE_CONFLICTS] = "multiple_unique_conflicts"
35 : };
36 :
37 : static int errcode_apply_conflict(ConflictType type);
38 : static void errdetail_apply_conflict(EState *estate,
39 : ResultRelInfo *relinfo,
40 : ConflictType type,
41 : TupleTableSlot *searchslot,
42 : TupleTableSlot *localslot,
43 : TupleTableSlot *remoteslot,
44 : Oid indexoid, TransactionId localxmin,
45 : ReplOriginId localorigin,
46 : TimestampTz localts, StringInfo err_msg);
47 : static void get_tuple_desc(EState *estate, ResultRelInfo *relinfo,
48 : ConflictType type, char **key_desc,
49 : TupleTableSlot *searchslot, char **search_desc,
50 : TupleTableSlot *localslot, char **local_desc,
51 : TupleTableSlot *remoteslot, char **remote_desc,
52 : Oid indexoid);
53 : static char *build_index_value_desc(EState *estate, Relation localrel,
54 : TupleTableSlot *slot, Oid indexoid);
55 :
56 : /*
57 : * Get the xmin and commit timestamp data (origin and timestamp) associated
58 : * with the provided local row.
59 : *
60 : * Return true if the commit timestamp data was found, false otherwise.
61 : */
62 : bool
63 144624 : GetTupleTransactionInfo(TupleTableSlot *localslot, TransactionId *xmin,
64 : ReplOriginId *localorigin, TimestampTz *localts)
65 : {
66 : Datum xminDatum;
67 : bool isnull;
68 :
69 144624 : xminDatum = slot_getsysattr(localslot, MinTransactionIdAttributeNumber,
70 : &isnull);
71 144624 : *xmin = DatumGetTransactionId(xminDatum);
72 : Assert(!isnull);
73 :
74 : /*
75 : * The commit timestamp data is not available if track_commit_timestamp is
76 : * disabled.
77 : */
78 144624 : if (!track_commit_timestamp)
79 : {
80 144484 : *localorigin = InvalidReplOriginId;
81 144484 : *localts = 0;
82 144484 : return false;
83 : }
84 :
85 140 : return TransactionIdGetCommitTsData(*xmin, localts, localorigin);
86 : }
87 :
88 : /*
89 : * This function is used to report a conflict while applying replication
90 : * changes.
91 : *
92 : * 'searchslot' should contain the tuple used to search the local row to be
93 : * updated or deleted.
94 : *
95 : * 'remoteslot' should contain the remote new tuple, if any.
96 : *
97 : * conflicttuples is a list of local rows that caused the conflict and the
98 : * conflict related information. See ConflictTupleInfo.
99 : *
100 : * The caller must ensure that all the indexes passed in ConflictTupleInfo are
101 : * locked so that we can fetch and display the conflicting key values.
102 : */
103 : void
104 132 : ReportApplyConflict(EState *estate, ResultRelInfo *relinfo, int elevel,
105 : ConflictType type, TupleTableSlot *searchslot,
106 : TupleTableSlot *remoteslot, List *conflicttuples)
107 : {
108 132 : Relation localrel = relinfo->ri_RelationDesc;
109 : StringInfoData err_detail;
110 :
111 132 : initStringInfo(&err_detail);
112 :
113 : /* Form errdetail message by combining conflicting tuples information. */
114 468 : foreach_ptr(ConflictTupleInfo, conflicttuple, conflicttuples)
115 204 : errdetail_apply_conflict(estate, relinfo, type, searchslot,
116 : conflicttuple->slot, remoteslot,
117 : conflicttuple->indexoid,
118 : conflicttuple->xmin,
119 204 : conflicttuple->origin,
120 : conflicttuple->ts,
121 : &err_detail);
122 :
123 132 : pgstat_report_subscription_conflict(MySubscription->oid, type);
124 :
125 132 : ereport(elevel,
126 : errcode_apply_conflict(type),
127 : errmsg("conflict detected on relation \"%s.%s\": conflict=%s",
128 : get_namespace_name(RelationGetNamespace(localrel)),
129 : RelationGetRelationName(localrel),
130 : ConflictTypeNames[type]),
131 : errdetail_internal("%s", err_detail.data));
132 52 : }
133 :
134 : /*
135 : * Find all unique indexes to check for a conflict and store them into
136 : * ResultRelInfo.
137 : */
138 : void
139 216278 : InitConflictIndexes(ResultRelInfo *relInfo)
140 : {
141 216278 : List *uniqueIndexes = NIL;
142 :
143 391902 : for (int i = 0; i < relInfo->ri_NumIndices; i++)
144 : {
145 175624 : Relation indexRelation = relInfo->ri_IndexRelationDescs[i];
146 :
147 175624 : if (indexRelation == NULL)
148 0 : continue;
149 :
150 : /* Detect conflict only for unique indexes */
151 175624 : if (!relInfo->ri_IndexRelationInfo[i]->ii_Unique)
152 58 : continue;
153 :
154 : /* Don't support conflict detection for deferrable index */
155 175566 : if (!indexRelation->rd_index->indimmediate)
156 0 : continue;
157 :
158 175566 : uniqueIndexes = lappend_oid(uniqueIndexes,
159 : RelationGetRelid(indexRelation));
160 : }
161 :
162 216278 : relInfo->ri_onConflictArbiterIndexes = uniqueIndexes;
163 216278 : }
164 :
165 : /*
166 : * Add SQLSTATE error code to the current conflict report.
167 : */
168 : static int
169 132 : errcode_apply_conflict(ConflictType type)
170 : {
171 132 : switch (type)
172 : {
173 80 : case CT_INSERT_EXISTS:
174 : case CT_UPDATE_EXISTS:
175 : case CT_MULTIPLE_UNIQUE_CONFLICTS:
176 80 : return errcode(ERRCODE_UNIQUE_VIOLATION);
177 52 : case CT_UPDATE_ORIGIN_DIFFERS:
178 : case CT_UPDATE_MISSING:
179 : case CT_DELETE_ORIGIN_DIFFERS:
180 : case CT_UPDATE_DELETED:
181 : case CT_DELETE_MISSING:
182 52 : return errcode(ERRCODE_T_R_SERIALIZATION_FAILURE);
183 : }
184 :
185 : Assert(false);
186 0 : return 0; /* silence compiler warning */
187 : }
188 :
189 : /*
190 : * Helper function to build the additional details for conflicting key,
191 : * local row, remote row, and replica identity columns.
192 : */
193 : static void
194 284 : append_tuple_value_detail(StringInfo buf, List *tuple_values,
195 : bool need_newline)
196 : {
197 284 : bool first = true;
198 :
199 : Assert(buf != NULL && tuple_values != NIL);
200 :
201 1134 : foreach_ptr(char, tuple_value, tuple_values)
202 : {
203 : /*
204 : * Skip if the value is NULL. This means the current user does not
205 : * have enough permissions to see all columns in the table. See
206 : * get_tuple_desc().
207 : */
208 566 : if (!tuple_value)
209 86 : continue;
210 :
211 480 : if (first)
212 : {
213 : /*
214 : * translator: The colon is used as a separator in conflict
215 : * messages. The first part, built in the caller, describes what
216 : * happened locally; the second part lists the conflicting keys
217 : * and tuple data.
218 : */
219 284 : appendStringInfoString(buf, _(": "));
220 : }
221 : else
222 : {
223 : /*
224 : * translator: This is a separator in a list of conflicting keys
225 : * and tuple data.
226 : */
227 196 : appendStringInfoString(buf, _(", "));
228 : }
229 :
230 480 : appendStringInfoString(buf, tuple_value);
231 480 : first = false;
232 : }
233 :
234 : /* translator: This is the terminator of a conflict message */
235 284 : appendStringInfoString(buf, _("."));
236 :
237 284 : if (need_newline)
238 86 : appendStringInfoChar(buf, '\n');
239 284 : }
240 :
241 : /*
242 : * Add an errdetail() line showing conflict detail.
243 : *
244 : * The DETAIL line comprises of two parts:
245 : * 1. Explanation of the conflict type, including the origin and commit
246 : * timestamp of the local row.
247 : * 2. Display of conflicting key, local row, remote new row, and replica
248 : * identity columns, if any. The remote old row is excluded as its
249 : * information is covered in the replica identity columns.
250 : */
251 : static void
252 204 : errdetail_apply_conflict(EState *estate, ResultRelInfo *relinfo,
253 : ConflictType type, TupleTableSlot *searchslot,
254 : TupleTableSlot *localslot, TupleTableSlot *remoteslot,
255 : Oid indexoid, TransactionId localxmin,
256 : ReplOriginId localorigin, TimestampTz localts,
257 : StringInfo err_msg)
258 : {
259 : StringInfoData err_detail;
260 : char *origin_name;
261 204 : char *key_desc = NULL;
262 204 : char *local_desc = NULL;
263 204 : char *remote_desc = NULL;
264 204 : char *search_desc = NULL;
265 :
266 : /* Get key, replica identity, remote, and local value data */
267 204 : get_tuple_desc(estate, relinfo, type, &key_desc,
268 : localslot, &local_desc,
269 : remoteslot, &remote_desc,
270 : searchslot, &search_desc,
271 : indexoid);
272 :
273 204 : initStringInfo(&err_detail);
274 :
275 : /* Construct a detailed message describing the type of conflict */
276 204 : switch (type)
277 : {
278 152 : case CT_INSERT_EXISTS:
279 : case CT_UPDATE_EXISTS:
280 : case CT_MULTIPLE_UNIQUE_CONFLICTS:
281 : Assert(OidIsValid(indexoid) &&
282 : CheckRelationOidLockedByMe(indexoid, RowExclusiveLock, true));
283 :
284 152 : if (err_msg->len == 0)
285 : {
286 80 : appendStringInfoString(&err_detail, _("Could not apply remote change"));
287 :
288 80 : append_tuple_value_detail(&err_detail,
289 80 : list_make2(remote_desc, search_desc),
290 : true);
291 : }
292 :
293 152 : if (localts)
294 : {
295 6 : if (localorigin == InvalidReplOriginId)
296 0 : appendStringInfo(&err_detail, _("Key already exists in unique index \"%s\", modified locally in transaction %u at %s"),
297 : get_rel_name(indexoid),
298 : localxmin, timestamptz_to_str(localts));
299 6 : else if (replorigin_by_oid(localorigin, true, &origin_name))
300 2 : appendStringInfo(&err_detail, _("Key already exists in unique index \"%s\", modified by origin \"%s\" in transaction %u at %s"),
301 : get_rel_name(indexoid), origin_name,
302 : localxmin, timestamptz_to_str(localts));
303 :
304 : /*
305 : * The origin that modified this row has been removed. This
306 : * can happen if the origin was created by a different apply
307 : * worker and its associated subscription and origin were
308 : * dropped after updating the row, or if the origin was
309 : * manually dropped by the user.
310 : */
311 : else
312 4 : appendStringInfo(&err_detail, _("Key already exists in unique index \"%s\", modified by a non-existent origin in transaction %u at %s"),
313 : get_rel_name(indexoid),
314 : localxmin, timestamptz_to_str(localts));
315 : }
316 : else
317 146 : appendStringInfo(&err_detail, _("Key already exists in unique index \"%s\", modified in transaction %u"),
318 : get_rel_name(indexoid), localxmin);
319 :
320 152 : append_tuple_value_detail(&err_detail,
321 152 : list_make2(key_desc, local_desc), false);
322 :
323 152 : break;
324 :
325 6 : case CT_UPDATE_ORIGIN_DIFFERS:
326 6 : if (localorigin == InvalidReplOriginId)
327 2 : appendStringInfo(&err_detail, _("Updating the row that was modified locally in transaction %u at %s"),
328 : localxmin, timestamptz_to_str(localts));
329 4 : else if (replorigin_by_oid(localorigin, true, &origin_name))
330 2 : appendStringInfo(&err_detail, _("Updating the row that was modified by a different origin \"%s\" in transaction %u at %s"),
331 : origin_name, localxmin, timestamptz_to_str(localts));
332 :
333 : /* The origin that modified this row has been removed. */
334 : else
335 2 : appendStringInfo(&err_detail, _("Updating the row that was modified by a non-existent origin in transaction %u at %s"),
336 : localxmin, timestamptz_to_str(localts));
337 :
338 6 : append_tuple_value_detail(&err_detail,
339 6 : list_make3(local_desc, remote_desc,
340 : search_desc), false);
341 :
342 6 : break;
343 :
344 6 : case CT_UPDATE_DELETED:
345 6 : appendStringInfoString(&err_detail, _("Could not find the row to be updated"));
346 :
347 6 : append_tuple_value_detail(&err_detail,
348 6 : list_make2(remote_desc, search_desc),
349 : true);
350 :
351 6 : if (localts)
352 : {
353 6 : if (localorigin == InvalidReplOriginId)
354 6 : appendStringInfo(&err_detail, _("The row to be updated was deleted locally in transaction %u at %s"),
355 : localxmin, timestamptz_to_str(localts));
356 0 : else if (replorigin_by_oid(localorigin, true, &origin_name))
357 0 : appendStringInfo(&err_detail, _("The row to be updated was deleted by a different origin \"%s\" in transaction %u at %s"),
358 : origin_name, localxmin, timestamptz_to_str(localts));
359 :
360 : /* The origin that modified this row has been removed. */
361 : else
362 0 : appendStringInfo(&err_detail, _("The row to be updated was deleted by a non-existent origin in transaction %u at %s"),
363 : localxmin, timestamptz_to_str(localts));
364 : }
365 : else
366 0 : appendStringInfo(&err_detail, _("The row to be updated was deleted"));
367 :
368 6 : break;
369 :
370 12 : case CT_UPDATE_MISSING:
371 12 : appendStringInfoString(&err_detail, _("Could not find the row to be updated"));
372 :
373 12 : append_tuple_value_detail(&err_detail,
374 12 : list_make2(remote_desc, search_desc),
375 : false);
376 :
377 12 : break;
378 :
379 10 : case CT_DELETE_ORIGIN_DIFFERS:
380 10 : if (localorigin == InvalidReplOriginId)
381 8 : appendStringInfo(&err_detail, _("Deleting the row that was modified locally in transaction %u at %s"),
382 : localxmin, timestamptz_to_str(localts));
383 2 : else if (replorigin_by_oid(localorigin, true, &origin_name))
384 2 : appendStringInfo(&err_detail, _("Deleting the row that was modified by a different origin \"%s\" in transaction %u at %s"),
385 : origin_name, localxmin, timestamptz_to_str(localts));
386 :
387 : /* The origin that modified this row has been removed. */
388 : else
389 0 : appendStringInfo(&err_detail, _("Deleting the row that was modified by a non-existent origin in transaction %u at %s"),
390 : localxmin, timestamptz_to_str(localts));
391 :
392 10 : append_tuple_value_detail(&err_detail,
393 10 : list_make3(local_desc, remote_desc,
394 : search_desc), false);
395 :
396 10 : break;
397 :
398 18 : case CT_DELETE_MISSING:
399 18 : appendStringInfoString(&err_detail, _("Could not find the row to be deleted"));
400 :
401 18 : append_tuple_value_detail(&err_detail,
402 18 : list_make1(search_desc), false);
403 :
404 18 : break;
405 : }
406 :
407 : Assert(err_detail.len > 0);
408 :
409 : /*
410 : * Insert a blank line to visually separate the new detail line from the
411 : * existing ones.
412 : */
413 204 : if (err_msg->len > 0)
414 72 : appendStringInfoChar(err_msg, '\n');
415 :
416 204 : appendStringInfoString(err_msg, err_detail.data);
417 204 : }
418 :
419 : /*
420 : * Extract conflicting key, local row, remote row, and replica identity
421 : * columns. Results are set at xxx_desc.
422 : *
423 : * If the output is NULL, it indicates that the current user lacks permissions
424 : * to view the columns involved.
425 : */
426 : static void
427 204 : get_tuple_desc(EState *estate, ResultRelInfo *relinfo, ConflictType type,
428 : char **key_desc,
429 : TupleTableSlot *localslot, char **local_desc,
430 : TupleTableSlot *remoteslot, char **remote_desc,
431 : TupleTableSlot *searchslot, char **search_desc,
432 : Oid indexoid)
433 : {
434 204 : Relation localrel = relinfo->ri_RelationDesc;
435 204 : Oid relid = RelationGetRelid(localrel);
436 204 : TupleDesc tupdesc = RelationGetDescr(localrel);
437 204 : char *desc = NULL;
438 :
439 : Assert((localslot && local_desc) || (remoteslot && remote_desc) ||
440 : (searchslot && search_desc));
441 :
442 : /*
443 : * Report the conflicting key values in the case of a unique constraint
444 : * violation.
445 : */
446 204 : if (type == CT_INSERT_EXISTS || type == CT_UPDATE_EXISTS ||
447 : type == CT_MULTIPLE_UNIQUE_CONFLICTS)
448 : {
449 : Assert(OidIsValid(indexoid) && localslot);
450 :
451 152 : desc = build_index_value_desc(estate, localrel, localslot,
452 : indexoid);
453 :
454 152 : if (desc)
455 152 : *key_desc = psprintf(_("key %s"), desc);
456 : }
457 :
458 204 : if (localslot)
459 : {
460 : /*
461 : * The 'modifiedCols' only applies to the new tuple, hence we pass
462 : * NULL for the local row.
463 : */
464 168 : desc = ExecBuildSlotValueDescription(relid, localslot, tupdesc,
465 : NULL, 64);
466 :
467 168 : if (desc)
468 168 : *local_desc = psprintf(_("local row %s"), desc);
469 : }
470 :
471 204 : if (remoteslot)
472 : {
473 : Bitmapset *modifiedCols;
474 :
475 : /*
476 : * Although logical replication doesn't maintain the bitmap for the
477 : * columns being inserted, we still use it to create 'modifiedCols'
478 : * for consistency with other calls to ExecBuildSlotValueDescription.
479 : *
480 : * Note that generated columns are formed locally on the subscriber.
481 : */
482 176 : modifiedCols = bms_union(ExecGetInsertedCols(relinfo, estate),
483 176 : ExecGetUpdatedCols(relinfo, estate));
484 176 : desc = ExecBuildSlotValueDescription(relid, remoteslot,
485 : tupdesc, modifiedCols,
486 : 64);
487 :
488 176 : if (desc)
489 176 : *remote_desc = psprintf(_("remote row %s"), desc);
490 : }
491 :
492 204 : if (searchslot)
493 : {
494 : /*
495 : * Note that while index other than replica identity may be used (see
496 : * IsIndexUsableForReplicaIdentityFull for details) to find the tuple
497 : * when applying update or delete, such an index scan may not result
498 : * in a unique tuple and we still compare the complete tuple in such
499 : * cases, thus such indexes are not used here.
500 : */
501 60 : Oid replica_index = GetRelationIdentityOrPK(localrel);
502 :
503 : Assert(type != CT_INSERT_EXISTS);
504 :
505 : /*
506 : * If the table has a valid replica identity index, build the index
507 : * key value string. Otherwise, construct the full tuple value for
508 : * REPLICA IDENTITY FULL cases.
509 : */
510 60 : if (OidIsValid(replica_index))
511 52 : desc = build_index_value_desc(estate, localrel, searchslot, replica_index);
512 : else
513 8 : desc = ExecBuildSlotValueDescription(relid, searchslot, tupdesc, NULL, 64);
514 :
515 60 : if (desc)
516 : {
517 60 : if (OidIsValid(replica_index))
518 52 : *search_desc = psprintf(_("replica identity %s"), desc);
519 : else
520 8 : *search_desc = psprintf(_("replica identity full %s"), desc);
521 : }
522 : }
523 204 : }
524 :
525 : /*
526 : * Helper functions to construct a string describing the contents of an index
527 : * entry. See BuildIndexValueDescription for details.
528 : *
529 : * The caller must ensure that the index with the OID 'indexoid' is locked so
530 : * that we can fetch and display the conflicting key value.
531 : */
532 : static char *
533 204 : build_index_value_desc(EState *estate, Relation localrel, TupleTableSlot *slot,
534 : Oid indexoid)
535 : {
536 : char *index_value;
537 : Relation indexDesc;
538 : Datum values[INDEX_MAX_KEYS];
539 : bool isnull[INDEX_MAX_KEYS];
540 204 : TupleTableSlot *tableslot = slot;
541 :
542 204 : if (!tableslot)
543 0 : return NULL;
544 :
545 : Assert(CheckRelationOidLockedByMe(indexoid, RowExclusiveLock, true));
546 :
547 204 : indexDesc = index_open(indexoid, NoLock);
548 :
549 : /*
550 : * If the slot is a virtual slot, copy it into a heap tuple slot as
551 : * FormIndexDatum only works with heap tuple slots.
552 : */
553 204 : if (TTS_IS_VIRTUAL(slot))
554 : {
555 34 : tableslot = table_slot_create(localrel, &estate->es_tupleTable);
556 34 : tableslot = ExecCopySlot(tableslot, slot);
557 : }
558 :
559 : /*
560 : * Initialize ecxt_scantuple for potential use in FormIndexDatum when
561 : * index expressions are present.
562 : */
563 204 : GetPerTupleExprContext(estate)->ecxt_scantuple = tableslot;
564 :
565 : /*
566 : * The values/nulls arrays passed to BuildIndexValueDescription should be
567 : * the results of FormIndexDatum, which are the "raw" input to the index
568 : * AM.
569 : */
570 204 : FormIndexDatum(BuildIndexInfo(indexDesc), tableslot, estate, values, isnull);
571 :
572 204 : index_value = BuildIndexValueDescription(indexDesc, values, isnull);
573 :
574 204 : index_close(indexDesc, NoLock);
575 :
576 204 : return index_value;
577 : }
|