Line data Source code
1 : /*-------------------------------------------------------------------------
2 : * conflict.c
3 : * Support routines for logging conflicts.
4 : *
5 : * Copyright (c) 2024-2025, PostgreSQL Global Development Group
6 : *
7 : * IDENTIFICATION
8 : * src/backend/replication/logical/conflict.c
9 : *
10 : * This file contains the code for logging conflicts on the subscriber during
11 : * logical replication.
12 : *-------------------------------------------------------------------------
13 : */
14 :
15 : #include "postgres.h"
16 :
17 : #include "access/commit_ts.h"
18 : #include "access/tableam.h"
19 : #include "executor/executor.h"
20 : #include "pgstat.h"
21 : #include "replication/conflict.h"
22 : #include "replication/worker_internal.h"
23 : #include "storage/lmgr.h"
24 : #include "utils/lsyscache.h"
25 :
26 : static const char *const ConflictTypeNames[] = {
27 : [CT_INSERT_EXISTS] = "insert_exists",
28 : [CT_UPDATE_ORIGIN_DIFFERS] = "update_origin_differs",
29 : [CT_UPDATE_EXISTS] = "update_exists",
30 : [CT_UPDATE_MISSING] = "update_missing",
31 : [CT_DELETE_ORIGIN_DIFFERS] = "delete_origin_differs",
32 : [CT_UPDATE_DELETED] = "update_deleted",
33 : [CT_DELETE_MISSING] = "delete_missing",
34 : [CT_MULTIPLE_UNIQUE_CONFLICTS] = "multiple_unique_conflicts"
35 : };
36 :
37 : static int errcode_apply_conflict(ConflictType type);
38 : static void errdetail_apply_conflict(EState *estate,
39 : ResultRelInfo *relinfo,
40 : ConflictType type,
41 : TupleTableSlot *searchslot,
42 : TupleTableSlot *localslot,
43 : TupleTableSlot *remoteslot,
44 : Oid indexoid, TransactionId localxmin,
45 : RepOriginId localorigin,
46 : TimestampTz localts, StringInfo err_msg);
47 : static char *build_tuple_value_details(EState *estate, ResultRelInfo *relinfo,
48 : ConflictType type,
49 : TupleTableSlot *searchslot,
50 : TupleTableSlot *localslot,
51 : TupleTableSlot *remoteslot,
52 : Oid indexoid);
53 : static char *build_index_value_desc(EState *estate, Relation localrel,
54 : TupleTableSlot *slot, Oid indexoid);
55 :
56 : /*
57 : * Get the xmin and commit timestamp data (origin and timestamp) associated
58 : * with the provided local row.
59 : *
60 : * Return true if the commit timestamp data was found, false otherwise.
61 : */
62 : bool
63 144592 : GetTupleTransactionInfo(TupleTableSlot *localslot, TransactionId *xmin,
64 : RepOriginId *localorigin, TimestampTz *localts)
65 : {
66 : Datum xminDatum;
67 : bool isnull;
68 :
69 144592 : xminDatum = slot_getsysattr(localslot, MinTransactionIdAttributeNumber,
70 : &isnull);
71 144592 : *xmin = DatumGetTransactionId(xminDatum);
72 : Assert(!isnull);
73 :
74 : /*
75 : * The commit timestamp data is not available if track_commit_timestamp is
76 : * disabled.
77 : */
78 144592 : if (!track_commit_timestamp)
79 : {
80 144484 : *localorigin = InvalidRepOriginId;
81 144484 : *localts = 0;
82 144484 : return false;
83 : }
84 :
85 108 : return TransactionIdGetCommitTsData(*xmin, localts, localorigin);
86 : }
87 :
88 : /*
89 : * This function is used to report a conflict while applying replication
90 : * changes.
91 : *
92 : * 'searchslot' should contain the tuple used to search the local row to be
93 : * updated or deleted.
94 : *
95 : * 'remoteslot' should contain the remote new tuple, if any.
96 : *
97 : * conflicttuples is a list of local rows that caused the conflict and the
98 : * conflict related information. See ConflictTupleInfo.
99 : *
100 : * The caller must ensure that all the indexes passed in ConflictTupleInfo are
101 : * locked so that we can fetch and display the conflicting key values.
102 : */
103 : void
104 116 : ReportApplyConflict(EState *estate, ResultRelInfo *relinfo, int elevel,
105 : ConflictType type, TupleTableSlot *searchslot,
106 : TupleTableSlot *remoteslot, List *conflicttuples)
107 : {
108 116 : Relation localrel = relinfo->ri_RelationDesc;
109 : StringInfoData err_detail;
110 :
111 116 : initStringInfo(&err_detail);
112 :
113 : /* Form errdetail message by combining conflicting tuples information. */
114 404 : foreach_ptr(ConflictTupleInfo, conflicttuple, conflicttuples)
115 172 : errdetail_apply_conflict(estate, relinfo, type, searchslot,
116 : conflicttuple->slot, remoteslot,
117 : conflicttuple->indexoid,
118 : conflicttuple->xmin,
119 172 : conflicttuple->origin,
120 : conflicttuple->ts,
121 : &err_detail);
122 :
123 116 : pgstat_report_subscription_conflict(MySubscription->oid, type);
124 :
125 116 : ereport(elevel,
126 : errcode_apply_conflict(type),
127 : errmsg("conflict detected on relation \"%s.%s\": conflict=%s",
128 : get_namespace_name(RelationGetNamespace(localrel)),
129 : RelationGetRelationName(localrel),
130 : ConflictTypeNames[type]),
131 : errdetail_internal("%s", err_detail.data));
132 52 : }
133 :
134 : /*
135 : * Find all unique indexes to check for a conflict and store them into
136 : * ResultRelInfo.
137 : */
138 : void
139 215952 : InitConflictIndexes(ResultRelInfo *relInfo)
140 : {
141 215952 : List *uniqueIndexes = NIL;
142 :
143 391246 : for (int i = 0; i < relInfo->ri_NumIndices; i++)
144 : {
145 175294 : Relation indexRelation = relInfo->ri_IndexRelationDescs[i];
146 :
147 175294 : if (indexRelation == NULL)
148 0 : continue;
149 :
150 : /* Detect conflict only for unique indexes */
151 175294 : if (!relInfo->ri_IndexRelationInfo[i]->ii_Unique)
152 58 : continue;
153 :
154 : /* Don't support conflict detection for deferrable index */
155 175236 : if (!indexRelation->rd_index->indimmediate)
156 0 : continue;
157 :
158 175236 : uniqueIndexes = lappend_oid(uniqueIndexes,
159 : RelationGetRelid(indexRelation));
160 : }
161 :
162 215952 : relInfo->ri_onConflictArbiterIndexes = uniqueIndexes;
163 215952 : }
164 :
165 : /*
166 : * Add SQLSTATE error code to the current conflict report.
167 : */
168 : static int
169 116 : errcode_apply_conflict(ConflictType type)
170 : {
171 116 : switch (type)
172 : {
173 64 : case CT_INSERT_EXISTS:
174 : case CT_UPDATE_EXISTS:
175 : case CT_MULTIPLE_UNIQUE_CONFLICTS:
176 64 : return errcode(ERRCODE_UNIQUE_VIOLATION);
177 52 : case CT_UPDATE_ORIGIN_DIFFERS:
178 : case CT_UPDATE_MISSING:
179 : case CT_DELETE_ORIGIN_DIFFERS:
180 : case CT_UPDATE_DELETED:
181 : case CT_DELETE_MISSING:
182 52 : return errcode(ERRCODE_T_R_SERIALIZATION_FAILURE);
183 : }
184 :
185 : Assert(false);
186 0 : return 0; /* silence compiler warning */
187 : }
188 :
189 : /*
190 : * Add an errdetail() line showing conflict detail.
191 : *
192 : * The DETAIL line comprises of two parts:
193 : * 1. Explanation of the conflict type, including the origin and commit
194 : * timestamp of the existing local row.
195 : * 2. Display of conflicting key, existing local row, remote new row, and
196 : * replica identity columns, if any. The remote old row is excluded as its
197 : * information is covered in the replica identity columns.
198 : */
199 : static void
200 172 : errdetail_apply_conflict(EState *estate, ResultRelInfo *relinfo,
201 : ConflictType type, TupleTableSlot *searchslot,
202 : TupleTableSlot *localslot, TupleTableSlot *remoteslot,
203 : Oid indexoid, TransactionId localxmin,
204 : RepOriginId localorigin, TimestampTz localts,
205 : StringInfo err_msg)
206 : {
207 : StringInfoData err_detail;
208 : char *val_desc;
209 : char *origin_name;
210 :
211 172 : initStringInfo(&err_detail);
212 :
213 : /* First, construct a detailed message describing the type of conflict */
214 172 : switch (type)
215 : {
216 120 : case CT_INSERT_EXISTS:
217 : case CT_UPDATE_EXISTS:
218 : case CT_MULTIPLE_UNIQUE_CONFLICTS:
219 : Assert(OidIsValid(indexoid) &&
220 : CheckRelationOidLockedByMe(indexoid, RowExclusiveLock, true));
221 :
222 120 : if (localts)
223 : {
224 6 : if (localorigin == InvalidRepOriginId)
225 0 : appendStringInfo(&err_detail, _("Key already exists in unique index \"%s\", modified locally in transaction %u at %s."),
226 : get_rel_name(indexoid),
227 : localxmin, timestamptz_to_str(localts));
228 6 : else if (replorigin_by_oid(localorigin, true, &origin_name))
229 2 : appendStringInfo(&err_detail, _("Key already exists in unique index \"%s\", modified by origin \"%s\" in transaction %u at %s."),
230 : get_rel_name(indexoid), origin_name,
231 : localxmin, timestamptz_to_str(localts));
232 :
233 : /*
234 : * The origin that modified this row has been removed. This
235 : * can happen if the origin was created by a different apply
236 : * worker and its associated subscription and origin were
237 : * dropped after updating the row, or if the origin was
238 : * manually dropped by the user.
239 : */
240 : else
241 4 : appendStringInfo(&err_detail, _("Key already exists in unique index \"%s\", modified by a non-existent origin in transaction %u at %s."),
242 : get_rel_name(indexoid),
243 : localxmin, timestamptz_to_str(localts));
244 : }
245 : else
246 114 : appendStringInfo(&err_detail, _("Key already exists in unique index \"%s\", modified in transaction %u."),
247 : get_rel_name(indexoid), localxmin);
248 :
249 120 : break;
250 :
251 6 : case CT_UPDATE_ORIGIN_DIFFERS:
252 6 : if (localorigin == InvalidRepOriginId)
253 2 : appendStringInfo(&err_detail, _("Updating the row that was modified locally in transaction %u at %s."),
254 : localxmin, timestamptz_to_str(localts));
255 4 : else if (replorigin_by_oid(localorigin, true, &origin_name))
256 2 : appendStringInfo(&err_detail, _("Updating the row that was modified by a different origin \"%s\" in transaction %u at %s."),
257 : origin_name, localxmin, timestamptz_to_str(localts));
258 :
259 : /* The origin that modified this row has been removed. */
260 : else
261 2 : appendStringInfo(&err_detail, _("Updating the row that was modified by a non-existent origin in transaction %u at %s."),
262 : localxmin, timestamptz_to_str(localts));
263 :
264 6 : break;
265 :
266 6 : case CT_UPDATE_DELETED:
267 6 : if (localts)
268 : {
269 6 : if (localorigin == InvalidRepOriginId)
270 6 : appendStringInfo(&err_detail, _("The row to be updated was deleted locally in transaction %u at %s."),
271 : localxmin, timestamptz_to_str(localts));
272 0 : else if (replorigin_by_oid(localorigin, true, &origin_name))
273 0 : appendStringInfo(&err_detail, _("The row to be updated was deleted by a different origin \"%s\" in transaction %u at %s."),
274 : origin_name, localxmin, timestamptz_to_str(localts));
275 :
276 : /* The origin that modified this row has been removed. */
277 : else
278 0 : appendStringInfo(&err_detail, _("The row to be updated was deleted by a non-existent origin in transaction %u at %s."),
279 : localxmin, timestamptz_to_str(localts));
280 : }
281 : else
282 0 : appendStringInfo(&err_detail, _("The row to be updated was deleted."));
283 :
284 6 : break;
285 :
286 12 : case CT_UPDATE_MISSING:
287 12 : appendStringInfoString(&err_detail, _("Could not find the row to be updated."));
288 12 : break;
289 :
290 10 : case CT_DELETE_ORIGIN_DIFFERS:
291 10 : if (localorigin == InvalidRepOriginId)
292 8 : appendStringInfo(&err_detail, _("Deleting the row that was modified locally in transaction %u at %s."),
293 : localxmin, timestamptz_to_str(localts));
294 2 : else if (replorigin_by_oid(localorigin, true, &origin_name))
295 2 : appendStringInfo(&err_detail, _("Deleting the row that was modified by a different origin \"%s\" in transaction %u at %s."),
296 : origin_name, localxmin, timestamptz_to_str(localts));
297 :
298 : /* The origin that modified this row has been removed. */
299 : else
300 0 : appendStringInfo(&err_detail, _("Deleting the row that was modified by a non-existent origin in transaction %u at %s."),
301 : localxmin, timestamptz_to_str(localts));
302 :
303 10 : break;
304 :
305 18 : case CT_DELETE_MISSING:
306 18 : appendStringInfoString(&err_detail, _("Could not find the row to be deleted."));
307 18 : break;
308 : }
309 :
310 : Assert(err_detail.len > 0);
311 :
312 172 : val_desc = build_tuple_value_details(estate, relinfo, type, searchslot,
313 : localslot, remoteslot, indexoid);
314 :
315 : /*
316 : * Next, append the key values, existing local row, remote row, and
317 : * replica identity columns after the message.
318 : */
319 172 : if (val_desc)
320 172 : appendStringInfo(&err_detail, "\n%s", val_desc);
321 :
322 : /*
323 : * Insert a blank line to visually separate the new detail line from the
324 : * existing ones.
325 : */
326 172 : if (err_msg->len > 0)
327 56 : appendStringInfoChar(err_msg, '\n');
328 :
329 172 : appendStringInfoString(err_msg, err_detail.data);
330 172 : }
331 :
332 : /*
333 : * Helper function to build the additional details for conflicting key,
334 : * existing local row, remote row, and replica identity columns.
335 : *
336 : * If the return value is NULL, it indicates that the current user lacks
337 : * permissions to view the columns involved.
338 : */
339 : static char *
340 172 : build_tuple_value_details(EState *estate, ResultRelInfo *relinfo,
341 : ConflictType type,
342 : TupleTableSlot *searchslot,
343 : TupleTableSlot *localslot,
344 : TupleTableSlot *remoteslot,
345 : Oid indexoid)
346 : {
347 172 : Relation localrel = relinfo->ri_RelationDesc;
348 172 : Oid relid = RelationGetRelid(localrel);
349 172 : TupleDesc tupdesc = RelationGetDescr(localrel);
350 : StringInfoData tuple_value;
351 172 : char *desc = NULL;
352 :
353 : Assert(searchslot || localslot || remoteslot);
354 :
355 172 : initStringInfo(&tuple_value);
356 :
357 : /*
358 : * Report the conflicting key values in the case of a unique constraint
359 : * violation.
360 : */
361 172 : if (type == CT_INSERT_EXISTS || type == CT_UPDATE_EXISTS ||
362 : type == CT_MULTIPLE_UNIQUE_CONFLICTS)
363 : {
364 : Assert(OidIsValid(indexoid) && localslot);
365 :
366 120 : desc = build_index_value_desc(estate, localrel, localslot, indexoid);
367 :
368 120 : if (desc)
369 120 : appendStringInfo(&tuple_value, _("Key %s"), desc);
370 : }
371 :
372 172 : if (localslot)
373 : {
374 : /*
375 : * The 'modifiedCols' only applies to the new tuple, hence we pass
376 : * NULL for the existing local row.
377 : */
378 136 : desc = ExecBuildSlotValueDescription(relid, localslot, tupdesc,
379 : NULL, 64);
380 :
381 136 : if (desc)
382 : {
383 136 : if (tuple_value.len > 0)
384 : {
385 120 : appendStringInfoString(&tuple_value, "; ");
386 120 : appendStringInfo(&tuple_value, _("existing local row %s"),
387 : desc);
388 : }
389 : else
390 : {
391 16 : appendStringInfo(&tuple_value, _("Existing local row %s"),
392 : desc);
393 : }
394 : }
395 : }
396 :
397 172 : if (remoteslot)
398 : {
399 : Bitmapset *modifiedCols;
400 :
401 : /*
402 : * Although logical replication doesn't maintain the bitmap for the
403 : * columns being inserted, we still use it to create 'modifiedCols'
404 : * for consistency with other calls to ExecBuildSlotValueDescription.
405 : *
406 : * Note that generated columns are formed locally on the subscriber.
407 : */
408 144 : modifiedCols = bms_union(ExecGetInsertedCols(relinfo, estate),
409 144 : ExecGetUpdatedCols(relinfo, estate));
410 144 : desc = ExecBuildSlotValueDescription(relid, remoteslot, tupdesc,
411 : modifiedCols, 64);
412 :
413 144 : if (desc)
414 : {
415 144 : if (tuple_value.len > 0)
416 : {
417 126 : appendStringInfoString(&tuple_value, "; ");
418 126 : appendStringInfo(&tuple_value, _("remote row %s"), desc);
419 : }
420 : else
421 : {
422 18 : appendStringInfo(&tuple_value, _("Remote row %s"), desc);
423 : }
424 : }
425 : }
426 :
427 172 : if (searchslot)
428 : {
429 : /*
430 : * Note that while index other than replica identity may be used (see
431 : * IsIndexUsableForReplicaIdentityFull for details) to find the tuple
432 : * when applying update or delete, such an index scan may not result
433 : * in a unique tuple and we still compare the complete tuple in such
434 : * cases, thus such indexes are not used here.
435 : */
436 60 : Oid replica_index = GetRelationIdentityOrPK(localrel);
437 :
438 : Assert(type != CT_INSERT_EXISTS);
439 :
440 : /*
441 : * If the table has a valid replica identity index, build the index
442 : * key value string. Otherwise, construct the full tuple value for
443 : * REPLICA IDENTITY FULL cases.
444 : */
445 60 : if (OidIsValid(replica_index))
446 52 : desc = build_index_value_desc(estate, localrel, searchslot, replica_index);
447 : else
448 8 : desc = ExecBuildSlotValueDescription(relid, searchslot, tupdesc, NULL, 64);
449 :
450 60 : if (desc)
451 : {
452 60 : if (tuple_value.len > 0)
453 : {
454 42 : appendStringInfoString(&tuple_value, "; ");
455 84 : appendStringInfo(&tuple_value, OidIsValid(replica_index)
456 34 : ? _("replica identity %s")
457 8 : : _("replica identity full %s"), desc);
458 : }
459 : else
460 : {
461 36 : appendStringInfo(&tuple_value, OidIsValid(replica_index)
462 18 : ? _("Replica identity %s")
463 0 : : _("Replica identity full %s"), desc);
464 : }
465 : }
466 : }
467 :
468 172 : if (tuple_value.len == 0)
469 0 : return NULL;
470 :
471 172 : appendStringInfoChar(&tuple_value, '.');
472 172 : return tuple_value.data;
473 : }
474 :
475 : /*
476 : * Helper functions to construct a string describing the contents of an index
477 : * entry. See BuildIndexValueDescription for details.
478 : *
479 : * The caller must ensure that the index with the OID 'indexoid' is locked so
480 : * that we can fetch and display the conflicting key value.
481 : */
482 : static char *
483 172 : build_index_value_desc(EState *estate, Relation localrel, TupleTableSlot *slot,
484 : Oid indexoid)
485 : {
486 : char *index_value;
487 : Relation indexDesc;
488 : Datum values[INDEX_MAX_KEYS];
489 : bool isnull[INDEX_MAX_KEYS];
490 172 : TupleTableSlot *tableslot = slot;
491 :
492 172 : if (!tableslot)
493 0 : return NULL;
494 :
495 : Assert(CheckRelationOidLockedByMe(indexoid, RowExclusiveLock, true));
496 :
497 172 : indexDesc = index_open(indexoid, NoLock);
498 :
499 : /*
500 : * If the slot is a virtual slot, copy it into a heap tuple slot as
501 : * FormIndexDatum only works with heap tuple slots.
502 : */
503 172 : if (TTS_IS_VIRTUAL(slot))
504 : {
505 34 : tableslot = table_slot_create(localrel, &estate->es_tupleTable);
506 34 : tableslot = ExecCopySlot(tableslot, slot);
507 : }
508 :
509 : /*
510 : * Initialize ecxt_scantuple for potential use in FormIndexDatum when
511 : * index expressions are present.
512 : */
513 172 : GetPerTupleExprContext(estate)->ecxt_scantuple = tableslot;
514 :
515 : /*
516 : * The values/nulls arrays passed to BuildIndexValueDescription should be
517 : * the results of FormIndexDatum, which are the "raw" input to the index
518 : * AM.
519 : */
520 172 : FormIndexDatum(BuildIndexInfo(indexDesc), tableslot, estate, values, isnull);
521 :
522 172 : index_value = BuildIndexValueDescription(indexDesc, values, isnull);
523 :
524 172 : index_close(indexDesc, NoLock);
525 :
526 172 : return index_value;
527 : }
|