Line data Source code
1 : /*-------------------------------------------------------------------------
2 : * conflict.c
3 : * Support routines for logging conflicts.
4 : *
5 : * Copyright (c) 2024-2026, PostgreSQL Global Development Group
6 : *
7 : * IDENTIFICATION
8 : * src/backend/replication/logical/conflict.c
9 : *
10 : * This file contains the code for logging conflicts on the subscriber during
11 : * logical replication.
12 : *-------------------------------------------------------------------------
13 : */
14 :
15 : #include "postgres.h"
16 :
17 : #include "access/commit_ts.h"
18 : #include "access/genam.h"
19 : #include "access/tableam.h"
20 : #include "executor/executor.h"
21 : #include "pgstat.h"
22 : #include "replication/conflict.h"
23 : #include "replication/worker_internal.h"
24 : #include "storage/lmgr.h"
25 : #include "utils/lsyscache.h"
26 :
27 : static const char *const ConflictTypeNames[] = {
28 : [CT_INSERT_EXISTS] = "insert_exists",
29 : [CT_UPDATE_ORIGIN_DIFFERS] = "update_origin_differs",
30 : [CT_UPDATE_EXISTS] = "update_exists",
31 : [CT_UPDATE_MISSING] = "update_missing",
32 : [CT_DELETE_ORIGIN_DIFFERS] = "delete_origin_differs",
33 : [CT_UPDATE_DELETED] = "update_deleted",
34 : [CT_DELETE_MISSING] = "delete_missing",
35 : [CT_MULTIPLE_UNIQUE_CONFLICTS] = "multiple_unique_conflicts"
36 : };
37 :
38 : static int errcode_apply_conflict(ConflictType type);
39 : static void errdetail_apply_conflict(EState *estate,
40 : ResultRelInfo *relinfo,
41 : ConflictType type,
42 : TupleTableSlot *searchslot,
43 : TupleTableSlot *localslot,
44 : TupleTableSlot *remoteslot,
45 : Oid indexoid, TransactionId localxmin,
46 : ReplOriginId localorigin,
47 : TimestampTz localts, StringInfo err_msg);
48 : static void get_tuple_desc(EState *estate, ResultRelInfo *relinfo,
49 : ConflictType type, char **key_desc,
50 : TupleTableSlot *searchslot, char **search_desc,
51 : TupleTableSlot *localslot, char **local_desc,
52 : TupleTableSlot *remoteslot, char **remote_desc,
53 : Oid indexoid);
54 : static char *build_index_value_desc(EState *estate, Relation localrel,
55 : TupleTableSlot *slot, Oid indexoid);
56 :
57 : /*
58 : * Get the xmin and commit timestamp data (origin and timestamp) associated
59 : * with the provided local row.
60 : *
61 : * Return true if the commit timestamp data was found, false otherwise.
62 : */
63 : bool
64 72332 : GetTupleTransactionInfo(TupleTableSlot *localslot, TransactionId *xmin,
65 : ReplOriginId *localorigin, TimestampTz *localts)
66 : {
67 : Datum xminDatum;
68 : bool isnull;
69 :
70 72332 : xminDatum = slot_getsysattr(localslot, MinTransactionIdAttributeNumber,
71 : &isnull);
72 72332 : *xmin = DatumGetTransactionId(xminDatum);
73 : Assert(!isnull);
74 :
75 : /*
76 : * The commit timestamp data is not available if track_commit_timestamp is
77 : * disabled.
78 : */
79 72332 : if (!track_commit_timestamp)
80 : {
81 72256 : *localorigin = InvalidReplOriginId;
82 72256 : *localts = 0;
83 72256 : return false;
84 : }
85 :
86 76 : return TransactionIdGetCommitTsData(*xmin, localts, localorigin);
87 : }
88 :
89 : /*
90 : * This function is used to report a conflict while applying replication
91 : * changes.
92 : *
93 : * 'searchslot' should contain the tuple used to search the local row to be
94 : * updated or deleted.
95 : *
96 : * 'remoteslot' should contain the remote new tuple, if any.
97 : *
98 : * conflicttuples is a list of local rows that caused the conflict and the
99 : * conflict related information. See ConflictTupleInfo.
100 : *
101 : * The caller must ensure that all the indexes passed in ConflictTupleInfo are
102 : * locked so that we can fetch and display the conflicting key values.
103 : */
104 : void
105 74 : ReportApplyConflict(EState *estate, ResultRelInfo *relinfo, int elevel,
106 : ConflictType type, TupleTableSlot *searchslot,
107 : TupleTableSlot *remoteslot, List *conflicttuples)
108 : {
109 74 : Relation localrel = relinfo->ri_RelationDesc;
110 : StringInfoData err_detail;
111 :
112 74 : initStringInfo(&err_detail);
113 :
114 : /* Form errdetail message by combining conflicting tuples information. */
115 262 : foreach_ptr(ConflictTupleInfo, conflicttuple, conflicttuples)
116 114 : errdetail_apply_conflict(estate, relinfo, type, searchslot,
117 : conflicttuple->slot, remoteslot,
118 : conflicttuple->indexoid,
119 : conflicttuple->xmin,
120 114 : conflicttuple->origin,
121 : conflicttuple->ts,
122 : &err_detail);
123 :
124 74 : pgstat_report_subscription_conflict(MySubscription->oid, type);
125 :
126 74 : ereport(elevel,
127 : errcode_apply_conflict(type),
128 : errmsg("conflict detected on relation \"%s.%s\": conflict=%s",
129 : get_namespace_name(RelationGetNamespace(localrel)),
130 : RelationGetRelationName(localrel),
131 : ConflictTypeNames[type]),
132 : errdetail_internal("%s", err_detail.data));
133 30 : }
134 :
135 : /*
136 : * Find all unique indexes to check for a conflict and store them into
137 : * ResultRelInfo.
138 : */
139 : void
140 131148 : InitConflictIndexes(ResultRelInfo *relInfo)
141 : {
142 131148 : List *uniqueIndexes = NIL;
143 :
144 238872 : for (int i = 0; i < relInfo->ri_NumIndices; i++)
145 : {
146 107724 : Relation indexRelation = relInfo->ri_IndexRelationDescs[i];
147 :
148 107724 : if (indexRelation == NULL)
149 0 : continue;
150 :
151 : /* Detect conflict only for unique indexes */
152 107724 : if (!relInfo->ri_IndexRelationInfo[i]->ii_Unique)
153 29 : continue;
154 :
155 : /* Don't support conflict detection for deferrable index */
156 107695 : if (!indexRelation->rd_index->indimmediate)
157 0 : continue;
158 :
159 107695 : uniqueIndexes = lappend_oid(uniqueIndexes,
160 : RelationGetRelid(indexRelation));
161 : }
162 :
163 131148 : relInfo->ri_onConflictArbiterIndexes = uniqueIndexes;
164 131148 : }
165 :
166 : /*
167 : * Add SQLSTATE error code to the current conflict report.
168 : */
169 : static int
170 74 : errcode_apply_conflict(ConflictType type)
171 : {
172 74 : switch (type)
173 : {
174 44 : case CT_INSERT_EXISTS:
175 : case CT_UPDATE_EXISTS:
176 : case CT_MULTIPLE_UNIQUE_CONFLICTS:
177 44 : return errcode(ERRCODE_UNIQUE_VIOLATION);
178 30 : case CT_UPDATE_ORIGIN_DIFFERS:
179 : case CT_UPDATE_MISSING:
180 : case CT_DELETE_ORIGIN_DIFFERS:
181 : case CT_UPDATE_DELETED:
182 : case CT_DELETE_MISSING:
183 30 : return errcode(ERRCODE_T_R_SERIALIZATION_FAILURE);
184 : }
185 :
186 : Assert(false);
187 0 : return 0; /* silence compiler warning */
188 : }
189 :
190 : /*
191 : * Helper function to build the additional details for conflicting key,
192 : * local row, remote row, and replica identity columns.
193 : */
194 : static void
195 158 : append_tuple_value_detail(StringInfo buf, List *tuple_values,
196 : bool need_newline)
197 : {
198 158 : bool first = true;
199 :
200 : Assert(buf != NULL && tuple_values != NIL);
201 :
202 631 : foreach_ptr(char, tuple_value, tuple_values)
203 : {
204 : /*
205 : * Skip if the value is NULL. This means the current user does not
206 : * have enough permissions to see all columns in the table. See
207 : * get_tuple_desc().
208 : */
209 315 : if (!tuple_value)
210 47 : continue;
211 :
212 268 : if (first)
213 : {
214 : /*
215 : * translator: The colon is used as a separator in conflict
216 : * messages. The first part, built in the caller, describes what
217 : * happened locally; the second part lists the conflicting keys
218 : * and tuple data.
219 : */
220 158 : appendStringInfoString(buf, _(": "));
221 : }
222 : else
223 : {
224 : /*
225 : * translator: This is a separator in a list of conflicting keys
226 : * and tuple data.
227 : */
228 110 : appendStringInfoString(buf, _(", "));
229 : }
230 :
231 268 : appendStringInfoString(buf, tuple_value);
232 268 : first = false;
233 : }
234 :
235 : /* translator: This is the terminator of a conflict message */
236 158 : appendStringInfoString(buf, _("."));
237 :
238 158 : if (need_newline)
239 47 : appendStringInfoChar(buf, '\n');
240 158 : }
241 :
242 : /*
243 : * Add an errdetail() line showing conflict detail.
244 : *
245 : * The DETAIL line comprises of two parts:
246 : * 1. Explanation of the conflict type, including the origin and commit
247 : * timestamp of the local row.
248 : * 2. Display of conflicting key, local row, remote new row, and replica
249 : * identity columns, if any. The remote old row is excluded as its
250 : * information is covered in the replica identity columns.
251 : */
252 : static void
253 114 : errdetail_apply_conflict(EState *estate, ResultRelInfo *relinfo,
254 : ConflictType type, TupleTableSlot *searchslot,
255 : TupleTableSlot *localslot, TupleTableSlot *remoteslot,
256 : Oid indexoid, TransactionId localxmin,
257 : ReplOriginId localorigin, TimestampTz localts,
258 : StringInfo err_msg)
259 : {
260 : StringInfoData err_detail;
261 : char *origin_name;
262 114 : char *key_desc = NULL;
263 114 : char *local_desc = NULL;
264 114 : char *remote_desc = NULL;
265 114 : char *search_desc = NULL;
266 :
267 : /* Get key, replica identity, remote, and local value data */
268 114 : get_tuple_desc(estate, relinfo, type, &key_desc,
269 : localslot, &local_desc,
270 : remoteslot, &remote_desc,
271 : searchslot, &search_desc,
272 : indexoid);
273 :
274 114 : initStringInfo(&err_detail);
275 :
276 : /* Construct a detailed message describing the type of conflict */
277 114 : switch (type)
278 : {
279 84 : case CT_INSERT_EXISTS:
280 : case CT_UPDATE_EXISTS:
281 : case CT_MULTIPLE_UNIQUE_CONFLICTS:
282 : Assert(OidIsValid(indexoid) &&
283 : CheckRelationOidLockedByMe(indexoid, RowExclusiveLock, true));
284 :
285 84 : if (err_msg->len == 0)
286 : {
287 44 : appendStringInfoString(&err_detail, _("Could not apply remote change"));
288 :
289 44 : append_tuple_value_detail(&err_detail,
290 : list_make2(remote_desc, search_desc),
291 : true);
292 : }
293 :
294 84 : if (localts)
295 : {
296 3 : if (localorigin == InvalidReplOriginId)
297 0 : appendStringInfo(&err_detail, _("Key already exists in unique index \"%s\", modified locally in transaction %u at %s"),
298 : get_rel_name(indexoid),
299 : localxmin, timestamptz_to_str(localts));
300 3 : else if (replorigin_by_oid(localorigin, true, &origin_name))
301 1 : appendStringInfo(&err_detail, _("Key already exists in unique index \"%s\", modified by origin \"%s\" in transaction %u at %s"),
302 : get_rel_name(indexoid), origin_name,
303 : localxmin, timestamptz_to_str(localts));
304 :
305 : /*
306 : * The origin that modified this row has been removed. This
307 : * can happen if the origin was created by a different apply
308 : * worker and its associated subscription and origin were
309 : * dropped after updating the row, or if the origin was
310 : * manually dropped by the user.
311 : */
312 : else
313 2 : appendStringInfo(&err_detail, _("Key already exists in unique index \"%s\", modified by a non-existent origin in transaction %u at %s"),
314 : get_rel_name(indexoid),
315 : localxmin, timestamptz_to_str(localts));
316 : }
317 : else
318 81 : appendStringInfo(&err_detail, _("Key already exists in unique index \"%s\", modified in transaction %u"),
319 : get_rel_name(indexoid), localxmin);
320 :
321 84 : append_tuple_value_detail(&err_detail,
322 : list_make2(key_desc, local_desc), false);
323 :
324 84 : break;
325 :
326 3 : case CT_UPDATE_ORIGIN_DIFFERS:
327 3 : if (localorigin == InvalidReplOriginId)
328 1 : appendStringInfo(&err_detail, _("Updating the row that was modified locally in transaction %u at %s"),
329 : localxmin, timestamptz_to_str(localts));
330 2 : else if (replorigin_by_oid(localorigin, true, &origin_name))
331 1 : appendStringInfo(&err_detail, _("Updating the row that was modified by a different origin \"%s\" in transaction %u at %s"),
332 : origin_name, localxmin, timestamptz_to_str(localts));
333 :
334 : /* The origin that modified this row has been removed. */
335 : else
336 1 : appendStringInfo(&err_detail, _("Updating the row that was modified by a non-existent origin in transaction %u at %s"),
337 : localxmin, timestamptz_to_str(localts));
338 :
339 3 : append_tuple_value_detail(&err_detail,
340 : list_make3(local_desc, remote_desc,
341 : search_desc), false);
342 :
343 3 : break;
344 :
345 3 : case CT_UPDATE_DELETED:
346 3 : appendStringInfoString(&err_detail, _("Could not find the row to be updated"));
347 :
348 3 : append_tuple_value_detail(&err_detail,
349 : list_make2(remote_desc, search_desc),
350 : true);
351 :
352 3 : if (localts)
353 : {
354 3 : if (localorigin == InvalidReplOriginId)
355 3 : appendStringInfo(&err_detail, _("The row to be updated was deleted locally in transaction %u at %s"),
356 : localxmin, timestamptz_to_str(localts));
357 0 : else if (replorigin_by_oid(localorigin, true, &origin_name))
358 0 : appendStringInfo(&err_detail, _("The row to be updated was deleted by a different origin \"%s\" in transaction %u at %s"),
359 : origin_name, localxmin, timestamptz_to_str(localts));
360 :
361 : /* The origin that modified this row has been removed. */
362 : else
363 0 : appendStringInfo(&err_detail, _("The row to be updated was deleted by a non-existent origin in transaction %u at %s"),
364 : localxmin, timestamptz_to_str(localts));
365 : }
366 : else
367 0 : appendStringInfo(&err_detail, _("The row to be updated was deleted"));
368 :
369 3 : break;
370 :
371 10 : case CT_UPDATE_MISSING:
372 10 : appendStringInfoString(&err_detail, _("Could not find the row to be updated"));
373 :
374 10 : append_tuple_value_detail(&err_detail,
375 : list_make2(remote_desc, search_desc),
376 : false);
377 :
378 10 : break;
379 :
380 5 : case CT_DELETE_ORIGIN_DIFFERS:
381 5 : if (localorigin == InvalidReplOriginId)
382 4 : appendStringInfo(&err_detail, _("Deleting the row that was modified locally in transaction %u at %s"),
383 : localxmin, timestamptz_to_str(localts));
384 1 : else if (replorigin_by_oid(localorigin, true, &origin_name))
385 1 : appendStringInfo(&err_detail, _("Deleting the row that was modified by a different origin \"%s\" in transaction %u at %s"),
386 : origin_name, localxmin, timestamptz_to_str(localts));
387 :
388 : /* The origin that modified this row has been removed. */
389 : else
390 0 : appendStringInfo(&err_detail, _("Deleting the row that was modified by a non-existent origin in transaction %u at %s"),
391 : localxmin, timestamptz_to_str(localts));
392 :
393 5 : append_tuple_value_detail(&err_detail,
394 : list_make3(local_desc, remote_desc,
395 : search_desc), false);
396 :
397 5 : break;
398 :
399 9 : case CT_DELETE_MISSING:
400 9 : appendStringInfoString(&err_detail, _("Could not find the row to be deleted"));
401 :
402 9 : append_tuple_value_detail(&err_detail,
403 : list_make1(search_desc), false);
404 :
405 9 : break;
406 : }
407 :
408 : Assert(err_detail.len > 0);
409 :
410 : /*
411 : * Insert a blank line to visually separate the new detail line from the
412 : * existing ones.
413 : */
414 114 : if (err_msg->len > 0)
415 40 : appendStringInfoChar(err_msg, '\n');
416 :
417 114 : appendStringInfoString(err_msg, err_detail.data);
418 114 : }
419 :
420 : /*
421 : * Extract conflicting key, local row, remote row, and replica identity
422 : * columns. Results are set at xxx_desc.
423 : *
424 : * If the output is NULL, it indicates that the current user lacks permissions
425 : * to view the columns involved.
426 : */
427 : static void
428 114 : get_tuple_desc(EState *estate, ResultRelInfo *relinfo, ConflictType type,
429 : char **key_desc,
430 : TupleTableSlot *localslot, char **local_desc,
431 : TupleTableSlot *remoteslot, char **remote_desc,
432 : TupleTableSlot *searchslot, char **search_desc,
433 : Oid indexoid)
434 : {
435 114 : Relation localrel = relinfo->ri_RelationDesc;
436 114 : Oid relid = RelationGetRelid(localrel);
437 114 : TupleDesc tupdesc = RelationGetDescr(localrel);
438 114 : char *desc = NULL;
439 :
440 : Assert((localslot && local_desc) || (remoteslot && remote_desc) ||
441 : (searchslot && search_desc));
442 :
443 : /*
444 : * Report the conflicting key values in the case of a unique constraint
445 : * violation.
446 : */
447 114 : if (type == CT_INSERT_EXISTS || type == CT_UPDATE_EXISTS ||
448 : type == CT_MULTIPLE_UNIQUE_CONFLICTS)
449 : {
450 : Assert(OidIsValid(indexoid) && localslot);
451 :
452 84 : desc = build_index_value_desc(estate, localrel, localslot,
453 : indexoid);
454 :
455 84 : if (desc)
456 84 : *key_desc = psprintf(_("key %s"), desc);
457 : }
458 :
459 114 : if (localslot)
460 : {
461 : /*
462 : * The 'modifiedCols' only applies to the new tuple, hence we pass
463 : * NULL for the local row.
464 : */
465 92 : desc = ExecBuildSlotValueDescription(relid, localslot, tupdesc,
466 : NULL, 64);
467 :
468 92 : if (desc)
469 92 : *local_desc = psprintf(_("local row %s"), desc);
470 : }
471 :
472 114 : if (remoteslot)
473 : {
474 : Bitmapset *modifiedCols;
475 :
476 : /*
477 : * Although logical replication doesn't maintain the bitmap for the
478 : * columns being inserted, we still use it to create 'modifiedCols'
479 : * for consistency with other calls to ExecBuildSlotValueDescription.
480 : *
481 : * Note that generated columns are formed locally on the subscriber.
482 : */
483 100 : modifiedCols = bms_union(ExecGetInsertedCols(relinfo, estate),
484 100 : ExecGetUpdatedCols(relinfo, estate));
485 100 : desc = ExecBuildSlotValueDescription(relid, remoteslot,
486 : tupdesc, modifiedCols,
487 : 64);
488 :
489 100 : if (desc)
490 100 : *remote_desc = psprintf(_("remote row %s"), desc);
491 : }
492 :
493 114 : if (searchslot)
494 : {
495 : /*
496 : * Note that while index other than replica identity may be used (see
497 : * IsIndexUsableForReplicaIdentityFull for details) to find the tuple
498 : * when applying update or delete, such an index scan may not result
499 : * in a unique tuple and we still compare the complete tuple in such
500 : * cases, thus such indexes are not used here.
501 : */
502 34 : Oid replica_index = GetRelationIdentityOrPK(localrel);
503 :
504 : Assert(type != CT_INSERT_EXISTS);
505 :
506 : /*
507 : * If the table has a valid replica identity index, build the index
508 : * key value string. Otherwise, construct the full tuple value for
509 : * REPLICA IDENTITY FULL cases.
510 : */
511 34 : if (OidIsValid(replica_index))
512 30 : desc = build_index_value_desc(estate, localrel, searchslot, replica_index);
513 : else
514 4 : desc = ExecBuildSlotValueDescription(relid, searchslot, tupdesc, NULL, 64);
515 :
516 34 : if (desc)
517 : {
518 34 : if (OidIsValid(replica_index))
519 30 : *search_desc = psprintf(_("replica identity %s"), desc);
520 : else
521 4 : *search_desc = psprintf(_("replica identity full %s"), desc);
522 : }
523 : }
524 114 : }
525 :
526 : /*
527 : * Helper functions to construct a string describing the contents of an index
528 : * entry. See BuildIndexValueDescription for details.
529 : *
530 : * The caller must ensure that the index with the OID 'indexoid' is locked so
531 : * that we can fetch and display the conflicting key value.
532 : */
533 : static char *
534 114 : build_index_value_desc(EState *estate, Relation localrel, TupleTableSlot *slot,
535 : Oid indexoid)
536 : {
537 : char *index_value;
538 : Relation indexDesc;
539 : Datum values[INDEX_MAX_KEYS];
540 : bool isnull[INDEX_MAX_KEYS];
541 114 : TupleTableSlot *tableslot = slot;
542 :
543 114 : if (!tableslot)
544 0 : return NULL;
545 :
546 : Assert(CheckRelationOidLockedByMe(indexoid, RowExclusiveLock, true));
547 :
548 114 : indexDesc = index_open(indexoid, NoLock);
549 :
550 : /*
551 : * If the slot is a virtual slot, copy it into a heap tuple slot as
552 : * FormIndexDatum only works with heap tuple slots.
553 : */
554 114 : if (TTS_IS_VIRTUAL(slot))
555 : {
556 21 : tableslot = table_slot_create(localrel, &estate->es_tupleTable);
557 21 : tableslot = ExecCopySlot(tableslot, slot);
558 : }
559 :
560 : /*
561 : * Initialize ecxt_scantuple for potential use in FormIndexDatum when
562 : * index expressions are present.
563 : */
564 114 : GetPerTupleExprContext(estate)->ecxt_scantuple = tableslot;
565 :
566 : /*
567 : * The values/nulls arrays passed to BuildIndexValueDescription should be
568 : * the results of FormIndexDatum, which are the "raw" input to the index
569 : * AM.
570 : */
571 114 : FormIndexDatum(BuildIndexInfo(indexDesc), tableslot, estate, values, isnull);
572 :
573 114 : index_value = BuildIndexValueDescription(indexDesc, values, isnull);
574 :
575 114 : index_close(indexDesc, NoLock);
576 :
577 114 : return index_value;
578 : }
|