Line data Source code
1 : /*-------------------------------------------------------------------------
2 : * conflict.c
3 : * Support routines for logging conflicts.
4 : *
5 : * Copyright (c) 2024, PostgreSQL Global Development Group
6 : *
7 : * IDENTIFICATION
8 : * src/backend/replication/logical/conflict.c
9 : *
10 : * This file contains the code for logging conflicts on the subscriber during
11 : * logical replication.
12 : *-------------------------------------------------------------------------
13 : */
14 :
15 : #include "postgres.h"
16 :
17 : #include "access/commit_ts.h"
18 : #include "access/tableam.h"
19 : #include "executor/executor.h"
20 : #include "pgstat.h"
21 : #include "replication/conflict.h"
22 : #include "replication/worker_internal.h"
23 : #include "storage/lmgr.h"
24 : #include "utils/lsyscache.h"
25 :
26 : static const char *const ConflictTypeNames[] = {
27 : [CT_INSERT_EXISTS] = "insert_exists",
28 : [CT_UPDATE_ORIGIN_DIFFERS] = "update_origin_differs",
29 : [CT_UPDATE_EXISTS] = "update_exists",
30 : [CT_UPDATE_MISSING] = "update_missing",
31 : [CT_DELETE_ORIGIN_DIFFERS] = "delete_origin_differs",
32 : [CT_DELETE_MISSING] = "delete_missing"
33 : };
34 :
35 : static int errcode_apply_conflict(ConflictType type);
36 : static int errdetail_apply_conflict(EState *estate,
37 : ResultRelInfo *relinfo,
38 : ConflictType type,
39 : TupleTableSlot *searchslot,
40 : TupleTableSlot *localslot,
41 : TupleTableSlot *remoteslot,
42 : Oid indexoid, TransactionId localxmin,
43 : RepOriginId localorigin,
44 : TimestampTz localts);
45 : static char *build_tuple_value_details(EState *estate, ResultRelInfo *relinfo,
46 : ConflictType type,
47 : TupleTableSlot *searchslot,
48 : TupleTableSlot *localslot,
49 : TupleTableSlot *remoteslot,
50 : Oid indexoid);
51 : static char *build_index_value_desc(EState *estate, Relation localrel,
52 : TupleTableSlot *slot, Oid indexoid);
53 :
54 : /*
55 : * Get the xmin and commit timestamp data (origin and timestamp) associated
56 : * with the provided local tuple.
57 : *
58 : * Return true if the commit timestamp data was found, false otherwise.
59 : */
60 : bool
61 144456 : GetTupleTransactionInfo(TupleTableSlot *localslot, TransactionId *xmin,
62 : RepOriginId *localorigin, TimestampTz *localts)
63 : {
64 : Datum xminDatum;
65 : bool isnull;
66 :
67 144456 : xminDatum = slot_getsysattr(localslot, MinTransactionIdAttributeNumber,
68 : &isnull);
69 144456 : *xmin = DatumGetTransactionId(xminDatum);
70 : Assert(!isnull);
71 :
72 : /*
73 : * The commit timestamp data is not available if track_commit_timestamp is
74 : * disabled.
75 : */
76 144456 : if (!track_commit_timestamp)
77 : {
78 144438 : *localorigin = InvalidRepOriginId;
79 144438 : *localts = 0;
80 144438 : return false;
81 : }
82 :
83 18 : return TransactionIdGetCommitTsData(*xmin, localts, localorigin);
84 : }
85 :
86 : /*
87 : * This function is used to report a conflict while applying replication
88 : * changes.
89 : *
90 : * 'searchslot' should contain the tuple used to search the local tuple to be
91 : * updated or deleted.
92 : *
93 : * 'localslot' should contain the existing local tuple, if any, that conflicts
94 : * with the remote tuple. 'localxmin', 'localorigin', and 'localts' provide the
95 : * transaction information related to this existing local tuple.
96 : *
97 : * 'remoteslot' should contain the remote new tuple, if any.
98 : *
99 : * The 'indexoid' represents the OID of the unique index that triggered the
100 : * constraint violation error. We use this to report the key values for
101 : * conflicting tuple.
102 : *
103 : * The caller must ensure that the index with the OID 'indexoid' is locked so
104 : * that we can fetch and display the conflicting key value.
105 : */
106 : void
107 52 : ReportApplyConflict(EState *estate, ResultRelInfo *relinfo, int elevel,
108 : ConflictType type, TupleTableSlot *searchslot,
109 : TupleTableSlot *localslot, TupleTableSlot *remoteslot,
110 : Oid indexoid, TransactionId localxmin,
111 : RepOriginId localorigin, TimestampTz localts)
112 : {
113 52 : Relation localrel = relinfo->ri_RelationDesc;
114 :
115 : Assert(!OidIsValid(indexoid) ||
116 : CheckRelationOidLockedByMe(indexoid, RowExclusiveLock, true));
117 :
118 52 : pgstat_report_subscription_conflict(MySubscription->oid, type);
119 :
120 52 : ereport(elevel,
121 : errcode_apply_conflict(type),
122 : errmsg("conflict detected on relation \"%s.%s\": conflict=%s",
123 : get_namespace_name(RelationGetNamespace(localrel)),
124 : RelationGetRelationName(localrel),
125 : ConflictTypeNames[type]),
126 : errdetail_apply_conflict(estate, relinfo, type, searchslot,
127 : localslot, remoteslot, indexoid,
128 : localxmin, localorigin, localts));
129 38 : }
130 :
131 : /*
132 : * Find all unique indexes to check for a conflict and store them into
133 : * ResultRelInfo.
134 : */
135 : void
136 216342 : InitConflictIndexes(ResultRelInfo *relInfo)
137 : {
138 216342 : List *uniqueIndexes = NIL;
139 :
140 391958 : for (int i = 0; i < relInfo->ri_NumIndices; i++)
141 : {
142 175616 : Relation indexRelation = relInfo->ri_IndexRelationDescs[i];
143 :
144 175616 : if (indexRelation == NULL)
145 0 : continue;
146 :
147 : /* Detect conflict only for unique indexes */
148 175616 : if (!relInfo->ri_IndexRelationInfo[i]->ii_Unique)
149 32 : continue;
150 :
151 : /* Don't support conflict detection for deferrable index */
152 175584 : if (!indexRelation->rd_index->indimmediate)
153 0 : continue;
154 :
155 175584 : uniqueIndexes = lappend_oid(uniqueIndexes,
156 : RelationGetRelid(indexRelation));
157 : }
158 :
159 216342 : relInfo->ri_onConflictArbiterIndexes = uniqueIndexes;
160 216342 : }
161 :
162 : /*
163 : * Add SQLSTATE error code to the current conflict report.
164 : */
165 : static int
166 52 : errcode_apply_conflict(ConflictType type)
167 : {
168 52 : switch (type)
169 : {
170 14 : case CT_INSERT_EXISTS:
171 : case CT_UPDATE_EXISTS:
172 14 : return errcode(ERRCODE_UNIQUE_VIOLATION);
173 38 : case CT_UPDATE_ORIGIN_DIFFERS:
174 : case CT_UPDATE_MISSING:
175 : case CT_DELETE_ORIGIN_DIFFERS:
176 : case CT_DELETE_MISSING:
177 38 : return errcode(ERRCODE_T_R_SERIALIZATION_FAILURE);
178 : }
179 :
180 : Assert(false);
181 0 : return 0; /* silence compiler warning */
182 : }
183 :
184 : /*
185 : * Add an errdetail() line showing conflict detail.
186 : *
187 : * The DETAIL line comprises of two parts:
188 : * 1. Explanation of the conflict type, including the origin and commit
189 : * timestamp of the existing local tuple.
190 : * 2. Display of conflicting key, existing local tuple, remote new tuple, and
191 : * replica identity columns, if any. The remote old tuple is excluded as its
192 : * information is covered in the replica identity columns.
193 : */
194 : static int
195 52 : errdetail_apply_conflict(EState *estate, ResultRelInfo *relinfo,
196 : ConflictType type, TupleTableSlot *searchslot,
197 : TupleTableSlot *localslot, TupleTableSlot *remoteslot,
198 : Oid indexoid, TransactionId localxmin,
199 : RepOriginId localorigin, TimestampTz localts)
200 : {
201 : StringInfoData err_detail;
202 : char *val_desc;
203 : char *origin_name;
204 :
205 52 : initStringInfo(&err_detail);
206 :
207 : /* First, construct a detailed message describing the type of conflict */
208 52 : switch (type)
209 : {
210 14 : case CT_INSERT_EXISTS:
211 : case CT_UPDATE_EXISTS:
212 : Assert(OidIsValid(indexoid));
213 :
214 14 : if (localts)
215 : {
216 6 : if (localorigin == InvalidRepOriginId)
217 0 : appendStringInfo(&err_detail, _("Key already exists in unique index \"%s\", modified locally in transaction %u at %s."),
218 : get_rel_name(indexoid),
219 : localxmin, timestamptz_to_str(localts));
220 6 : else if (replorigin_by_oid(localorigin, true, &origin_name))
221 2 : appendStringInfo(&err_detail, _("Key already exists in unique index \"%s\", modified by origin \"%s\" in transaction %u at %s."),
222 : get_rel_name(indexoid), origin_name,
223 : localxmin, timestamptz_to_str(localts));
224 :
225 : /*
226 : * The origin that modified this row has been removed. This
227 : * can happen if the origin was created by a different apply
228 : * worker and its associated subscription and origin were
229 : * dropped after updating the row, or if the origin was
230 : * manually dropped by the user.
231 : */
232 : else
233 4 : appendStringInfo(&err_detail, _("Key already exists in unique index \"%s\", modified by a non-existent origin in transaction %u at %s."),
234 : get_rel_name(indexoid),
235 : localxmin, timestamptz_to_str(localts));
236 : }
237 : else
238 8 : appendStringInfo(&err_detail, _("Key already exists in unique index \"%s\", modified in transaction %u."),
239 : get_rel_name(indexoid), localxmin);
240 :
241 14 : break;
242 :
243 6 : case CT_UPDATE_ORIGIN_DIFFERS:
244 6 : if (localorigin == InvalidRepOriginId)
245 2 : appendStringInfo(&err_detail, _("Updating the row that was modified locally in transaction %u at %s."),
246 : localxmin, timestamptz_to_str(localts));
247 4 : else if (replorigin_by_oid(localorigin, true, &origin_name))
248 2 : appendStringInfo(&err_detail, _("Updating the row that was modified by a different origin \"%s\" in transaction %u at %s."),
249 : origin_name, localxmin, timestamptz_to_str(localts));
250 :
251 : /* The origin that modified this row has been removed. */
252 : else
253 2 : appendStringInfo(&err_detail, _("Updating the row that was modified by a non-existent origin in transaction %u at %s."),
254 : localxmin, timestamptz_to_str(localts));
255 :
256 6 : break;
257 :
258 10 : case CT_UPDATE_MISSING:
259 10 : appendStringInfo(&err_detail, _("Could not find the row to be updated."));
260 10 : break;
261 :
262 4 : case CT_DELETE_ORIGIN_DIFFERS:
263 4 : if (localorigin == InvalidRepOriginId)
264 2 : appendStringInfo(&err_detail, _("Deleting the row that was modified locally in transaction %u at %s."),
265 : localxmin, timestamptz_to_str(localts));
266 2 : else if (replorigin_by_oid(localorigin, true, &origin_name))
267 2 : appendStringInfo(&err_detail, _("Deleting the row that was modified by a different origin \"%s\" in transaction %u at %s."),
268 : origin_name, localxmin, timestamptz_to_str(localts));
269 :
270 : /* The origin that modified this row has been removed. */
271 : else
272 0 : appendStringInfo(&err_detail, _("Deleting the row that was modified by a non-existent origin in transaction %u at %s."),
273 : localxmin, timestamptz_to_str(localts));
274 :
275 4 : break;
276 :
277 18 : case CT_DELETE_MISSING:
278 18 : appendStringInfo(&err_detail, _("Could not find the row to be deleted."));
279 18 : break;
280 : }
281 :
282 52 : Assert(err_detail.len > 0);
283 :
284 52 : val_desc = build_tuple_value_details(estate, relinfo, type, searchslot,
285 : localslot, remoteslot, indexoid);
286 :
287 : /*
288 : * Next, append the key values, existing local tuple, remote tuple and
289 : * replica identity columns after the message.
290 : */
291 52 : if (val_desc)
292 52 : appendStringInfo(&err_detail, "\n%s", val_desc);
293 :
294 52 : return errdetail_internal("%s", err_detail.data);
295 : }
296 :
297 : /*
298 : * Helper function to build the additional details for conflicting key,
299 : * existing local tuple, remote tuple, and replica identity columns.
300 : *
301 : * If the return value is NULL, it indicates that the current user lacks
302 : * permissions to view the columns involved.
303 : */
304 : static char *
305 52 : build_tuple_value_details(EState *estate, ResultRelInfo *relinfo,
306 : ConflictType type,
307 : TupleTableSlot *searchslot,
308 : TupleTableSlot *localslot,
309 : TupleTableSlot *remoteslot,
310 : Oid indexoid)
311 : {
312 52 : Relation localrel = relinfo->ri_RelationDesc;
313 52 : Oid relid = RelationGetRelid(localrel);
314 52 : TupleDesc tupdesc = RelationGetDescr(localrel);
315 : StringInfoData tuple_value;
316 52 : char *desc = NULL;
317 :
318 : Assert(searchslot || localslot || remoteslot);
319 :
320 52 : initStringInfo(&tuple_value);
321 :
322 : /*
323 : * Report the conflicting key values in the case of a unique constraint
324 : * violation.
325 : */
326 52 : if (type == CT_INSERT_EXISTS || type == CT_UPDATE_EXISTS)
327 : {
328 : Assert(OidIsValid(indexoid) && localslot);
329 :
330 14 : desc = build_index_value_desc(estate, localrel, localslot, indexoid);
331 :
332 14 : if (desc)
333 14 : appendStringInfo(&tuple_value, _("Key %s"), desc);
334 : }
335 :
336 52 : if (localslot)
337 : {
338 : /*
339 : * The 'modifiedCols' only applies to the new tuple, hence we pass
340 : * NULL for the existing local tuple.
341 : */
342 24 : desc = ExecBuildSlotValueDescription(relid, localslot, tupdesc,
343 : NULL, 64);
344 :
345 24 : if (desc)
346 : {
347 24 : if (tuple_value.len > 0)
348 : {
349 14 : appendStringInfoString(&tuple_value, "; ");
350 14 : appendStringInfo(&tuple_value, _("existing local tuple %s"),
351 : desc);
352 : }
353 : else
354 : {
355 10 : appendStringInfo(&tuple_value, _("Existing local tuple %s"),
356 : desc);
357 : }
358 : }
359 : }
360 :
361 52 : if (remoteslot)
362 : {
363 : Bitmapset *modifiedCols;
364 :
365 : /*
366 : * Although logical replication doesn't maintain the bitmap for the
367 : * columns being inserted, we still use it to create 'modifiedCols'
368 : * for consistency with other calls to ExecBuildSlotValueDescription.
369 : *
370 : * Note that generated columns are formed locally on the subscriber.
371 : */
372 30 : modifiedCols = bms_union(ExecGetInsertedCols(relinfo, estate),
373 30 : ExecGetUpdatedCols(relinfo, estate));
374 30 : desc = ExecBuildSlotValueDescription(relid, remoteslot, tupdesc,
375 : modifiedCols, 64);
376 :
377 30 : if (desc)
378 : {
379 30 : if (tuple_value.len > 0)
380 : {
381 20 : appendStringInfoString(&tuple_value, "; ");
382 20 : appendStringInfo(&tuple_value, _("remote tuple %s"), desc);
383 : }
384 : else
385 : {
386 10 : appendStringInfo(&tuple_value, _("Remote tuple %s"), desc);
387 : }
388 : }
389 : }
390 :
391 52 : if (searchslot)
392 : {
393 : /*
394 : * Note that while index other than replica identity may be used (see
395 : * IsIndexUsableForReplicaIdentityFull for details) to find the tuple
396 : * when applying update or delete, such an index scan may not result
397 : * in a unique tuple and we still compare the complete tuple in such
398 : * cases, thus such indexes are not used here.
399 : */
400 40 : Oid replica_index = GetRelationIdentityOrPK(localrel);
401 :
402 : Assert(type != CT_INSERT_EXISTS);
403 :
404 : /*
405 : * If the table has a valid replica identity index, build the index
406 : * key value string. Otherwise, construct the full tuple value for
407 : * REPLICA IDENTITY FULL cases.
408 : */
409 40 : if (OidIsValid(replica_index))
410 36 : desc = build_index_value_desc(estate, localrel, searchslot, replica_index);
411 : else
412 4 : desc = ExecBuildSlotValueDescription(relid, searchslot, tupdesc, NULL, 64);
413 :
414 40 : if (desc)
415 : {
416 40 : if (tuple_value.len > 0)
417 : {
418 22 : appendStringInfoString(&tuple_value, "; ");
419 44 : appendStringInfo(&tuple_value, OidIsValid(replica_index)
420 18 : ? _("replica identity %s")
421 4 : : _("replica identity full %s"), desc);
422 : }
423 : else
424 : {
425 36 : appendStringInfo(&tuple_value, OidIsValid(replica_index)
426 18 : ? _("Replica identity %s")
427 0 : : _("Replica identity full %s"), desc);
428 : }
429 : }
430 : }
431 :
432 52 : if (tuple_value.len == 0)
433 0 : return NULL;
434 :
435 52 : appendStringInfoChar(&tuple_value, '.');
436 52 : return tuple_value.data;
437 : }
438 :
439 : /*
440 : * Helper functions to construct a string describing the contents of an index
441 : * entry. See BuildIndexValueDescription for details.
442 : *
443 : * The caller must ensure that the index with the OID 'indexoid' is locked so
444 : * that we can fetch and display the conflicting key value.
445 : */
446 : static char *
447 50 : build_index_value_desc(EState *estate, Relation localrel, TupleTableSlot *slot,
448 : Oid indexoid)
449 : {
450 : char *index_value;
451 : Relation indexDesc;
452 : Datum values[INDEX_MAX_KEYS];
453 : bool isnull[INDEX_MAX_KEYS];
454 50 : TupleTableSlot *tableslot = slot;
455 :
456 50 : if (!tableslot)
457 0 : return NULL;
458 :
459 : Assert(CheckRelationOidLockedByMe(indexoid, RowExclusiveLock, true));
460 :
461 50 : indexDesc = index_open(indexoid, NoLock);
462 :
463 : /*
464 : * If the slot is a virtual slot, copy it into a heap tuple slot as
465 : * FormIndexDatum only works with heap tuple slots.
466 : */
467 50 : if (TTS_IS_VIRTUAL(slot))
468 : {
469 24 : tableslot = table_slot_create(localrel, &estate->es_tupleTable);
470 24 : tableslot = ExecCopySlot(tableslot, slot);
471 : }
472 :
473 : /*
474 : * Initialize ecxt_scantuple for potential use in FormIndexDatum when
475 : * index expressions are present.
476 : */
477 50 : GetPerTupleExprContext(estate)->ecxt_scantuple = tableslot;
478 :
479 : /*
480 : * The values/nulls arrays passed to BuildIndexValueDescription should be
481 : * the results of FormIndexDatum, which are the "raw" input to the index
482 : * AM.
483 : */
484 50 : FormIndexDatum(BuildIndexInfo(indexDesc), tableslot, estate, values, isnull);
485 :
486 50 : index_value = BuildIndexValueDescription(indexDesc, values, isnull);
487 :
488 50 : index_close(indexDesc, NoLock);
489 :
490 50 : return index_value;
491 : }
|