Line data Source code
1 : /*-------------------------------------------------------------------------
2 : *
3 : * pg_constraint.c
4 : * routines to support manipulation of the pg_constraint relation
5 : *
6 : * Portions Copyright (c) 1996-2025, PostgreSQL Global Development Group
7 : * Portions Copyright (c) 1994, Regents of the University of California
8 : *
9 : *
10 : * IDENTIFICATION
11 : * src/backend/catalog/pg_constraint.c
12 : *
13 : *-------------------------------------------------------------------------
14 : */
15 : #include "postgres.h"
16 :
17 : #include "access/genam.h"
18 : #include "access/gist.h"
19 : #include "access/htup_details.h"
20 : #include "access/sysattr.h"
21 : #include "access/table.h"
22 : #include "catalog/catalog.h"
23 : #include "catalog/dependency.h"
24 : #include "catalog/heap.h"
25 : #include "catalog/indexing.h"
26 : #include "catalog/objectaccess.h"
27 : #include "catalog/pg_constraint.h"
28 : #include "catalog/pg_operator.h"
29 : #include "catalog/pg_type.h"
30 : #include "commands/defrem.h"
31 : #include "common/int.h"
32 : #include "utils/array.h"
33 : #include "utils/builtins.h"
34 : #include "utils/fmgroids.h"
35 : #include "utils/lsyscache.h"
36 : #include "utils/rel.h"
37 : #include "utils/syscache.h"
38 :
39 :
40 : /*
41 : * CreateConstraintEntry
42 : * Create a constraint table entry.
43 : *
44 : * Subsidiary records (such as triggers or indexes to implement the
45 : * constraint) are *not* created here. But we do make dependency links
46 : * from the constraint to the things it depends on.
47 : *
48 : * The new constraint's OID is returned.
49 : */
50 : Oid
51 53334 : CreateConstraintEntry(const char *constraintName,
52 : Oid constraintNamespace,
53 : char constraintType,
54 : bool isDeferrable,
55 : bool isDeferred,
56 : bool isEnforced,
57 : bool isValidated,
58 : Oid parentConstrId,
59 : Oid relId,
60 : const int16 *constraintKey,
61 : int constraintNKeys,
62 : int constraintNTotalKeys,
63 : Oid domainId,
64 : Oid indexRelId,
65 : Oid foreignRelId,
66 : const int16 *foreignKey,
67 : const Oid *pfEqOp,
68 : const Oid *ppEqOp,
69 : const Oid *ffEqOp,
70 : int foreignNKeys,
71 : char foreignUpdateType,
72 : char foreignDeleteType,
73 : const int16 *fkDeleteSetCols,
74 : int numFkDeleteSetCols,
75 : char foreignMatchType,
76 : const Oid *exclOp,
77 : Node *conExpr,
78 : const char *conBin,
79 : bool conIsLocal,
80 : int16 conInhCount,
81 : bool conNoInherit,
82 : bool conPeriod,
83 : bool is_internal)
84 : {
85 : Relation conDesc;
86 : Oid conOid;
87 : HeapTuple tup;
88 : bool nulls[Natts_pg_constraint];
89 : Datum values[Natts_pg_constraint];
90 : ArrayType *conkeyArray;
91 : ArrayType *confkeyArray;
92 : ArrayType *conpfeqopArray;
93 : ArrayType *conppeqopArray;
94 : ArrayType *conffeqopArray;
95 : ArrayType *conexclopArray;
96 : ArrayType *confdelsetcolsArray;
97 : NameData cname;
98 : int i;
99 : ObjectAddress conobject;
100 : ObjectAddresses *addrs_auto;
101 : ObjectAddresses *addrs_normal;
102 :
103 : /* Only CHECK or FOREIGN KEY constraint can be not enforced */
104 : Assert(isEnforced || constraintType == CONSTRAINT_CHECK ||
105 : constraintType == CONSTRAINT_FOREIGN);
106 : /* NOT ENFORCED constraint must be NOT VALID */
107 : Assert(isEnforced || !isValidated);
108 :
109 53334 : conDesc = table_open(ConstraintRelationId, RowExclusiveLock);
110 :
111 : Assert(constraintName);
112 53334 : namestrcpy(&cname, constraintName);
113 :
114 : /*
115 : * Convert C arrays into Postgres arrays.
116 : */
117 53334 : if (constraintNKeys > 0)
118 : {
119 : Datum *conkey;
120 :
121 52314 : conkey = (Datum *) palloc(constraintNKeys * sizeof(Datum));
122 114602 : for (i = 0; i < constraintNKeys; i++)
123 62288 : conkey[i] = Int16GetDatum(constraintKey[i]);
124 52314 : conkeyArray = construct_array_builtin(conkey, constraintNKeys, INT2OID);
125 : }
126 : else
127 1020 : conkeyArray = NULL;
128 :
129 53334 : if (foreignNKeys > 0)
130 : {
131 : Datum *fkdatums;
132 3930 : int nkeys = Max(foreignNKeys, numFkDeleteSetCols);
133 :
134 3930 : fkdatums = (Datum *) palloc(nkeys * sizeof(Datum));
135 9000 : for (i = 0; i < foreignNKeys; i++)
136 5070 : fkdatums[i] = Int16GetDatum(foreignKey[i]);
137 3930 : confkeyArray = construct_array_builtin(fkdatums, foreignNKeys, INT2OID);
138 9000 : for (i = 0; i < foreignNKeys; i++)
139 5070 : fkdatums[i] = ObjectIdGetDatum(pfEqOp[i]);
140 3930 : conpfeqopArray = construct_array_builtin(fkdatums, foreignNKeys, OIDOID);
141 9000 : for (i = 0; i < foreignNKeys; i++)
142 5070 : fkdatums[i] = ObjectIdGetDatum(ppEqOp[i]);
143 3930 : conppeqopArray = construct_array_builtin(fkdatums, foreignNKeys, OIDOID);
144 9000 : for (i = 0; i < foreignNKeys; i++)
145 5070 : fkdatums[i] = ObjectIdGetDatum(ffEqOp[i]);
146 3930 : conffeqopArray = construct_array_builtin(fkdatums, foreignNKeys, OIDOID);
147 :
148 3930 : if (numFkDeleteSetCols > 0)
149 : {
150 120 : for (i = 0; i < numFkDeleteSetCols; i++)
151 60 : fkdatums[i] = Int16GetDatum(fkDeleteSetCols[i]);
152 60 : confdelsetcolsArray = construct_array_builtin(fkdatums, numFkDeleteSetCols, INT2OID);
153 : }
154 : else
155 3870 : confdelsetcolsArray = NULL;
156 : }
157 : else
158 : {
159 49404 : confkeyArray = NULL;
160 49404 : conpfeqopArray = NULL;
161 49404 : conppeqopArray = NULL;
162 49404 : conffeqopArray = NULL;
163 49404 : confdelsetcolsArray = NULL;
164 : }
165 :
166 53334 : if (exclOp != NULL)
167 : {
168 : Datum *opdatums;
169 :
170 816 : opdatums = (Datum *) palloc(constraintNKeys * sizeof(Datum));
171 2382 : for (i = 0; i < constraintNKeys; i++)
172 1566 : opdatums[i] = ObjectIdGetDatum(exclOp[i]);
173 816 : conexclopArray = construct_array_builtin(opdatums, constraintNKeys, OIDOID);
174 : }
175 : else
176 52518 : conexclopArray = NULL;
177 :
178 : /* initialize nulls and values */
179 1546686 : for (i = 0; i < Natts_pg_constraint; i++)
180 : {
181 1493352 : nulls[i] = false;
182 1493352 : values[i] = (Datum) NULL;
183 : }
184 :
185 53334 : conOid = GetNewOidWithIndex(conDesc, ConstraintOidIndexId,
186 : Anum_pg_constraint_oid);
187 53334 : values[Anum_pg_constraint_oid - 1] = ObjectIdGetDatum(conOid);
188 53334 : values[Anum_pg_constraint_conname - 1] = NameGetDatum(&cname);
189 53334 : values[Anum_pg_constraint_connamespace - 1] = ObjectIdGetDatum(constraintNamespace);
190 53334 : values[Anum_pg_constraint_contype - 1] = CharGetDatum(constraintType);
191 53334 : values[Anum_pg_constraint_condeferrable - 1] = BoolGetDatum(isDeferrable);
192 53334 : values[Anum_pg_constraint_condeferred - 1] = BoolGetDatum(isDeferred);
193 53334 : values[Anum_pg_constraint_conenforced - 1] = BoolGetDatum(isEnforced);
194 53334 : values[Anum_pg_constraint_convalidated - 1] = BoolGetDatum(isValidated);
195 53334 : values[Anum_pg_constraint_conrelid - 1] = ObjectIdGetDatum(relId);
196 53334 : values[Anum_pg_constraint_contypid - 1] = ObjectIdGetDatum(domainId);
197 53334 : values[Anum_pg_constraint_conindid - 1] = ObjectIdGetDatum(indexRelId);
198 53334 : values[Anum_pg_constraint_conparentid - 1] = ObjectIdGetDatum(parentConstrId);
199 53334 : values[Anum_pg_constraint_confrelid - 1] = ObjectIdGetDatum(foreignRelId);
200 53334 : values[Anum_pg_constraint_confupdtype - 1] = CharGetDatum(foreignUpdateType);
201 53334 : values[Anum_pg_constraint_confdeltype - 1] = CharGetDatum(foreignDeleteType);
202 53334 : values[Anum_pg_constraint_confmatchtype - 1] = CharGetDatum(foreignMatchType);
203 53334 : values[Anum_pg_constraint_conislocal - 1] = BoolGetDatum(conIsLocal);
204 53334 : values[Anum_pg_constraint_coninhcount - 1] = Int16GetDatum(conInhCount);
205 53334 : values[Anum_pg_constraint_connoinherit - 1] = BoolGetDatum(conNoInherit);
206 53334 : values[Anum_pg_constraint_conperiod - 1] = BoolGetDatum(conPeriod);
207 :
208 53334 : if (conkeyArray)
209 52314 : values[Anum_pg_constraint_conkey - 1] = PointerGetDatum(conkeyArray);
210 : else
211 1020 : nulls[Anum_pg_constraint_conkey - 1] = true;
212 :
213 53334 : if (confkeyArray)
214 3930 : values[Anum_pg_constraint_confkey - 1] = PointerGetDatum(confkeyArray);
215 : else
216 49404 : nulls[Anum_pg_constraint_confkey - 1] = true;
217 :
218 53334 : if (conpfeqopArray)
219 3930 : values[Anum_pg_constraint_conpfeqop - 1] = PointerGetDatum(conpfeqopArray);
220 : else
221 49404 : nulls[Anum_pg_constraint_conpfeqop - 1] = true;
222 :
223 53334 : if (conppeqopArray)
224 3930 : values[Anum_pg_constraint_conppeqop - 1] = PointerGetDatum(conppeqopArray);
225 : else
226 49404 : nulls[Anum_pg_constraint_conppeqop - 1] = true;
227 :
228 53334 : if (conffeqopArray)
229 3930 : values[Anum_pg_constraint_conffeqop - 1] = PointerGetDatum(conffeqopArray);
230 : else
231 49404 : nulls[Anum_pg_constraint_conffeqop - 1] = true;
232 :
233 53334 : if (confdelsetcolsArray)
234 60 : values[Anum_pg_constraint_confdelsetcols - 1] = PointerGetDatum(confdelsetcolsArray);
235 : else
236 53274 : nulls[Anum_pg_constraint_confdelsetcols - 1] = true;
237 :
238 53334 : if (conexclopArray)
239 816 : values[Anum_pg_constraint_conexclop - 1] = PointerGetDatum(conexclopArray);
240 : else
241 52518 : nulls[Anum_pg_constraint_conexclop - 1] = true;
242 :
243 53334 : if (conBin)
244 3484 : values[Anum_pg_constraint_conbin - 1] = CStringGetTextDatum(conBin);
245 : else
246 49850 : nulls[Anum_pg_constraint_conbin - 1] = true;
247 :
248 53334 : tup = heap_form_tuple(RelationGetDescr(conDesc), values, nulls);
249 :
250 53334 : CatalogTupleInsert(conDesc, tup);
251 :
252 53334 : ObjectAddressSet(conobject, ConstraintRelationId, conOid);
253 :
254 53334 : table_close(conDesc, RowExclusiveLock);
255 :
256 : /* Handle set of auto dependencies */
257 53334 : addrs_auto = new_object_addresses();
258 :
259 53334 : if (OidIsValid(relId))
260 : {
261 : /*
262 : * Register auto dependency from constraint to owning relation, or to
263 : * specific column(s) if any are mentioned.
264 : */
265 : ObjectAddress relobject;
266 :
267 52488 : if (constraintNTotalKeys > 0)
268 : {
269 114926 : for (i = 0; i < constraintNTotalKeys; i++)
270 : {
271 62612 : ObjectAddressSubSet(relobject, RelationRelationId, relId,
272 : constraintKey[i]);
273 62612 : add_exact_object_address(&relobject, addrs_auto);
274 : }
275 : }
276 : else
277 : {
278 174 : ObjectAddressSet(relobject, RelationRelationId, relId);
279 174 : add_exact_object_address(&relobject, addrs_auto);
280 : }
281 : }
282 :
283 53334 : if (OidIsValid(domainId))
284 : {
285 : /*
286 : * Register auto dependency from constraint to owning domain
287 : */
288 : ObjectAddress domobject;
289 :
290 846 : ObjectAddressSet(domobject, TypeRelationId, domainId);
291 846 : add_exact_object_address(&domobject, addrs_auto);
292 : }
293 :
294 53334 : record_object_address_dependencies(&conobject, addrs_auto,
295 : DEPENDENCY_AUTO);
296 53334 : free_object_addresses(addrs_auto);
297 :
298 : /* Handle set of normal dependencies */
299 53334 : addrs_normal = new_object_addresses();
300 :
301 53334 : if (OidIsValid(foreignRelId))
302 : {
303 : /*
304 : * Register normal dependency from constraint to foreign relation, or
305 : * to specific column(s) if any are mentioned.
306 : */
307 : ObjectAddress relobject;
308 :
309 3930 : if (foreignNKeys > 0)
310 : {
311 9000 : for (i = 0; i < foreignNKeys; i++)
312 : {
313 5070 : ObjectAddressSubSet(relobject, RelationRelationId,
314 : foreignRelId, foreignKey[i]);
315 5070 : add_exact_object_address(&relobject, addrs_normal);
316 : }
317 : }
318 : else
319 : {
320 0 : ObjectAddressSet(relobject, RelationRelationId, foreignRelId);
321 0 : add_exact_object_address(&relobject, addrs_normal);
322 : }
323 : }
324 :
325 53334 : if (OidIsValid(indexRelId) && constraintType == CONSTRAINT_FOREIGN)
326 : {
327 : /*
328 : * Register normal dependency on the unique index that supports a
329 : * foreign-key constraint. (Note: for indexes associated with unique
330 : * or primary-key constraints, the dependency runs the other way, and
331 : * is not made here.)
332 : */
333 : ObjectAddress relobject;
334 :
335 3930 : ObjectAddressSet(relobject, RelationRelationId, indexRelId);
336 3930 : add_exact_object_address(&relobject, addrs_normal);
337 : }
338 :
339 53334 : if (foreignNKeys > 0)
340 : {
341 : /*
342 : * Register normal dependencies on the equality operators that support
343 : * a foreign-key constraint. If the PK and FK types are the same then
344 : * all three operators for a column are the same; otherwise they are
345 : * different.
346 : */
347 : ObjectAddress oprobject;
348 :
349 3930 : oprobject.classId = OperatorRelationId;
350 3930 : oprobject.objectSubId = 0;
351 :
352 9000 : for (i = 0; i < foreignNKeys; i++)
353 : {
354 5070 : oprobject.objectId = pfEqOp[i];
355 5070 : add_exact_object_address(&oprobject, addrs_normal);
356 5070 : if (ppEqOp[i] != pfEqOp[i])
357 : {
358 56 : oprobject.objectId = ppEqOp[i];
359 56 : add_exact_object_address(&oprobject, addrs_normal);
360 : }
361 5070 : if (ffEqOp[i] != pfEqOp[i])
362 : {
363 56 : oprobject.objectId = ffEqOp[i];
364 56 : add_exact_object_address(&oprobject, addrs_normal);
365 : }
366 : }
367 : }
368 :
369 53334 : record_object_address_dependencies(&conobject, addrs_normal,
370 : DEPENDENCY_NORMAL);
371 53334 : free_object_addresses(addrs_normal);
372 :
373 : /*
374 : * We don't bother to register dependencies on the exclusion operators of
375 : * an exclusion constraint. We assume they are members of the opclass
376 : * supporting the index, so there's an indirect dependency via that. (This
377 : * would be pretty dicey for cross-type operators, but exclusion operators
378 : * can never be cross-type.)
379 : */
380 :
381 53334 : if (conExpr != NULL)
382 : {
383 : /*
384 : * Register dependencies from constraint to objects mentioned in CHECK
385 : * expression.
386 : */
387 3484 : recordDependencyOnSingleRelExpr(&conobject, conExpr, relId,
388 : DEPENDENCY_NORMAL,
389 : DEPENDENCY_NORMAL, false);
390 : }
391 :
392 : /* Post creation hook for new constraint */
393 53334 : InvokeObjectPostCreateHookArg(ConstraintRelationId, conOid, 0,
394 : is_internal);
395 :
396 53334 : return conOid;
397 : }
398 :
399 : /*
400 : * Test whether given name is currently used as a constraint name
401 : * for the given object (relation or domain).
402 : *
403 : * This is used to decide whether to accept a user-specified constraint name.
404 : * It is deliberately not the same test as ChooseConstraintName uses to decide
405 : * whether an auto-generated name is OK: here, we will allow it unless there
406 : * is an identical constraint name in use *on the same object*.
407 : *
408 : * NB: Caller should hold exclusive lock on the given object, else
409 : * this test can be fooled by concurrent additions.
410 : */
411 : bool
412 16904 : ConstraintNameIsUsed(ConstraintCategory conCat, Oid objId,
413 : const char *conname)
414 : {
415 : bool found;
416 : Relation conDesc;
417 : SysScanDesc conscan;
418 : ScanKeyData skey[3];
419 :
420 16904 : conDesc = table_open(ConstraintRelationId, AccessShareLock);
421 :
422 16904 : ScanKeyInit(&skey[0],
423 : Anum_pg_constraint_conrelid,
424 : BTEqualStrategyNumber, F_OIDEQ,
425 : ObjectIdGetDatum((conCat == CONSTRAINT_RELATION)
426 : ? objId : InvalidOid));
427 16904 : ScanKeyInit(&skey[1],
428 : Anum_pg_constraint_contypid,
429 : BTEqualStrategyNumber, F_OIDEQ,
430 : ObjectIdGetDatum((conCat == CONSTRAINT_DOMAIN)
431 : ? objId : InvalidOid));
432 16904 : ScanKeyInit(&skey[2],
433 : Anum_pg_constraint_conname,
434 : BTEqualStrategyNumber, F_NAMEEQ,
435 : CStringGetDatum(conname));
436 :
437 16904 : conscan = systable_beginscan(conDesc, ConstraintRelidTypidNameIndexId,
438 : true, NULL, 3, skey);
439 :
440 : /* There can be at most one matching row */
441 16904 : found = (HeapTupleIsValid(systable_getnext(conscan)));
442 :
443 16904 : systable_endscan(conscan);
444 16904 : table_close(conDesc, AccessShareLock);
445 :
446 16904 : return found;
447 : }
448 :
449 : /*
450 : * Does any constraint of the given name exist in the given namespace?
451 : *
452 : * This is used for code that wants to match ChooseConstraintName's rule
453 : * that we should avoid autogenerating duplicate constraint names within a
454 : * namespace.
455 : */
456 : bool
457 8658 : ConstraintNameExists(const char *conname, Oid namespaceid)
458 : {
459 : bool found;
460 : Relation conDesc;
461 : SysScanDesc conscan;
462 : ScanKeyData skey[2];
463 :
464 8658 : conDesc = table_open(ConstraintRelationId, AccessShareLock);
465 :
466 8658 : ScanKeyInit(&skey[0],
467 : Anum_pg_constraint_conname,
468 : BTEqualStrategyNumber, F_NAMEEQ,
469 : CStringGetDatum(conname));
470 :
471 8658 : ScanKeyInit(&skey[1],
472 : Anum_pg_constraint_connamespace,
473 : BTEqualStrategyNumber, F_OIDEQ,
474 : ObjectIdGetDatum(namespaceid));
475 :
476 8658 : conscan = systable_beginscan(conDesc, ConstraintNameNspIndexId, true,
477 : NULL, 2, skey);
478 :
479 8658 : found = (HeapTupleIsValid(systable_getnext(conscan)));
480 :
481 8658 : systable_endscan(conscan);
482 8658 : table_close(conDesc, AccessShareLock);
483 :
484 8658 : return found;
485 : }
486 :
487 : /*
488 : * Select a nonconflicting name for a new constraint.
489 : *
490 : * The objective here is to choose a name that is unique within the
491 : * specified namespace. Postgres does not require this, but the SQL
492 : * spec does, and some apps depend on it. Therefore we avoid choosing
493 : * default names that so conflict.
494 : *
495 : * name1, name2, and label are used the same way as for makeObjectName(),
496 : * except that the label can't be NULL; digits will be appended to the label
497 : * if needed to create a name that is unique within the specified namespace.
498 : * If the given label is empty, we only consider names that include at least
499 : * one added digit.
500 : *
501 : * 'others' can be a list of string names already chosen within the current
502 : * command (but not yet reflected into the catalogs); we will not choose
503 : * a duplicate of one of these either.
504 : *
505 : * Note: it is theoretically possible to get a collision anyway, if someone
506 : * else chooses the same name concurrently. This is fairly unlikely to be
507 : * a problem in practice, especially if one is holding an exclusive lock on
508 : * the relation identified by name1.
509 : *
510 : * Returns a palloc'd string.
511 : */
512 : char *
513 25088 : ChooseConstraintName(const char *name1, const char *name2,
514 : const char *label, Oid namespaceid,
515 : List *others)
516 : {
517 25088 : int pass = 0;
518 25088 : char *conname = NULL;
519 : char modlabel[NAMEDATALEN];
520 : Relation conDesc;
521 : SysScanDesc conscan;
522 : ScanKeyData skey[2];
523 : bool found;
524 : ListCell *l;
525 :
526 25088 : conDesc = table_open(ConstraintRelationId, AccessShareLock);
527 :
528 : /* try the unmodified label first, unless it's empty */
529 25088 : if (label[0] != '\0')
530 24060 : strlcpy(modlabel, label, sizeof(modlabel));
531 : else
532 1028 : snprintf(modlabel, sizeof(modlabel), "%s%d", label, ++pass);
533 :
534 : for (;;)
535 : {
536 27822 : conname = makeObjectName(name1, name2, modlabel);
537 :
538 27822 : found = false;
539 :
540 31962 : foreach(l, others)
541 : {
542 4158 : if (strcmp((char *) lfirst(l), conname) == 0)
543 : {
544 18 : found = true;
545 18 : break;
546 : }
547 : }
548 :
549 27822 : if (!found)
550 : {
551 27804 : ScanKeyInit(&skey[0],
552 : Anum_pg_constraint_conname,
553 : BTEqualStrategyNumber, F_NAMEEQ,
554 : CStringGetDatum(conname));
555 :
556 27804 : ScanKeyInit(&skey[1],
557 : Anum_pg_constraint_connamespace,
558 : BTEqualStrategyNumber, F_OIDEQ,
559 : ObjectIdGetDatum(namespaceid));
560 :
561 27804 : conscan = systable_beginscan(conDesc, ConstraintNameNspIndexId, true,
562 : NULL, 2, skey);
563 :
564 27804 : found = (HeapTupleIsValid(systable_getnext(conscan)));
565 :
566 27804 : systable_endscan(conscan);
567 : }
568 :
569 27822 : if (!found)
570 25088 : break;
571 :
572 : /* found a conflict, so try a new name component */
573 2734 : pfree(conname);
574 2734 : snprintf(modlabel, sizeof(modlabel), "%s%d", label, ++pass);
575 : }
576 :
577 25088 : table_close(conDesc, AccessShareLock);
578 :
579 25088 : return conname;
580 : }
581 :
582 : /*
583 : * Find and return a copy of the pg_constraint tuple that implements a
584 : * (possibly not valid) not-null constraint for the given column of the
585 : * given relation. If no such constraint exists, return NULL.
586 : *
587 : * XXX This would be easier if we had pg_attribute.notnullconstr with the OID
588 : * of the constraint that implements the not-null constraint for that column.
589 : * I'm not sure it's worth the catalog bloat and de-normalization, however.
590 : */
591 : HeapTuple
592 12322 : findNotNullConstraintAttnum(Oid relid, AttrNumber attnum)
593 : {
594 : Relation pg_constraint;
595 : HeapTuple conTup,
596 12322 : retval = NULL;
597 : SysScanDesc scan;
598 : ScanKeyData key;
599 :
600 12322 : pg_constraint = table_open(ConstraintRelationId, AccessShareLock);
601 12322 : ScanKeyInit(&key,
602 : Anum_pg_constraint_conrelid,
603 : BTEqualStrategyNumber, F_OIDEQ,
604 : ObjectIdGetDatum(relid));
605 12322 : scan = systable_beginscan(pg_constraint, ConstraintRelidTypidNameIndexId,
606 : true, NULL, 1, &key);
607 :
608 19742 : while (HeapTupleIsValid(conTup = systable_getnext(scan)))
609 : {
610 9074 : Form_pg_constraint con = (Form_pg_constraint) GETSTRUCT(conTup);
611 : AttrNumber conkey;
612 :
613 : /*
614 : * We're looking for a NOTNULL constraint with the column we're
615 : * looking for as the sole element in conkey.
616 : */
617 9074 : if (con->contype != CONSTRAINT_NOTNULL)
618 3306 : continue;
619 :
620 5768 : conkey = extractNotNullColumn(conTup);
621 5768 : if (conkey != attnum)
622 4114 : continue;
623 :
624 : /* Found it */
625 1654 : retval = heap_copytuple(conTup);
626 1654 : break;
627 : }
628 :
629 12322 : systable_endscan(scan);
630 12322 : table_close(pg_constraint, AccessShareLock);
631 :
632 12322 : return retval;
633 : }
634 :
635 : /*
636 : * Find and return a copy of the pg_constraint tuple that implements a
637 : * (possibly not valid) not-null constraint for the given column of the
638 : * given relation.
639 : * If no such column or no such constraint exists, return NULL.
640 : */
641 : HeapTuple
642 1608 : findNotNullConstraint(Oid relid, const char *colname)
643 : {
644 : AttrNumber attnum;
645 :
646 1608 : attnum = get_attnum(relid, colname);
647 1608 : if (attnum <= InvalidAttrNumber)
648 36 : return NULL;
649 :
650 1572 : return findNotNullConstraintAttnum(relid, attnum);
651 : }
652 :
653 : /*
654 : * Find and return the pg_constraint tuple that implements a validated
655 : * not-null constraint for the given domain.
656 : */
657 : HeapTuple
658 12 : findDomainNotNullConstraint(Oid typid)
659 : {
660 : Relation pg_constraint;
661 : HeapTuple conTup,
662 12 : retval = NULL;
663 : SysScanDesc scan;
664 : ScanKeyData key;
665 :
666 12 : pg_constraint = table_open(ConstraintRelationId, AccessShareLock);
667 12 : ScanKeyInit(&key,
668 : Anum_pg_constraint_contypid,
669 : BTEqualStrategyNumber, F_OIDEQ,
670 : ObjectIdGetDatum(typid));
671 12 : scan = systable_beginscan(pg_constraint, ConstraintRelidTypidNameIndexId,
672 : true, NULL, 1, &key);
673 :
674 12 : while (HeapTupleIsValid(conTup = systable_getnext(scan)))
675 : {
676 12 : Form_pg_constraint con = (Form_pg_constraint) GETSTRUCT(conTup);
677 :
678 : /*
679 : * We're looking for a NOTNULL constraint that's marked validated.
680 : */
681 12 : if (con->contype != CONSTRAINT_NOTNULL)
682 0 : continue;
683 12 : if (!con->convalidated)
684 0 : continue;
685 :
686 : /* Found it */
687 12 : retval = heap_copytuple(conTup);
688 12 : break;
689 : }
690 :
691 12 : systable_endscan(scan);
692 12 : table_close(pg_constraint, AccessShareLock);
693 :
694 12 : return retval;
695 : }
696 :
697 : /*
698 : * Given a pg_constraint tuple for a not-null constraint, return the column
699 : * number it is for.
700 : */
701 : AttrNumber
702 12846 : extractNotNullColumn(HeapTuple constrTup)
703 : {
704 : Datum adatum;
705 : ArrayType *arr;
706 :
707 : /* only tuples for not-null constraints should be given */
708 : Assert(((Form_pg_constraint) GETSTRUCT(constrTup))->contype == CONSTRAINT_NOTNULL);
709 :
710 12846 : adatum = SysCacheGetAttrNotNull(CONSTROID, constrTup,
711 : Anum_pg_constraint_conkey);
712 12846 : arr = DatumGetArrayTypeP(adatum); /* ensure not toasted */
713 12846 : if (ARR_NDIM(arr) != 1 ||
714 12846 : ARR_HASNULL(arr) ||
715 12846 : ARR_ELEMTYPE(arr) != INT2OID ||
716 12846 : ARR_DIMS(arr)[0] != 1)
717 0 : elog(ERROR, "conkey is not a 1-D smallint array");
718 :
719 : /* We leak the detoasted datum, but we don't care */
720 :
721 12846 : return ((AttrNumber *) ARR_DATA_PTR(arr))[0];
722 : }
723 :
724 : /*
725 : * AdjustNotNullInheritance
726 : * Adjust inheritance status for a single not-null constraint
727 : *
728 : * If no not-null constraint is found for the column, return false.
729 : * Caller can create one.
730 : *
731 : * If a constraint exists but the connoinherit flag is not what the caller
732 : * wants, throw an error about the incompatibility. If the desired
733 : * constraint is valid but the existing constraint is not valid, also
734 : * throw an error about that (the opposite case is acceptable).
735 : *
736 : * If everything checks out, we adjust conislocal/coninhcount and return
737 : * true. If is_local is true we flip conislocal true, or do nothing if
738 : * it's already true; otherwise we increment coninhcount by 1.
739 : */
740 : bool
741 9794 : AdjustNotNullInheritance(Oid relid, AttrNumber attnum,
742 : bool is_local, bool is_no_inherit, bool is_notvalid)
743 : {
744 : HeapTuple tup;
745 :
746 9794 : tup = findNotNullConstraintAttnum(relid, attnum);
747 9794 : if (HeapTupleIsValid(tup))
748 : {
749 : Relation pg_constraint;
750 : Form_pg_constraint conform;
751 136 : bool changed = false;
752 :
753 136 : pg_constraint = table_open(ConstraintRelationId, RowExclusiveLock);
754 136 : conform = (Form_pg_constraint) GETSTRUCT(tup);
755 :
756 : /*
757 : * If the NO INHERIT flag we're asked for doesn't match what the
758 : * existing constraint has, throw an error.
759 : */
760 136 : if (is_no_inherit != conform->connoinherit)
761 30 : ereport(ERROR,
762 : errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
763 : errmsg("cannot change NO INHERIT status of NOT NULL constraint \"%s\" on relation \"%s\"",
764 : NameStr(conform->conname), get_rel_name(relid)),
765 : errhint("You might need to make the existing constraint inheritable using %s.",
766 : "ALTER TABLE ... ALTER CONSTRAINT ... INHERIT"));
767 :
768 : /*
769 : * Throw an error if the existing constraint is NOT VALID and caller
770 : * wants a valid one.
771 : */
772 106 : if (!is_notvalid && !conform->convalidated)
773 12 : ereport(ERROR,
774 : errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
775 : errmsg("incompatible NOT VALID constraint \"%s\" on relation \"%s\"",
776 : NameStr(conform->conname), get_rel_name(relid)),
777 : errhint("You might need to validate it using %s.",
778 : "ALTER TABLE ... VALIDATE CONSTRAINT"));
779 :
780 94 : if (!is_local)
781 : {
782 70 : if (pg_add_s16_overflow(conform->coninhcount, 1,
783 : &conform->coninhcount))
784 0 : ereport(ERROR,
785 : errcode(ERRCODE_PROGRAM_LIMIT_EXCEEDED),
786 : errmsg("too many inheritance parents"));
787 70 : changed = true;
788 : }
789 24 : else if (!conform->conislocal)
790 : {
791 0 : conform->conislocal = true;
792 0 : changed = true;
793 : }
794 :
795 94 : if (changed)
796 70 : CatalogTupleUpdate(pg_constraint, &tup->t_self, tup);
797 :
798 94 : table_close(pg_constraint, RowExclusiveLock);
799 :
800 94 : return true;
801 : }
802 :
803 9658 : return false;
804 : }
805 :
806 : /*
807 : * RelationGetNotNullConstraints
808 : * Return the list of not-null constraints for the given rel
809 : *
810 : * Caller can request cooked constraints, or raw.
811 : *
812 : * This is seldom needed, so we just scan pg_constraint each time.
813 : *
814 : * 'include_noinh' determines whether to include NO INHERIT constraints or not.
815 : */
816 : List *
817 10720 : RelationGetNotNullConstraints(Oid relid, bool cooked, bool include_noinh)
818 : {
819 10720 : List *notnulls = NIL;
820 : Relation constrRel;
821 : HeapTuple htup;
822 : SysScanDesc conscan;
823 : ScanKeyData skey;
824 :
825 10720 : constrRel = table_open(ConstraintRelationId, AccessShareLock);
826 10720 : ScanKeyInit(&skey,
827 : Anum_pg_constraint_conrelid,
828 : BTEqualStrategyNumber, F_OIDEQ,
829 : ObjectIdGetDatum(relid));
830 10720 : conscan = systable_beginscan(constrRel, ConstraintRelidTypidNameIndexId, true,
831 : NULL, 1, &skey);
832 :
833 16808 : while (HeapTupleIsValid(htup = systable_getnext(conscan)))
834 : {
835 6088 : Form_pg_constraint conForm = (Form_pg_constraint) GETSTRUCT(htup);
836 : AttrNumber colnum;
837 :
838 6088 : if (conForm->contype != CONSTRAINT_NOTNULL)
839 3264 : continue;
840 2824 : if (conForm->connoinherit && !include_noinh)
841 60 : continue;
842 :
843 2764 : colnum = extractNotNullColumn(htup);
844 :
845 2764 : if (cooked)
846 : {
847 : CookedConstraint *cooked;
848 :
849 2372 : cooked = (CookedConstraint *) palloc(sizeof(CookedConstraint));
850 :
851 2372 : cooked->contype = CONSTR_NOTNULL;
852 2372 : cooked->conoid = conForm->oid;
853 2372 : cooked->name = pstrdup(NameStr(conForm->conname));
854 2372 : cooked->attnum = colnum;
855 2372 : cooked->expr = NULL;
856 2372 : cooked->is_enforced = true;
857 2372 : cooked->skip_validation = !conForm->convalidated;
858 2372 : cooked->is_local = true;
859 2372 : cooked->inhcount = 0;
860 2372 : cooked->is_no_inherit = conForm->connoinherit;
861 :
862 2372 : notnulls = lappend(notnulls, cooked);
863 : }
864 : else
865 : {
866 : Constraint *constr;
867 :
868 392 : constr = makeNode(Constraint);
869 392 : constr->contype = CONSTR_NOTNULL;
870 392 : constr->conname = pstrdup(NameStr(conForm->conname));
871 392 : constr->deferrable = false;
872 392 : constr->initdeferred = false;
873 392 : constr->location = -1;
874 392 : constr->keys = list_make1(makeString(get_attname(relid, colnum,
875 : false)));
876 392 : constr->is_enforced = true;
877 392 : constr->skip_validation = !conForm->convalidated;
878 392 : constr->initially_valid = true;
879 392 : constr->is_no_inherit = conForm->connoinherit;
880 392 : notnulls = lappend(notnulls, constr);
881 : }
882 : }
883 :
884 10720 : systable_endscan(conscan);
885 10720 : table_close(constrRel, AccessShareLock);
886 :
887 10720 : return notnulls;
888 : }
889 :
890 :
891 : /*
892 : * Delete a single constraint record.
893 : */
894 : void
895 26878 : RemoveConstraintById(Oid conId)
896 : {
897 : Relation conDesc;
898 : HeapTuple tup;
899 : Form_pg_constraint con;
900 :
901 26878 : conDesc = table_open(ConstraintRelationId, RowExclusiveLock);
902 :
903 26878 : tup = SearchSysCache1(CONSTROID, ObjectIdGetDatum(conId));
904 26878 : if (!HeapTupleIsValid(tup)) /* should not happen */
905 0 : elog(ERROR, "cache lookup failed for constraint %u", conId);
906 26878 : con = (Form_pg_constraint) GETSTRUCT(tup);
907 :
908 : /*
909 : * Special processing depending on what the constraint is for.
910 : */
911 26878 : if (OidIsValid(con->conrelid))
912 : {
913 : Relation rel;
914 :
915 : /*
916 : * If the constraint is for a relation, open and exclusive-lock the
917 : * relation it's for.
918 : */
919 26520 : rel = table_open(con->conrelid, AccessExclusiveLock);
920 :
921 : /*
922 : * We need to update the relchecks count if it is a check constraint
923 : * being dropped. This update will force backends to rebuild relcache
924 : * entries when we commit.
925 : */
926 26518 : if (con->contype == CONSTRAINT_CHECK)
927 : {
928 : Relation pgrel;
929 : HeapTuple relTup;
930 : Form_pg_class classForm;
931 :
932 1978 : pgrel = table_open(RelationRelationId, RowExclusiveLock);
933 1978 : relTup = SearchSysCacheCopy1(RELOID,
934 : ObjectIdGetDatum(con->conrelid));
935 1978 : if (!HeapTupleIsValid(relTup))
936 0 : elog(ERROR, "cache lookup failed for relation %u",
937 : con->conrelid);
938 1978 : classForm = (Form_pg_class) GETSTRUCT(relTup);
939 :
940 1978 : if (classForm->relchecks == 0) /* should not happen */
941 0 : elog(ERROR, "relation \"%s\" has relchecks = 0",
942 : RelationGetRelationName(rel));
943 1978 : classForm->relchecks--;
944 :
945 1978 : CatalogTupleUpdate(pgrel, &relTup->t_self, relTup);
946 :
947 1978 : heap_freetuple(relTup);
948 :
949 1978 : table_close(pgrel, RowExclusiveLock);
950 : }
951 :
952 : /* Keep lock on constraint's rel until end of xact */
953 26518 : table_close(rel, NoLock);
954 : }
955 358 : else if (OidIsValid(con->contypid))
956 : {
957 : /*
958 : * XXX for now, do nothing special when dropping a domain constraint
959 : *
960 : * Probably there should be some form of locking on the domain type,
961 : * but we have no such concept at the moment.
962 : */
963 : }
964 : else
965 0 : elog(ERROR, "constraint %u is not of a known type", conId);
966 :
967 : /* Fry the constraint itself */
968 26876 : CatalogTupleDelete(conDesc, &tup->t_self);
969 :
970 : /* Clean up */
971 26876 : ReleaseSysCache(tup);
972 26876 : table_close(conDesc, RowExclusiveLock);
973 26876 : }
974 :
975 : /*
976 : * RenameConstraintById
977 : * Rename a constraint.
978 : *
979 : * Note: this isn't intended to be a user-exposed function; it doesn't check
980 : * permissions etc. Currently this is only invoked when renaming an index
981 : * that is associated with a constraint, but it's made a little more general
982 : * than that with the expectation of someday having ALTER TABLE RENAME
983 : * CONSTRAINT.
984 : */
985 : void
986 96 : RenameConstraintById(Oid conId, const char *newname)
987 : {
988 : Relation conDesc;
989 : HeapTuple tuple;
990 : Form_pg_constraint con;
991 :
992 96 : conDesc = table_open(ConstraintRelationId, RowExclusiveLock);
993 :
994 96 : tuple = SearchSysCacheCopy1(CONSTROID, ObjectIdGetDatum(conId));
995 96 : if (!HeapTupleIsValid(tuple))
996 0 : elog(ERROR, "cache lookup failed for constraint %u", conId);
997 96 : con = (Form_pg_constraint) GETSTRUCT(tuple);
998 :
999 : /*
1000 : * For user-friendliness, check whether the name is already in use.
1001 : */
1002 186 : if (OidIsValid(con->conrelid) &&
1003 90 : ConstraintNameIsUsed(CONSTRAINT_RELATION,
1004 : con->conrelid,
1005 : newname))
1006 0 : ereport(ERROR,
1007 : (errcode(ERRCODE_DUPLICATE_OBJECT),
1008 : errmsg("constraint \"%s\" for relation \"%s\" already exists",
1009 : newname, get_rel_name(con->conrelid))));
1010 102 : if (OidIsValid(con->contypid) &&
1011 6 : ConstraintNameIsUsed(CONSTRAINT_DOMAIN,
1012 : con->contypid,
1013 : newname))
1014 0 : ereport(ERROR,
1015 : (errcode(ERRCODE_DUPLICATE_OBJECT),
1016 : errmsg("constraint \"%s\" for domain %s already exists",
1017 : newname, format_type_be(con->contypid))));
1018 :
1019 : /* OK, do the rename --- tuple is a copy, so OK to scribble on it */
1020 96 : namestrcpy(&(con->conname), newname);
1021 :
1022 96 : CatalogTupleUpdate(conDesc, &tuple->t_self, tuple);
1023 :
1024 96 : InvokeObjectPostAlterHook(ConstraintRelationId, conId, 0);
1025 :
1026 96 : heap_freetuple(tuple);
1027 96 : table_close(conDesc, RowExclusiveLock);
1028 96 : }
1029 :
1030 : /*
1031 : * AlterConstraintNamespaces
1032 : * Find any constraints belonging to the specified object,
1033 : * and move them to the specified new namespace.
1034 : *
1035 : * isType indicates whether the owning object is a type or a relation.
1036 : */
1037 : void
1038 106 : AlterConstraintNamespaces(Oid ownerId, Oid oldNspId,
1039 : Oid newNspId, bool isType, ObjectAddresses *objsMoved)
1040 : {
1041 : Relation conRel;
1042 : ScanKeyData key[2];
1043 : SysScanDesc scan;
1044 : HeapTuple tup;
1045 :
1046 106 : conRel = table_open(ConstraintRelationId, RowExclusiveLock);
1047 :
1048 106 : ScanKeyInit(&key[0],
1049 : Anum_pg_constraint_conrelid,
1050 : BTEqualStrategyNumber, F_OIDEQ,
1051 : ObjectIdGetDatum(isType ? InvalidOid : ownerId));
1052 106 : ScanKeyInit(&key[1],
1053 : Anum_pg_constraint_contypid,
1054 : BTEqualStrategyNumber, F_OIDEQ,
1055 : ObjectIdGetDatum(isType ? ownerId : InvalidOid));
1056 :
1057 106 : scan = systable_beginscan(conRel, ConstraintRelidTypidNameIndexId, true,
1058 : NULL, 2, key);
1059 :
1060 258 : while (HeapTupleIsValid((tup = systable_getnext(scan))))
1061 : {
1062 152 : Form_pg_constraint conform = (Form_pg_constraint) GETSTRUCT(tup);
1063 : ObjectAddress thisobj;
1064 :
1065 152 : ObjectAddressSet(thisobj, ConstraintRelationId, conform->oid);
1066 :
1067 152 : if (object_address_present(&thisobj, objsMoved))
1068 0 : continue;
1069 :
1070 : /* Don't update if the object is already part of the namespace */
1071 152 : if (conform->connamespace == oldNspId && oldNspId != newNspId)
1072 : {
1073 122 : tup = heap_copytuple(tup);
1074 122 : conform = (Form_pg_constraint) GETSTRUCT(tup);
1075 :
1076 122 : conform->connamespace = newNspId;
1077 :
1078 122 : CatalogTupleUpdate(conRel, &tup->t_self, tup);
1079 :
1080 : /*
1081 : * Note: currently, the constraint will not have its own
1082 : * dependency on the namespace, so we don't need to do
1083 : * changeDependencyFor().
1084 : */
1085 : }
1086 :
1087 152 : InvokeObjectPostAlterHook(ConstraintRelationId, thisobj.objectId, 0);
1088 :
1089 152 : add_exact_object_address(&thisobj, objsMoved);
1090 : }
1091 :
1092 106 : systable_endscan(scan);
1093 :
1094 106 : table_close(conRel, RowExclusiveLock);
1095 106 : }
1096 :
1097 : /*
1098 : * ConstraintSetParentConstraint
1099 : * Set a partition's constraint as child of its parent constraint,
1100 : * or remove the linkage if parentConstrId is InvalidOid.
1101 : *
1102 : * This updates the constraint's pg_constraint row to show it as inherited, and
1103 : * adds PARTITION dependencies to prevent the constraint from being deleted
1104 : * on its own. Alternatively, reverse that.
1105 : */
1106 : void
1107 656 : ConstraintSetParentConstraint(Oid childConstrId,
1108 : Oid parentConstrId,
1109 : Oid childTableId)
1110 : {
1111 : Relation constrRel;
1112 : Form_pg_constraint constrForm;
1113 : HeapTuple tuple,
1114 : newtup;
1115 : ObjectAddress depender;
1116 : ObjectAddress referenced;
1117 :
1118 656 : constrRel = table_open(ConstraintRelationId, RowExclusiveLock);
1119 656 : tuple = SearchSysCache1(CONSTROID, ObjectIdGetDatum(childConstrId));
1120 656 : if (!HeapTupleIsValid(tuple))
1121 0 : elog(ERROR, "cache lookup failed for constraint %u", childConstrId);
1122 656 : newtup = heap_copytuple(tuple);
1123 656 : constrForm = (Form_pg_constraint) GETSTRUCT(newtup);
1124 656 : if (OidIsValid(parentConstrId))
1125 : {
1126 : /* don't allow setting parent for a constraint that already has one */
1127 : Assert(constrForm->coninhcount == 0);
1128 456 : if (constrForm->conparentid != InvalidOid)
1129 0 : elog(ERROR, "constraint %u already has a parent constraint",
1130 : childConstrId);
1131 :
1132 456 : constrForm->conislocal = false;
1133 456 : if (pg_add_s16_overflow(constrForm->coninhcount, 1,
1134 : &constrForm->coninhcount))
1135 0 : ereport(ERROR,
1136 : errcode(ERRCODE_PROGRAM_LIMIT_EXCEEDED),
1137 : errmsg("too many inheritance parents"));
1138 :
1139 456 : constrForm->conparentid = parentConstrId;
1140 :
1141 456 : CatalogTupleUpdate(constrRel, &tuple->t_self, newtup);
1142 :
1143 456 : ObjectAddressSet(depender, ConstraintRelationId, childConstrId);
1144 :
1145 456 : ObjectAddressSet(referenced, ConstraintRelationId, parentConstrId);
1146 456 : recordDependencyOn(&depender, &referenced, DEPENDENCY_PARTITION_PRI);
1147 :
1148 456 : ObjectAddressSet(referenced, RelationRelationId, childTableId);
1149 456 : recordDependencyOn(&depender, &referenced, DEPENDENCY_PARTITION_SEC);
1150 : }
1151 : else
1152 : {
1153 200 : constrForm->coninhcount--;
1154 200 : constrForm->conislocal = true;
1155 200 : constrForm->conparentid = InvalidOid;
1156 :
1157 : /* Make sure there's no further inheritance. */
1158 : Assert(constrForm->coninhcount == 0);
1159 :
1160 200 : CatalogTupleUpdate(constrRel, &tuple->t_self, newtup);
1161 :
1162 200 : deleteDependencyRecordsForClass(ConstraintRelationId, childConstrId,
1163 : ConstraintRelationId,
1164 : DEPENDENCY_PARTITION_PRI);
1165 200 : deleteDependencyRecordsForClass(ConstraintRelationId, childConstrId,
1166 : RelationRelationId,
1167 : DEPENDENCY_PARTITION_SEC);
1168 : }
1169 :
1170 656 : ReleaseSysCache(tuple);
1171 656 : table_close(constrRel, RowExclusiveLock);
1172 656 : }
1173 :
1174 :
1175 : /*
1176 : * get_relation_constraint_oid
1177 : * Find a constraint on the specified relation with the specified name.
1178 : * Returns constraint's OID.
1179 : */
1180 : Oid
1181 392 : get_relation_constraint_oid(Oid relid, const char *conname, bool missing_ok)
1182 : {
1183 : Relation pg_constraint;
1184 : HeapTuple tuple;
1185 : SysScanDesc scan;
1186 : ScanKeyData skey[3];
1187 392 : Oid conOid = InvalidOid;
1188 :
1189 392 : pg_constraint = table_open(ConstraintRelationId, AccessShareLock);
1190 :
1191 392 : ScanKeyInit(&skey[0],
1192 : Anum_pg_constraint_conrelid,
1193 : BTEqualStrategyNumber, F_OIDEQ,
1194 : ObjectIdGetDatum(relid));
1195 392 : ScanKeyInit(&skey[1],
1196 : Anum_pg_constraint_contypid,
1197 : BTEqualStrategyNumber, F_OIDEQ,
1198 : ObjectIdGetDatum(InvalidOid));
1199 392 : ScanKeyInit(&skey[2],
1200 : Anum_pg_constraint_conname,
1201 : BTEqualStrategyNumber, F_NAMEEQ,
1202 : CStringGetDatum(conname));
1203 :
1204 392 : scan = systable_beginscan(pg_constraint, ConstraintRelidTypidNameIndexId, true,
1205 : NULL, 3, skey);
1206 :
1207 : /* There can be at most one matching row */
1208 392 : if (HeapTupleIsValid(tuple = systable_getnext(scan)))
1209 380 : conOid = ((Form_pg_constraint) GETSTRUCT(tuple))->oid;
1210 :
1211 392 : systable_endscan(scan);
1212 :
1213 : /* If no such constraint exists, complain */
1214 392 : if (!OidIsValid(conOid) && !missing_ok)
1215 12 : ereport(ERROR,
1216 : (errcode(ERRCODE_UNDEFINED_OBJECT),
1217 : errmsg("constraint \"%s\" for table \"%s\" does not exist",
1218 : conname, get_rel_name(relid))));
1219 :
1220 380 : table_close(pg_constraint, AccessShareLock);
1221 :
1222 380 : return conOid;
1223 : }
1224 :
1225 : /*
1226 : * get_relation_constraint_attnos
1227 : * Find a constraint on the specified relation with the specified name
1228 : * and return the constrained columns.
1229 : *
1230 : * Returns a Bitmapset of the column attnos of the constrained columns, with
1231 : * attnos being offset by FirstLowInvalidHeapAttributeNumber so that system
1232 : * columns can be represented.
1233 : *
1234 : * *constraintOid is set to the OID of the constraint, or InvalidOid on
1235 : * failure.
1236 : */
1237 : Bitmapset *
1238 192 : get_relation_constraint_attnos(Oid relid, const char *conname,
1239 : bool missing_ok, Oid *constraintOid)
1240 : {
1241 192 : Bitmapset *conattnos = NULL;
1242 : Relation pg_constraint;
1243 : HeapTuple tuple;
1244 : SysScanDesc scan;
1245 : ScanKeyData skey[3];
1246 :
1247 : /* Set *constraintOid, to avoid complaints about uninitialized vars */
1248 192 : *constraintOid = InvalidOid;
1249 :
1250 192 : pg_constraint = table_open(ConstraintRelationId, AccessShareLock);
1251 :
1252 192 : ScanKeyInit(&skey[0],
1253 : Anum_pg_constraint_conrelid,
1254 : BTEqualStrategyNumber, F_OIDEQ,
1255 : ObjectIdGetDatum(relid));
1256 192 : ScanKeyInit(&skey[1],
1257 : Anum_pg_constraint_contypid,
1258 : BTEqualStrategyNumber, F_OIDEQ,
1259 : ObjectIdGetDatum(InvalidOid));
1260 192 : ScanKeyInit(&skey[2],
1261 : Anum_pg_constraint_conname,
1262 : BTEqualStrategyNumber, F_NAMEEQ,
1263 : CStringGetDatum(conname));
1264 :
1265 192 : scan = systable_beginscan(pg_constraint, ConstraintRelidTypidNameIndexId, true,
1266 : NULL, 3, skey);
1267 :
1268 : /* There can be at most one matching row */
1269 192 : if (HeapTupleIsValid(tuple = systable_getnext(scan)))
1270 : {
1271 : Datum adatum;
1272 : bool isNull;
1273 :
1274 192 : *constraintOid = ((Form_pg_constraint) GETSTRUCT(tuple))->oid;
1275 :
1276 : /* Extract the conkey array, ie, attnums of constrained columns */
1277 192 : adatum = heap_getattr(tuple, Anum_pg_constraint_conkey,
1278 : RelationGetDescr(pg_constraint), &isNull);
1279 192 : if (!isNull)
1280 : {
1281 : ArrayType *arr;
1282 : int numcols;
1283 : int16 *attnums;
1284 : int i;
1285 :
1286 192 : arr = DatumGetArrayTypeP(adatum); /* ensure not toasted */
1287 192 : numcols = ARR_DIMS(arr)[0];
1288 192 : if (ARR_NDIM(arr) != 1 ||
1289 192 : numcols < 0 ||
1290 192 : ARR_HASNULL(arr) ||
1291 192 : ARR_ELEMTYPE(arr) != INT2OID)
1292 0 : elog(ERROR, "conkey is not a 1-D smallint array");
1293 192 : attnums = (int16 *) ARR_DATA_PTR(arr);
1294 :
1295 : /* Construct the result value */
1296 540 : for (i = 0; i < numcols; i++)
1297 : {
1298 348 : conattnos = bms_add_member(conattnos,
1299 348 : attnums[i] - FirstLowInvalidHeapAttributeNumber);
1300 : }
1301 : }
1302 : }
1303 :
1304 192 : systable_endscan(scan);
1305 :
1306 : /* If no such constraint exists, complain */
1307 192 : if (!OidIsValid(*constraintOid) && !missing_ok)
1308 0 : ereport(ERROR,
1309 : (errcode(ERRCODE_UNDEFINED_OBJECT),
1310 : errmsg("constraint \"%s\" for table \"%s\" does not exist",
1311 : conname, get_rel_name(relid))));
1312 :
1313 192 : table_close(pg_constraint, AccessShareLock);
1314 :
1315 192 : return conattnos;
1316 : }
1317 :
1318 : /*
1319 : * Return the OID of the constraint enforced by the given index in the
1320 : * given relation; or InvalidOid if no such index is cataloged.
1321 : *
1322 : * Much like get_constraint_index, this function is concerned only with the
1323 : * one constraint that "owns" the given index. Therefore, constraints of
1324 : * types other than unique, primary-key, and exclusion are ignored.
1325 : */
1326 : Oid
1327 1494 : get_relation_idx_constraint_oid(Oid relationId, Oid indexId)
1328 : {
1329 : Relation pg_constraint;
1330 : SysScanDesc scan;
1331 : ScanKeyData key;
1332 : HeapTuple tuple;
1333 1494 : Oid constraintId = InvalidOid;
1334 :
1335 1494 : pg_constraint = table_open(ConstraintRelationId, AccessShareLock);
1336 :
1337 1494 : ScanKeyInit(&key,
1338 : Anum_pg_constraint_conrelid,
1339 : BTEqualStrategyNumber,
1340 : F_OIDEQ,
1341 : ObjectIdGetDatum(relationId));
1342 1494 : scan = systable_beginscan(pg_constraint, ConstraintRelidTypidNameIndexId,
1343 : true, NULL, 1, &key);
1344 2662 : while ((tuple = systable_getnext(scan)) != NULL)
1345 : {
1346 : Form_pg_constraint constrForm;
1347 :
1348 2078 : constrForm = (Form_pg_constraint) GETSTRUCT(tuple);
1349 :
1350 : /* See above */
1351 2078 : if (constrForm->contype != CONSTRAINT_PRIMARY &&
1352 1248 : constrForm->contype != CONSTRAINT_UNIQUE &&
1353 1152 : constrForm->contype != CONSTRAINT_EXCLUSION)
1354 1152 : continue;
1355 :
1356 926 : if (constrForm->conindid == indexId)
1357 : {
1358 910 : constraintId = constrForm->oid;
1359 910 : break;
1360 : }
1361 : }
1362 1494 : systable_endscan(scan);
1363 :
1364 1494 : table_close(pg_constraint, AccessShareLock);
1365 1494 : return constraintId;
1366 : }
1367 :
1368 : /*
1369 : * get_domain_constraint_oid
1370 : * Find a constraint on the specified domain with the specified name.
1371 : * Returns constraint's OID.
1372 : */
1373 : Oid
1374 56 : get_domain_constraint_oid(Oid typid, const char *conname, bool missing_ok)
1375 : {
1376 : Relation pg_constraint;
1377 : HeapTuple tuple;
1378 : SysScanDesc scan;
1379 : ScanKeyData skey[3];
1380 56 : Oid conOid = InvalidOid;
1381 :
1382 56 : pg_constraint = table_open(ConstraintRelationId, AccessShareLock);
1383 :
1384 56 : ScanKeyInit(&skey[0],
1385 : Anum_pg_constraint_conrelid,
1386 : BTEqualStrategyNumber, F_OIDEQ,
1387 : ObjectIdGetDatum(InvalidOid));
1388 56 : ScanKeyInit(&skey[1],
1389 : Anum_pg_constraint_contypid,
1390 : BTEqualStrategyNumber, F_OIDEQ,
1391 : ObjectIdGetDatum(typid));
1392 56 : ScanKeyInit(&skey[2],
1393 : Anum_pg_constraint_conname,
1394 : BTEqualStrategyNumber, F_NAMEEQ,
1395 : CStringGetDatum(conname));
1396 :
1397 56 : scan = systable_beginscan(pg_constraint, ConstraintRelidTypidNameIndexId, true,
1398 : NULL, 3, skey);
1399 :
1400 : /* There can be at most one matching row */
1401 56 : if (HeapTupleIsValid(tuple = systable_getnext(scan)))
1402 50 : conOid = ((Form_pg_constraint) GETSTRUCT(tuple))->oid;
1403 :
1404 56 : systable_endscan(scan);
1405 :
1406 : /* If no such constraint exists, complain */
1407 56 : if (!OidIsValid(conOid) && !missing_ok)
1408 6 : ereport(ERROR,
1409 : (errcode(ERRCODE_UNDEFINED_OBJECT),
1410 : errmsg("constraint \"%s\" for domain %s does not exist",
1411 : conname, format_type_be(typid))));
1412 :
1413 50 : table_close(pg_constraint, AccessShareLock);
1414 :
1415 50 : return conOid;
1416 : }
1417 :
1418 : /*
1419 : * get_primary_key_attnos
1420 : * Identify the columns in a relation's primary key, if any.
1421 : *
1422 : * Returns a Bitmapset of the column attnos of the primary key's columns,
1423 : * with attnos being offset by FirstLowInvalidHeapAttributeNumber so that
1424 : * system columns can be represented.
1425 : *
1426 : * If there is no primary key, return NULL. We also return NULL if the pkey
1427 : * constraint is deferrable and deferrableOk is false.
1428 : *
1429 : * *constraintOid is set to the OID of the pkey constraint, or InvalidOid
1430 : * on failure.
1431 : */
1432 : Bitmapset *
1433 210 : get_primary_key_attnos(Oid relid, bool deferrableOk, Oid *constraintOid)
1434 : {
1435 210 : Bitmapset *pkattnos = NULL;
1436 : Relation pg_constraint;
1437 : HeapTuple tuple;
1438 : SysScanDesc scan;
1439 : ScanKeyData skey[1];
1440 :
1441 : /* Set *constraintOid, to avoid complaints about uninitialized vars */
1442 210 : *constraintOid = InvalidOid;
1443 :
1444 : /* Scan pg_constraint for constraints of the target rel */
1445 210 : pg_constraint = table_open(ConstraintRelationId, AccessShareLock);
1446 :
1447 210 : ScanKeyInit(&skey[0],
1448 : Anum_pg_constraint_conrelid,
1449 : BTEqualStrategyNumber, F_OIDEQ,
1450 : ObjectIdGetDatum(relid));
1451 :
1452 210 : scan = systable_beginscan(pg_constraint, ConstraintRelidTypidNameIndexId, true,
1453 : NULL, 1, skey);
1454 :
1455 516 : while (HeapTupleIsValid(tuple = systable_getnext(scan)))
1456 : {
1457 474 : Form_pg_constraint con = (Form_pg_constraint) GETSTRUCT(tuple);
1458 : Datum adatum;
1459 : bool isNull;
1460 : ArrayType *arr;
1461 : int16 *attnums;
1462 : int numkeys;
1463 : int i;
1464 :
1465 : /* Skip constraints that are not PRIMARY KEYs */
1466 474 : if (con->contype != CONSTRAINT_PRIMARY)
1467 306 : continue;
1468 :
1469 : /*
1470 : * If the primary key is deferrable, but we've been instructed to
1471 : * ignore deferrable constraints, then we might as well give up
1472 : * searching, since there can only be a single primary key on a table.
1473 : */
1474 168 : if (con->condeferrable && !deferrableOk)
1475 168 : break;
1476 :
1477 : /* Extract the conkey array, ie, attnums of PK's columns */
1478 168 : adatum = heap_getattr(tuple, Anum_pg_constraint_conkey,
1479 : RelationGetDescr(pg_constraint), &isNull);
1480 168 : if (isNull)
1481 0 : elog(ERROR, "null conkey for constraint %u",
1482 : ((Form_pg_constraint) GETSTRUCT(tuple))->oid);
1483 168 : arr = DatumGetArrayTypeP(adatum); /* ensure not toasted */
1484 168 : numkeys = ARR_DIMS(arr)[0];
1485 168 : if (ARR_NDIM(arr) != 1 ||
1486 168 : numkeys < 0 ||
1487 168 : ARR_HASNULL(arr) ||
1488 168 : ARR_ELEMTYPE(arr) != INT2OID)
1489 0 : elog(ERROR, "conkey is not a 1-D smallint array");
1490 168 : attnums = (int16 *) ARR_DATA_PTR(arr);
1491 :
1492 : /* Construct the result value */
1493 354 : for (i = 0; i < numkeys; i++)
1494 : {
1495 186 : pkattnos = bms_add_member(pkattnos,
1496 186 : attnums[i] - FirstLowInvalidHeapAttributeNumber);
1497 : }
1498 168 : *constraintOid = ((Form_pg_constraint) GETSTRUCT(tuple))->oid;
1499 :
1500 : /* No need to search further */
1501 168 : break;
1502 : }
1503 :
1504 210 : systable_endscan(scan);
1505 :
1506 210 : table_close(pg_constraint, AccessShareLock);
1507 :
1508 210 : return pkattnos;
1509 : }
1510 :
1511 : /*
1512 : * Extract data from the pg_constraint tuple of a foreign-key constraint.
1513 : *
1514 : * All arguments save the first are output arguments. All output arguments
1515 : * other than numfks, conkey and confkey can be passed as NULL if caller
1516 : * doesn't need them.
1517 : */
1518 : void
1519 8498 : DeconstructFkConstraintRow(HeapTuple tuple, int *numfks,
1520 : AttrNumber *conkey, AttrNumber *confkey,
1521 : Oid *pf_eq_oprs, Oid *pp_eq_oprs, Oid *ff_eq_oprs,
1522 : int *num_fk_del_set_cols, AttrNumber *fk_del_set_cols)
1523 : {
1524 : Datum adatum;
1525 : bool isNull;
1526 : ArrayType *arr;
1527 : int numkeys;
1528 :
1529 : /*
1530 : * We expect the arrays to be 1-D arrays of the right types; verify that.
1531 : * We don't need to use deconstruct_array() since the array data is just
1532 : * going to look like a C array of values.
1533 : */
1534 8498 : adatum = SysCacheGetAttrNotNull(CONSTROID, tuple,
1535 : Anum_pg_constraint_conkey);
1536 8498 : arr = DatumGetArrayTypeP(adatum); /* ensure not toasted */
1537 8498 : if (ARR_NDIM(arr) != 1 ||
1538 8498 : ARR_HASNULL(arr) ||
1539 8498 : ARR_ELEMTYPE(arr) != INT2OID)
1540 0 : elog(ERROR, "conkey is not a 1-D smallint array");
1541 8498 : numkeys = ARR_DIMS(arr)[0];
1542 8498 : if (numkeys <= 0 || numkeys > INDEX_MAX_KEYS)
1543 0 : elog(ERROR, "foreign key constraint cannot have %d columns", numkeys);
1544 8498 : memcpy(conkey, ARR_DATA_PTR(arr), numkeys * sizeof(int16));
1545 8498 : if ((Pointer) arr != DatumGetPointer(adatum))
1546 8498 : pfree(arr); /* free de-toasted copy, if any */
1547 :
1548 8498 : adatum = SysCacheGetAttrNotNull(CONSTROID, tuple,
1549 : Anum_pg_constraint_confkey);
1550 8498 : arr = DatumGetArrayTypeP(adatum); /* ensure not toasted */
1551 8498 : if (ARR_NDIM(arr) != 1 ||
1552 8498 : ARR_DIMS(arr)[0] != numkeys ||
1553 8498 : ARR_HASNULL(arr) ||
1554 8498 : ARR_ELEMTYPE(arr) != INT2OID)
1555 0 : elog(ERROR, "confkey is not a 1-D smallint array");
1556 8498 : memcpy(confkey, ARR_DATA_PTR(arr), numkeys * sizeof(int16));
1557 8498 : if ((Pointer) arr != DatumGetPointer(adatum))
1558 8498 : pfree(arr); /* free de-toasted copy, if any */
1559 :
1560 8498 : if (pf_eq_oprs)
1561 : {
1562 8498 : adatum = SysCacheGetAttrNotNull(CONSTROID, tuple,
1563 : Anum_pg_constraint_conpfeqop);
1564 8498 : arr = DatumGetArrayTypeP(adatum); /* ensure not toasted */
1565 : /* see TryReuseForeignKey if you change the test below */
1566 8498 : if (ARR_NDIM(arr) != 1 ||
1567 8498 : ARR_DIMS(arr)[0] != numkeys ||
1568 8498 : ARR_HASNULL(arr) ||
1569 8498 : ARR_ELEMTYPE(arr) != OIDOID)
1570 0 : elog(ERROR, "conpfeqop is not a 1-D Oid array");
1571 8498 : memcpy(pf_eq_oprs, ARR_DATA_PTR(arr), numkeys * sizeof(Oid));
1572 8498 : if ((Pointer) arr != DatumGetPointer(adatum))
1573 8498 : pfree(arr); /* free de-toasted copy, if any */
1574 : }
1575 :
1576 8498 : if (pp_eq_oprs)
1577 : {
1578 4984 : adatum = SysCacheGetAttrNotNull(CONSTROID, tuple,
1579 : Anum_pg_constraint_conppeqop);
1580 4984 : arr = DatumGetArrayTypeP(adatum); /* ensure not toasted */
1581 4984 : if (ARR_NDIM(arr) != 1 ||
1582 4984 : ARR_DIMS(arr)[0] != numkeys ||
1583 4984 : ARR_HASNULL(arr) ||
1584 4984 : ARR_ELEMTYPE(arr) != OIDOID)
1585 0 : elog(ERROR, "conppeqop is not a 1-D Oid array");
1586 4984 : memcpy(pp_eq_oprs, ARR_DATA_PTR(arr), numkeys * sizeof(Oid));
1587 4984 : if ((Pointer) arr != DatumGetPointer(adatum))
1588 4984 : pfree(arr); /* free de-toasted copy, if any */
1589 : }
1590 :
1591 8498 : if (ff_eq_oprs)
1592 : {
1593 4984 : adatum = SysCacheGetAttrNotNull(CONSTROID, tuple,
1594 : Anum_pg_constraint_conffeqop);
1595 4984 : arr = DatumGetArrayTypeP(adatum); /* ensure not toasted */
1596 4984 : if (ARR_NDIM(arr) != 1 ||
1597 4984 : ARR_DIMS(arr)[0] != numkeys ||
1598 4984 : ARR_HASNULL(arr) ||
1599 4984 : ARR_ELEMTYPE(arr) != OIDOID)
1600 0 : elog(ERROR, "conffeqop is not a 1-D Oid array");
1601 4984 : memcpy(ff_eq_oprs, ARR_DATA_PTR(arr), numkeys * sizeof(Oid));
1602 4984 : if ((Pointer) arr != DatumGetPointer(adatum))
1603 4984 : pfree(arr); /* free de-toasted copy, if any */
1604 : }
1605 :
1606 8498 : if (fk_del_set_cols)
1607 : {
1608 4984 : adatum = SysCacheGetAttr(CONSTROID, tuple,
1609 : Anum_pg_constraint_confdelsetcols, &isNull);
1610 4984 : if (isNull)
1611 : {
1612 4906 : *num_fk_del_set_cols = 0;
1613 : }
1614 : else
1615 : {
1616 : int num_delete_cols;
1617 :
1618 78 : arr = DatumGetArrayTypeP(adatum); /* ensure not toasted */
1619 78 : if (ARR_NDIM(arr) != 1 ||
1620 78 : ARR_HASNULL(arr) ||
1621 78 : ARR_ELEMTYPE(arr) != INT2OID)
1622 0 : elog(ERROR, "confdelsetcols is not a 1-D smallint array");
1623 78 : num_delete_cols = ARR_DIMS(arr)[0];
1624 78 : memcpy(fk_del_set_cols, ARR_DATA_PTR(arr), num_delete_cols * sizeof(int16));
1625 78 : if ((Pointer) arr != DatumGetPointer(adatum))
1626 78 : pfree(arr); /* free de-toasted copy, if any */
1627 :
1628 78 : *num_fk_del_set_cols = num_delete_cols;
1629 : }
1630 : }
1631 :
1632 8498 : *numfks = numkeys;
1633 8498 : }
1634 :
1635 : /*
1636 : * FindFKPeriodOpers -
1637 : *
1638 : * Looks up the operator oids used for the PERIOD part of a temporal foreign key.
1639 : * The opclass should be the opclass of that PERIOD element.
1640 : * Everything else is an output: containedbyoperoid is the ContainedBy operator for
1641 : * types matching the PERIOD element.
1642 : * aggedcontainedbyoperoid is also a ContainedBy operator,
1643 : * but one whose rhs is a multirange.
1644 : * That way foreign keys can compare fkattr <@ range_agg(pkattr).
1645 : * intersectoperoid is used by NO ACTION constraints to trim the range being considered
1646 : * to just what was updated/deleted.
1647 : */
1648 : void
1649 312 : FindFKPeriodOpers(Oid opclass,
1650 : Oid *containedbyoperoid,
1651 : Oid *aggedcontainedbyoperoid,
1652 : Oid *intersectoperoid)
1653 : {
1654 312 : Oid opfamily = InvalidOid;
1655 312 : Oid opcintype = InvalidOid;
1656 : StrategyNumber strat;
1657 :
1658 : /* Make sure we have a range or multirange. */
1659 312 : if (get_opclass_opfamily_and_input_type(opclass, &opfamily, &opcintype))
1660 : {
1661 312 : if (opcintype != ANYRANGEOID && opcintype != ANYMULTIRANGEOID)
1662 0 : ereport(ERROR,
1663 : errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
1664 : errmsg("invalid type for PERIOD part of foreign key"),
1665 : errdetail("Only range and multirange are supported."));
1666 :
1667 : }
1668 : else
1669 0 : elog(ERROR, "cache lookup failed for opclass %u", opclass);
1670 :
1671 : /*
1672 : * Look up the ContainedBy operator whose lhs and rhs are the opclass's
1673 : * type. We use this to optimize RI checks: if the new value includes all
1674 : * of the old value, then we can treat the attribute as if it didn't
1675 : * change, and skip the RI check.
1676 : */
1677 312 : GetOperatorFromCompareType(opclass,
1678 : InvalidOid,
1679 : COMPARE_CONTAINED_BY,
1680 : containedbyoperoid,
1681 : &strat);
1682 :
1683 : /*
1684 : * Now look up the ContainedBy operator. Its left arg must be the type of
1685 : * the column (or rather of the opclass). Its right arg must match the
1686 : * return type of the support proc.
1687 : */
1688 312 : GetOperatorFromCompareType(opclass,
1689 : ANYMULTIRANGEOID,
1690 : COMPARE_CONTAINED_BY,
1691 : aggedcontainedbyoperoid,
1692 : &strat);
1693 :
1694 312 : switch (opcintype)
1695 : {
1696 164 : case ANYRANGEOID:
1697 164 : *intersectoperoid = OID_RANGE_INTERSECT_RANGE_OP;
1698 164 : break;
1699 148 : case ANYMULTIRANGEOID:
1700 148 : *intersectoperoid = OID_MULTIRANGE_INTERSECT_MULTIRANGE_OP;
1701 148 : break;
1702 0 : default:
1703 0 : elog(ERROR, "unexpected opcintype: %u", opcintype);
1704 : }
1705 312 : }
1706 :
1707 : /*
1708 : * Determine whether a relation can be proven functionally dependent on
1709 : * a set of grouping columns. If so, return true and add the pg_constraint
1710 : * OIDs of the constraints needed for the proof to the *constraintDeps list.
1711 : *
1712 : * grouping_columns is a list of grouping expressions, in which columns of
1713 : * the rel of interest are Vars with the indicated varno/varlevelsup.
1714 : *
1715 : * Currently we only check to see if the rel has a primary key that is a
1716 : * subset of the grouping_columns. We could also use plain unique constraints
1717 : * if all their columns are known not null, but there's a problem: we need
1718 : * to be able to represent the not-null-ness as part of the constraints added
1719 : * to *constraintDeps. FIXME whenever not-null constraints get represented
1720 : * in pg_constraint.
1721 : */
1722 : bool
1723 210 : check_functional_grouping(Oid relid,
1724 : Index varno, Index varlevelsup,
1725 : List *grouping_columns,
1726 : List **constraintDeps)
1727 : {
1728 : Bitmapset *pkattnos;
1729 : Bitmapset *groupbyattnos;
1730 : Oid constraintOid;
1731 : ListCell *gl;
1732 :
1733 : /* If the rel has no PK, then we can't prove functional dependency */
1734 210 : pkattnos = get_primary_key_attnos(relid, false, &constraintOid);
1735 210 : if (pkattnos == NULL)
1736 42 : return false;
1737 :
1738 : /* Identify all the rel's columns that appear in grouping_columns */
1739 168 : groupbyattnos = NULL;
1740 378 : foreach(gl, grouping_columns)
1741 : {
1742 210 : Var *gvar = (Var *) lfirst(gl);
1743 :
1744 210 : if (IsA(gvar, Var) &&
1745 210 : gvar->varno == varno &&
1746 168 : gvar->varlevelsup == varlevelsup)
1747 168 : groupbyattnos = bms_add_member(groupbyattnos,
1748 168 : gvar->varattno - FirstLowInvalidHeapAttributeNumber);
1749 : }
1750 :
1751 168 : if (bms_is_subset(pkattnos, groupbyattnos))
1752 : {
1753 : /* The PK is a subset of grouping_columns, so we win */
1754 126 : *constraintDeps = lappend_oid(*constraintDeps, constraintOid);
1755 126 : return true;
1756 : }
1757 :
1758 42 : return false;
1759 : }
|