Line data Source code
1 : /*-------------------------------------------------------------------------
2 : *
3 : * pg_constraint.c
4 : * routines to support manipulation of the pg_constraint relation
5 : *
6 : * Portions Copyright (c) 1996-2024, PostgreSQL Global Development Group
7 : * Portions Copyright (c) 1994, Regents of the University of California
8 : *
9 : *
10 : * IDENTIFICATION
11 : * src/backend/catalog/pg_constraint.c
12 : *
13 : *-------------------------------------------------------------------------
14 : */
15 : #include "postgres.h"
16 :
17 : #include "access/genam.h"
18 : #include "access/gist.h"
19 : #include "access/htup_details.h"
20 : #include "access/sysattr.h"
21 : #include "access/table.h"
22 : #include "catalog/catalog.h"
23 : #include "catalog/dependency.h"
24 : #include "catalog/heap.h"
25 : #include "catalog/indexing.h"
26 : #include "catalog/objectaccess.h"
27 : #include "catalog/pg_constraint.h"
28 : #include "catalog/pg_operator.h"
29 : #include "catalog/pg_type.h"
30 : #include "commands/defrem.h"
31 : #include "utils/array.h"
32 : #include "utils/builtins.h"
33 : #include "utils/fmgroids.h"
34 : #include "utils/lsyscache.h"
35 : #include "utils/rel.h"
36 : #include "utils/syscache.h"
37 :
38 :
39 : /*
40 : * CreateConstraintEntry
41 : * Create a constraint table entry.
42 : *
43 : * Subsidiary records (such as triggers or indexes to implement the
44 : * constraint) are *not* created here. But we do make dependency links
45 : * from the constraint to the things it depends on.
46 : *
47 : * The new constraint's OID is returned.
48 : */
49 : Oid
50 32872 : CreateConstraintEntry(const char *constraintName,
51 : Oid constraintNamespace,
52 : char constraintType,
53 : bool isDeferrable,
54 : bool isDeferred,
55 : bool isValidated,
56 : Oid parentConstrId,
57 : Oid relId,
58 : const int16 *constraintKey,
59 : int constraintNKeys,
60 : int constraintNTotalKeys,
61 : Oid domainId,
62 : Oid indexRelId,
63 : Oid foreignRelId,
64 : const int16 *foreignKey,
65 : const Oid *pfEqOp,
66 : const Oid *ppEqOp,
67 : const Oid *ffEqOp,
68 : int foreignNKeys,
69 : char foreignUpdateType,
70 : char foreignDeleteType,
71 : const int16 *fkDeleteSetCols,
72 : int numFkDeleteSetCols,
73 : char foreignMatchType,
74 : const Oid *exclOp,
75 : Node *conExpr,
76 : const char *conBin,
77 : bool conIsLocal,
78 : int conInhCount,
79 : bool conNoInherit,
80 : bool conPeriod,
81 : bool is_internal)
82 : {
83 : Relation conDesc;
84 : Oid conOid;
85 : HeapTuple tup;
86 : bool nulls[Natts_pg_constraint];
87 : Datum values[Natts_pg_constraint];
88 : ArrayType *conkeyArray;
89 : ArrayType *confkeyArray;
90 : ArrayType *conpfeqopArray;
91 : ArrayType *conppeqopArray;
92 : ArrayType *conffeqopArray;
93 : ArrayType *conexclopArray;
94 : ArrayType *confdelsetcolsArray;
95 : NameData cname;
96 : int i;
97 : ObjectAddress conobject;
98 : ObjectAddresses *addrs_auto;
99 : ObjectAddresses *addrs_normal;
100 :
101 32872 : conDesc = table_open(ConstraintRelationId, RowExclusiveLock);
102 :
103 : Assert(constraintName);
104 32872 : namestrcpy(&cname, constraintName);
105 :
106 : /*
107 : * Convert C arrays into Postgres arrays.
108 : */
109 32872 : if (constraintNKeys > 0)
110 : {
111 : Datum *conkey;
112 :
113 31988 : conkey = (Datum *) palloc(constraintNKeys * sizeof(Datum));
114 72076 : for (i = 0; i < constraintNKeys; i++)
115 40088 : conkey[i] = Int16GetDatum(constraintKey[i]);
116 31988 : conkeyArray = construct_array_builtin(conkey, constraintNKeys, INT2OID);
117 : }
118 : else
119 884 : conkeyArray = NULL;
120 :
121 32872 : if (foreignNKeys > 0)
122 : {
123 : Datum *fkdatums;
124 :
125 3466 : fkdatums = (Datum *) palloc(foreignNKeys * sizeof(Datum));
126 7864 : for (i = 0; i < foreignNKeys; i++)
127 4398 : fkdatums[i] = Int16GetDatum(foreignKey[i]);
128 3466 : confkeyArray = construct_array_builtin(fkdatums, foreignNKeys, INT2OID);
129 7864 : for (i = 0; i < foreignNKeys; i++)
130 4398 : fkdatums[i] = ObjectIdGetDatum(pfEqOp[i]);
131 3466 : conpfeqopArray = construct_array_builtin(fkdatums, foreignNKeys, OIDOID);
132 7864 : for (i = 0; i < foreignNKeys; i++)
133 4398 : fkdatums[i] = ObjectIdGetDatum(ppEqOp[i]);
134 3466 : conppeqopArray = construct_array_builtin(fkdatums, foreignNKeys, OIDOID);
135 7864 : for (i = 0; i < foreignNKeys; i++)
136 4398 : fkdatums[i] = ObjectIdGetDatum(ffEqOp[i]);
137 3466 : conffeqopArray = construct_array_builtin(fkdatums, foreignNKeys, OIDOID);
138 :
139 3466 : if (numFkDeleteSetCols > 0)
140 : {
141 120 : for (i = 0; i < numFkDeleteSetCols; i++)
142 60 : fkdatums[i] = Int16GetDatum(fkDeleteSetCols[i]);
143 60 : confdelsetcolsArray = construct_array_builtin(fkdatums, numFkDeleteSetCols, INT2OID);
144 : }
145 : else
146 3406 : confdelsetcolsArray = NULL;
147 : }
148 : else
149 : {
150 29406 : confkeyArray = NULL;
151 29406 : conpfeqopArray = NULL;
152 29406 : conppeqopArray = NULL;
153 29406 : conffeqopArray = NULL;
154 29406 : confdelsetcolsArray = NULL;
155 : }
156 :
157 32872 : if (exclOp != NULL)
158 : {
159 : Datum *opdatums;
160 :
161 604 : opdatums = (Datum *) palloc(constraintNKeys * sizeof(Datum));
162 1736 : for (i = 0; i < constraintNKeys; i++)
163 1132 : opdatums[i] = ObjectIdGetDatum(exclOp[i]);
164 604 : conexclopArray = construct_array_builtin(opdatums, constraintNKeys, OIDOID);
165 : }
166 : else
167 32268 : conexclopArray = NULL;
168 :
169 : /* initialize nulls and values */
170 920416 : for (i = 0; i < Natts_pg_constraint; i++)
171 : {
172 887544 : nulls[i] = false;
173 887544 : values[i] = (Datum) NULL;
174 : }
175 :
176 32872 : conOid = GetNewOidWithIndex(conDesc, ConstraintOidIndexId,
177 : Anum_pg_constraint_oid);
178 32872 : values[Anum_pg_constraint_oid - 1] = ObjectIdGetDatum(conOid);
179 32872 : values[Anum_pg_constraint_conname - 1] = NameGetDatum(&cname);
180 32872 : values[Anum_pg_constraint_connamespace - 1] = ObjectIdGetDatum(constraintNamespace);
181 32872 : values[Anum_pg_constraint_contype - 1] = CharGetDatum(constraintType);
182 32872 : values[Anum_pg_constraint_condeferrable - 1] = BoolGetDatum(isDeferrable);
183 32872 : values[Anum_pg_constraint_condeferred - 1] = BoolGetDatum(isDeferred);
184 32872 : values[Anum_pg_constraint_convalidated - 1] = BoolGetDatum(isValidated);
185 32872 : values[Anum_pg_constraint_conrelid - 1] = ObjectIdGetDatum(relId);
186 32872 : values[Anum_pg_constraint_contypid - 1] = ObjectIdGetDatum(domainId);
187 32872 : values[Anum_pg_constraint_conindid - 1] = ObjectIdGetDatum(indexRelId);
188 32872 : values[Anum_pg_constraint_conparentid - 1] = ObjectIdGetDatum(parentConstrId);
189 32872 : values[Anum_pg_constraint_confrelid - 1] = ObjectIdGetDatum(foreignRelId);
190 32872 : values[Anum_pg_constraint_confupdtype - 1] = CharGetDatum(foreignUpdateType);
191 32872 : values[Anum_pg_constraint_confdeltype - 1] = CharGetDatum(foreignDeleteType);
192 32872 : values[Anum_pg_constraint_confmatchtype - 1] = CharGetDatum(foreignMatchType);
193 32872 : values[Anum_pg_constraint_conislocal - 1] = BoolGetDatum(conIsLocal);
194 32872 : values[Anum_pg_constraint_coninhcount - 1] = Int16GetDatum(conInhCount);
195 32872 : values[Anum_pg_constraint_connoinherit - 1] = BoolGetDatum(conNoInherit);
196 32872 : values[Anum_pg_constraint_conperiod - 1] = BoolGetDatum(conPeriod);
197 :
198 32872 : if (conkeyArray)
199 31988 : values[Anum_pg_constraint_conkey - 1] = PointerGetDatum(conkeyArray);
200 : else
201 884 : nulls[Anum_pg_constraint_conkey - 1] = true;
202 :
203 32872 : if (confkeyArray)
204 3466 : values[Anum_pg_constraint_confkey - 1] = PointerGetDatum(confkeyArray);
205 : else
206 29406 : nulls[Anum_pg_constraint_confkey - 1] = true;
207 :
208 32872 : if (conpfeqopArray)
209 3466 : values[Anum_pg_constraint_conpfeqop - 1] = PointerGetDatum(conpfeqopArray);
210 : else
211 29406 : nulls[Anum_pg_constraint_conpfeqop - 1] = true;
212 :
213 32872 : if (conppeqopArray)
214 3466 : values[Anum_pg_constraint_conppeqop - 1] = PointerGetDatum(conppeqopArray);
215 : else
216 29406 : nulls[Anum_pg_constraint_conppeqop - 1] = true;
217 :
218 32872 : if (conffeqopArray)
219 3466 : values[Anum_pg_constraint_conffeqop - 1] = PointerGetDatum(conffeqopArray);
220 : else
221 29406 : nulls[Anum_pg_constraint_conffeqop - 1] = true;
222 :
223 32872 : if (confdelsetcolsArray)
224 60 : values[Anum_pg_constraint_confdelsetcols - 1] = PointerGetDatum(confdelsetcolsArray);
225 : else
226 32812 : nulls[Anum_pg_constraint_confdelsetcols - 1] = true;
227 :
228 32872 : if (conexclopArray)
229 604 : values[Anum_pg_constraint_conexclop - 1] = PointerGetDatum(conexclopArray);
230 : else
231 32268 : nulls[Anum_pg_constraint_conexclop - 1] = true;
232 :
233 32872 : if (conBin)
234 2730 : values[Anum_pg_constraint_conbin - 1] = CStringGetTextDatum(conBin);
235 : else
236 30142 : nulls[Anum_pg_constraint_conbin - 1] = true;
237 :
238 32872 : tup = heap_form_tuple(RelationGetDescr(conDesc), values, nulls);
239 :
240 32872 : CatalogTupleInsert(conDesc, tup);
241 :
242 32872 : ObjectAddressSet(conobject, ConstraintRelationId, conOid);
243 :
244 32872 : table_close(conDesc, RowExclusiveLock);
245 :
246 : /* Handle set of auto dependencies */
247 32872 : addrs_auto = new_object_addresses();
248 :
249 32872 : if (OidIsValid(relId))
250 : {
251 : /*
252 : * Register auto dependency from constraint to owning relation, or to
253 : * specific column(s) if any are mentioned.
254 : */
255 : ObjectAddress relobject;
256 :
257 32152 : if (constraintNTotalKeys > 0)
258 : {
259 72384 : for (i = 0; i < constraintNTotalKeys; i++)
260 : {
261 40396 : ObjectAddressSubSet(relobject, RelationRelationId, relId,
262 : constraintKey[i]);
263 40396 : add_exact_object_address(&relobject, addrs_auto);
264 : }
265 : }
266 : else
267 : {
268 164 : ObjectAddressSet(relobject, RelationRelationId, relId);
269 164 : add_exact_object_address(&relobject, addrs_auto);
270 : }
271 : }
272 :
273 32872 : if (OidIsValid(domainId))
274 : {
275 : /*
276 : * Register auto dependency from constraint to owning domain
277 : */
278 : ObjectAddress domobject;
279 :
280 720 : ObjectAddressSet(domobject, TypeRelationId, domainId);
281 720 : add_exact_object_address(&domobject, addrs_auto);
282 : }
283 :
284 32872 : record_object_address_dependencies(&conobject, addrs_auto,
285 : DEPENDENCY_AUTO);
286 32872 : free_object_addresses(addrs_auto);
287 :
288 : /* Handle set of normal dependencies */
289 32872 : addrs_normal = new_object_addresses();
290 :
291 32872 : if (OidIsValid(foreignRelId))
292 : {
293 : /*
294 : * Register normal dependency from constraint to foreign relation, or
295 : * to specific column(s) if any are mentioned.
296 : */
297 : ObjectAddress relobject;
298 :
299 3466 : if (foreignNKeys > 0)
300 : {
301 7864 : for (i = 0; i < foreignNKeys; i++)
302 : {
303 4398 : ObjectAddressSubSet(relobject, RelationRelationId,
304 : foreignRelId, foreignKey[i]);
305 4398 : add_exact_object_address(&relobject, addrs_normal);
306 : }
307 : }
308 : else
309 : {
310 0 : ObjectAddressSet(relobject, RelationRelationId, foreignRelId);
311 0 : add_exact_object_address(&relobject, addrs_normal);
312 : }
313 : }
314 :
315 32872 : if (OidIsValid(indexRelId) && constraintType == CONSTRAINT_FOREIGN)
316 : {
317 : /*
318 : * Register normal dependency on the unique index that supports a
319 : * foreign-key constraint. (Note: for indexes associated with unique
320 : * or primary-key constraints, the dependency runs the other way, and
321 : * is not made here.)
322 : */
323 : ObjectAddress relobject;
324 :
325 3466 : ObjectAddressSet(relobject, RelationRelationId, indexRelId);
326 3466 : add_exact_object_address(&relobject, addrs_normal);
327 : }
328 :
329 32872 : if (foreignNKeys > 0)
330 : {
331 : /*
332 : * Register normal dependencies on the equality operators that support
333 : * a foreign-key constraint. If the PK and FK types are the same then
334 : * all three operators for a column are the same; otherwise they are
335 : * different.
336 : */
337 : ObjectAddress oprobject;
338 :
339 3466 : oprobject.classId = OperatorRelationId;
340 3466 : oprobject.objectSubId = 0;
341 :
342 7864 : for (i = 0; i < foreignNKeys; i++)
343 : {
344 4398 : oprobject.objectId = pfEqOp[i];
345 4398 : add_exact_object_address(&oprobject, addrs_normal);
346 4398 : if (ppEqOp[i] != pfEqOp[i])
347 : {
348 56 : oprobject.objectId = ppEqOp[i];
349 56 : add_exact_object_address(&oprobject, addrs_normal);
350 : }
351 4398 : if (ffEqOp[i] != pfEqOp[i])
352 : {
353 56 : oprobject.objectId = ffEqOp[i];
354 56 : add_exact_object_address(&oprobject, addrs_normal);
355 : }
356 : }
357 : }
358 :
359 32872 : record_object_address_dependencies(&conobject, addrs_normal,
360 : DEPENDENCY_NORMAL);
361 32872 : free_object_addresses(addrs_normal);
362 :
363 : /*
364 : * We don't bother to register dependencies on the exclusion operators of
365 : * an exclusion constraint. We assume they are members of the opclass
366 : * supporting the index, so there's an indirect dependency via that. (This
367 : * would be pretty dicey for cross-type operators, but exclusion operators
368 : * can never be cross-type.)
369 : */
370 :
371 32872 : if (conExpr != NULL)
372 : {
373 : /*
374 : * Register dependencies from constraint to objects mentioned in CHECK
375 : * expression.
376 : */
377 2730 : recordDependencyOnSingleRelExpr(&conobject, conExpr, relId,
378 : DEPENDENCY_NORMAL,
379 : DEPENDENCY_NORMAL, false);
380 : }
381 :
382 : /* Post creation hook for new constraint */
383 32872 : InvokeObjectPostCreateHookArg(ConstraintRelationId, conOid, 0,
384 : is_internal);
385 :
386 32872 : return conOid;
387 : }
388 :
389 : /*
390 : * Test whether given name is currently used as a constraint name
391 : * for the given object (relation or domain).
392 : *
393 : * This is used to decide whether to accept a user-specified constraint name.
394 : * It is deliberately not the same test as ChooseConstraintName uses to decide
395 : * whether an auto-generated name is OK: here, we will allow it unless there
396 : * is an identical constraint name in use *on the same object*.
397 : *
398 : * NB: Caller should hold exclusive lock on the given object, else
399 : * this test can be fooled by concurrent additions.
400 : */
401 : bool
402 15226 : ConstraintNameIsUsed(ConstraintCategory conCat, Oid objId,
403 : const char *conname)
404 : {
405 : bool found;
406 : Relation conDesc;
407 : SysScanDesc conscan;
408 : ScanKeyData skey[3];
409 :
410 15226 : conDesc = table_open(ConstraintRelationId, AccessShareLock);
411 :
412 15226 : ScanKeyInit(&skey[0],
413 : Anum_pg_constraint_conrelid,
414 : BTEqualStrategyNumber, F_OIDEQ,
415 : ObjectIdGetDatum((conCat == CONSTRAINT_RELATION)
416 : ? objId : InvalidOid));
417 15226 : ScanKeyInit(&skey[1],
418 : Anum_pg_constraint_contypid,
419 : BTEqualStrategyNumber, F_OIDEQ,
420 : ObjectIdGetDatum((conCat == CONSTRAINT_DOMAIN)
421 : ? objId : InvalidOid));
422 15226 : ScanKeyInit(&skey[2],
423 : Anum_pg_constraint_conname,
424 : BTEqualStrategyNumber, F_NAMEEQ,
425 : CStringGetDatum(conname));
426 :
427 15226 : conscan = systable_beginscan(conDesc, ConstraintRelidTypidNameIndexId,
428 : true, NULL, 3, skey);
429 :
430 : /* There can be at most one matching row */
431 15226 : found = (HeapTupleIsValid(systable_getnext(conscan)));
432 :
433 15226 : systable_endscan(conscan);
434 15226 : table_close(conDesc, AccessShareLock);
435 :
436 15226 : return found;
437 : }
438 :
439 : /*
440 : * Does any constraint of the given name exist in the given namespace?
441 : *
442 : * This is used for code that wants to match ChooseConstraintName's rule
443 : * that we should avoid autogenerating duplicate constraint names within a
444 : * namespace.
445 : */
446 : bool
447 8336 : ConstraintNameExists(const char *conname, Oid namespaceid)
448 : {
449 : bool found;
450 : Relation conDesc;
451 : SysScanDesc conscan;
452 : ScanKeyData skey[2];
453 :
454 8336 : conDesc = table_open(ConstraintRelationId, AccessShareLock);
455 :
456 8336 : ScanKeyInit(&skey[0],
457 : Anum_pg_constraint_conname,
458 : BTEqualStrategyNumber, F_NAMEEQ,
459 : CStringGetDatum(conname));
460 :
461 8336 : ScanKeyInit(&skey[1],
462 : Anum_pg_constraint_connamespace,
463 : BTEqualStrategyNumber, F_OIDEQ,
464 : ObjectIdGetDatum(namespaceid));
465 :
466 8336 : conscan = systable_beginscan(conDesc, ConstraintNameNspIndexId, true,
467 : NULL, 2, skey);
468 :
469 8336 : found = (HeapTupleIsValid(systable_getnext(conscan)));
470 :
471 8336 : systable_endscan(conscan);
472 8336 : table_close(conDesc, AccessShareLock);
473 :
474 8336 : return found;
475 : }
476 :
477 : /*
478 : * Select a nonconflicting name for a new constraint.
479 : *
480 : * The objective here is to choose a name that is unique within the
481 : * specified namespace. Postgres does not require this, but the SQL
482 : * spec does, and some apps depend on it. Therefore we avoid choosing
483 : * default names that so conflict.
484 : *
485 : * name1, name2, and label are used the same way as for makeObjectName(),
486 : * except that the label can't be NULL; digits will be appended to the label
487 : * if needed to create a name that is unique within the specified namespace.
488 : *
489 : * 'others' can be a list of string names already chosen within the current
490 : * command (but not yet reflected into the catalogs); we will not choose
491 : * a duplicate of one of these either.
492 : *
493 : * Note: it is theoretically possible to get a collision anyway, if someone
494 : * else chooses the same name concurrently. This is fairly unlikely to be
495 : * a problem in practice, especially if one is holding an exclusive lock on
496 : * the relation identified by name1.
497 : *
498 : * Returns a palloc'd string.
499 : */
500 : char *
501 10120 : ChooseConstraintName(const char *name1, const char *name2,
502 : const char *label, Oid namespaceid,
503 : List *others)
504 : {
505 10120 : int pass = 0;
506 10120 : char *conname = NULL;
507 : char modlabel[NAMEDATALEN];
508 : Relation conDesc;
509 : SysScanDesc conscan;
510 : ScanKeyData skey[2];
511 : bool found;
512 : ListCell *l;
513 :
514 10120 : conDesc = table_open(ConstraintRelationId, AccessShareLock);
515 :
516 : /* try the unmodified label first */
517 10120 : strlcpy(modlabel, label, sizeof(modlabel));
518 :
519 : for (;;)
520 : {
521 11742 : conname = makeObjectName(name1, name2, modlabel);
522 :
523 11742 : found = false;
524 :
525 14242 : foreach(l, others)
526 : {
527 2518 : if (strcmp((char *) lfirst(l), conname) == 0)
528 : {
529 18 : found = true;
530 18 : break;
531 : }
532 : }
533 :
534 11742 : if (!found)
535 : {
536 11724 : ScanKeyInit(&skey[0],
537 : Anum_pg_constraint_conname,
538 : BTEqualStrategyNumber, F_NAMEEQ,
539 : CStringGetDatum(conname));
540 :
541 11724 : ScanKeyInit(&skey[1],
542 : Anum_pg_constraint_connamespace,
543 : BTEqualStrategyNumber, F_OIDEQ,
544 : ObjectIdGetDatum(namespaceid));
545 :
546 11724 : conscan = systable_beginscan(conDesc, ConstraintNameNspIndexId, true,
547 : NULL, 2, skey);
548 :
549 11724 : found = (HeapTupleIsValid(systable_getnext(conscan)));
550 :
551 11724 : systable_endscan(conscan);
552 : }
553 :
554 11742 : if (!found)
555 10120 : break;
556 :
557 : /* found a conflict, so try a new name component */
558 1622 : pfree(conname);
559 1622 : snprintf(modlabel, sizeof(modlabel), "%s%d", label, ++pass);
560 : }
561 :
562 10120 : table_close(conDesc, AccessShareLock);
563 :
564 10120 : return conname;
565 : }
566 :
567 : /*
568 : * Find and return a copy of the pg_constraint tuple that implements a
569 : * validated not-null constraint for the given column of the given relation.
570 : *
571 : * XXX This would be easier if we had pg_attribute.notnullconstr with the OID
572 : * of the constraint that implements the not-null constraint for that column.
573 : * I'm not sure it's worth the catalog bloat and de-normalization, however.
574 : */
575 : HeapTuple
576 2108 : findNotNullConstraintAttnum(Oid relid, AttrNumber attnum)
577 : {
578 : Relation pg_constraint;
579 : HeapTuple conTup,
580 2108 : retval = NULL;
581 : SysScanDesc scan;
582 : ScanKeyData key;
583 :
584 2108 : pg_constraint = table_open(ConstraintRelationId, AccessShareLock);
585 2108 : ScanKeyInit(&key,
586 : Anum_pg_constraint_conrelid,
587 : BTEqualStrategyNumber, F_OIDEQ,
588 : ObjectIdGetDatum(relid));
589 2108 : scan = systable_beginscan(pg_constraint, ConstraintRelidTypidNameIndexId,
590 : true, NULL, 1, &key);
591 :
592 3800 : while (HeapTupleIsValid(conTup = systable_getnext(scan)))
593 : {
594 2124 : Form_pg_constraint con = (Form_pg_constraint) GETSTRUCT(conTup);
595 : AttrNumber conkey;
596 :
597 : /*
598 : * We're looking for a NOTNULL constraint that's marked validated,
599 : * with the column we're looking for as the sole element in conkey.
600 : */
601 2124 : if (con->contype != CONSTRAINT_NOTNULL)
602 802 : continue;
603 1322 : if (!con->convalidated)
604 0 : continue;
605 :
606 1322 : conkey = extractNotNullColumn(conTup);
607 1322 : if (conkey != attnum)
608 890 : continue;
609 :
610 : /* Found it */
611 432 : retval = heap_copytuple(conTup);
612 432 : break;
613 : }
614 :
615 2108 : systable_endscan(scan);
616 2108 : table_close(pg_constraint, AccessShareLock);
617 :
618 2108 : return retval;
619 : }
620 :
621 : /*
622 : * Find and return the pg_constraint tuple that implements a validated
623 : * not-null constraint for the given column of the given relation.
624 : */
625 : HeapTuple
626 290 : findNotNullConstraint(Oid relid, const char *colname)
627 : {
628 290 : AttrNumber attnum = get_attnum(relid, colname);
629 :
630 290 : return findNotNullConstraintAttnum(relid, attnum);
631 : }
632 :
633 : /*
634 : * Find and return the pg_constraint tuple that implements a validated
635 : * not-null constraint for the given domain.
636 : */
637 : HeapTuple
638 12 : findDomainNotNullConstraint(Oid typid)
639 : {
640 : Relation pg_constraint;
641 : HeapTuple conTup,
642 12 : retval = NULL;
643 : SysScanDesc scan;
644 : ScanKeyData key;
645 :
646 12 : pg_constraint = table_open(ConstraintRelationId, AccessShareLock);
647 12 : ScanKeyInit(&key,
648 : Anum_pg_constraint_contypid,
649 : BTEqualStrategyNumber, F_OIDEQ,
650 : ObjectIdGetDatum(typid));
651 12 : scan = systable_beginscan(pg_constraint, ConstraintRelidTypidNameIndexId,
652 : true, NULL, 1, &key);
653 :
654 12 : while (HeapTupleIsValid(conTup = systable_getnext(scan)))
655 : {
656 12 : Form_pg_constraint con = (Form_pg_constraint) GETSTRUCT(conTup);
657 :
658 : /*
659 : * We're looking for a NOTNULL constraint that's marked validated.
660 : */
661 12 : if (con->contype != CONSTRAINT_NOTNULL)
662 0 : continue;
663 12 : if (!con->convalidated)
664 0 : continue;
665 :
666 : /* Found it */
667 12 : retval = heap_copytuple(conTup);
668 12 : break;
669 : }
670 :
671 12 : systable_endscan(scan);
672 12 : table_close(pg_constraint, AccessShareLock);
673 :
674 12 : return retval;
675 : }
676 :
677 : /*
678 : * Given a pg_constraint tuple for a not-null constraint, return the column
679 : * number it is for.
680 : */
681 : AttrNumber
682 4770 : extractNotNullColumn(HeapTuple constrTup)
683 : {
684 : AttrNumber colnum;
685 : Datum adatum;
686 : ArrayType *arr;
687 :
688 : /* only tuples for not-null constraints should be given */
689 : Assert(((Form_pg_constraint) GETSTRUCT(constrTup))->contype == CONSTRAINT_NOTNULL);
690 :
691 4770 : adatum = SysCacheGetAttrNotNull(CONSTROID, constrTup,
692 : Anum_pg_constraint_conkey);
693 4770 : arr = DatumGetArrayTypeP(adatum); /* ensure not toasted */
694 4770 : if (ARR_NDIM(arr) != 1 ||
695 4770 : ARR_HASNULL(arr) ||
696 4770 : ARR_ELEMTYPE(arr) != INT2OID ||
697 4770 : ARR_DIMS(arr)[0] != 1)
698 0 : elog(ERROR, "conkey is not a 1-D smallint array");
699 :
700 4770 : memcpy(&colnum, ARR_DATA_PTR(arr), sizeof(AttrNumber));
701 :
702 4770 : if ((Pointer) arr != DatumGetPointer(adatum))
703 4770 : pfree(arr); /* free de-toasted copy, if any */
704 :
705 4770 : return colnum;
706 : }
707 :
708 : /*
709 : * AdjustNotNullInheritance1
710 : * Adjust inheritance count for a single not-null constraint
711 : *
712 : * If no not-null constraint is found for the column, return 0.
713 : * Caller can create one.
714 : * If the constraint does exist and it's inheritable, adjust its
715 : * inheritance count (and possibly islocal status) and return 1.
716 : * No further action needs to be taken.
717 : * If the constraint exists but is marked NO INHERIT, adjust it as above
718 : * and reset connoinherit to false, and return -1. Caller is
719 : * responsible for adding the same constraint to the children, if any.
720 : */
721 : int
722 1170 : AdjustNotNullInheritance1(Oid relid, AttrNumber attnum, int count,
723 : bool is_no_inherit)
724 : {
725 : HeapTuple tup;
726 :
727 : Assert(count >= 0);
728 :
729 1170 : tup = findNotNullConstraintAttnum(relid, attnum);
730 1170 : if (HeapTupleIsValid(tup))
731 : {
732 : Relation pg_constraint;
733 : Form_pg_constraint conform;
734 86 : int retval = 1;
735 :
736 86 : pg_constraint = table_open(ConstraintRelationId, RowExclusiveLock);
737 86 : conform = (Form_pg_constraint) GETSTRUCT(tup);
738 :
739 : /*
740 : * If we're asked for a NO INHERIT constraint and this relation
741 : * already has an inheritable one, throw an error.
742 : */
743 86 : if (is_no_inherit && !conform->connoinherit)
744 6 : ereport(ERROR,
745 : errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
746 : errmsg("cannot change NO INHERIT status of inherited NOT NULL constraint \"%s\" on relation \"%s\"",
747 : NameStr(conform->conname), get_rel_name(relid)));
748 :
749 : /*
750 : * If the constraint already exists in this relation but it's marked
751 : * NO INHERIT, we can just remove that flag, and instruct caller to
752 : * recurse to add the constraint to children.
753 : */
754 80 : if (!is_no_inherit && conform->connoinherit)
755 : {
756 12 : conform->connoinherit = false;
757 12 : retval = -1; /* caller must add constraint on child rels */
758 : }
759 :
760 80 : if (count > 0)
761 74 : conform->coninhcount += count;
762 :
763 : /* sanity check */
764 80 : if (conform->coninhcount < 0)
765 0 : elog(ERROR, "invalid inhcount %d for constraint \"%s\" on relation \"%s\"",
766 : conform->coninhcount, NameStr(conform->conname),
767 : get_rel_name(relid));
768 :
769 : /*
770 : * If the constraint is no longer inherited, mark it local. It's
771 : * arguable that we should drop it instead, but it's hard to see that
772 : * being better. The user can drop it manually later.
773 : */
774 80 : if (conform->coninhcount == 0)
775 6 : conform->conislocal = true;
776 :
777 80 : CatalogTupleUpdate(pg_constraint, &tup->t_self, tup);
778 :
779 80 : table_close(pg_constraint, RowExclusiveLock);
780 :
781 80 : return retval;
782 : }
783 :
784 1084 : return 0;
785 : }
786 :
787 : /*
788 : * AdjustNotNullInheritance
789 : * Adjust not-null constraints' inhcount/islocal for
790 : * ALTER TABLE [NO] INHERITS
791 : *
792 : * Mark the NOT NULL constraints for the given relation columns as
793 : * inherited, so that they can't be dropped.
794 : *
795 : * Caller must have checked beforehand that attnotnull was set for all
796 : * columns. However, some of those could be set because of a primary
797 : * key, so throw a proper user-visible error if one is not found.
798 : */
799 : void
800 36 : AdjustNotNullInheritance(Oid relid, Bitmapset *columns, int count)
801 : {
802 : Relation pg_constraint;
803 : int attnum;
804 :
805 36 : pg_constraint = table_open(ConstraintRelationId, RowExclusiveLock);
806 :
807 : /*
808 : * Scan the set of columns and bump inhcount for each.
809 : */
810 36 : attnum = -1;
811 66 : while ((attnum = bms_next_member(columns, attnum)) >= 0)
812 : {
813 : HeapTuple tup;
814 : Form_pg_constraint conform;
815 :
816 36 : tup = findNotNullConstraintAttnum(relid, attnum);
817 36 : if (!HeapTupleIsValid(tup))
818 6 : ereport(ERROR,
819 : errcode(ERRCODE_DATATYPE_MISMATCH),
820 : errmsg("column \"%s\" in child table must be marked NOT NULL",
821 : get_attname(relid, attnum,
822 : false)));
823 :
824 30 : conform = (Form_pg_constraint) GETSTRUCT(tup);
825 30 : conform->coninhcount += count;
826 30 : if (conform->coninhcount < 0)
827 0 : elog(ERROR, "invalid inhcount %d for constraint \"%s\" on relation \"%s\"",
828 : conform->coninhcount, NameStr(conform->conname),
829 : get_rel_name(relid));
830 :
831 : /*
832 : * If the constraints are no longer inherited, mark them local. It's
833 : * arguable that we should drop them instead, but it's hard to see
834 : * that being better. The user can drop it manually later.
835 : */
836 30 : if (conform->coninhcount == 0)
837 0 : conform->conislocal = true;
838 :
839 30 : CatalogTupleUpdate(pg_constraint, &tup->t_self, tup);
840 : }
841 :
842 30 : table_close(pg_constraint, RowExclusiveLock);
843 30 : }
844 :
845 : /*
846 : * RelationGetNotNullConstraints
847 : * Return the list of not-null constraints for the given rel
848 : *
849 : * Caller can request cooked constraints, or raw.
850 : *
851 : * This is seldom needed, so we just scan pg_constraint each time.
852 : *
853 : * XXX This is only used to create derived tables, so NO INHERIT constraints
854 : * are always skipped.
855 : */
856 : List *
857 12104 : RelationGetNotNullConstraints(Oid relid, bool cooked)
858 : {
859 12104 : List *notnulls = NIL;
860 : Relation constrRel;
861 : HeapTuple htup;
862 : SysScanDesc conscan;
863 : ScanKeyData skey;
864 :
865 12104 : constrRel = table_open(ConstraintRelationId, AccessShareLock);
866 12104 : ScanKeyInit(&skey,
867 : Anum_pg_constraint_conrelid,
868 : BTEqualStrategyNumber, F_OIDEQ,
869 : ObjectIdGetDatum(relid));
870 12104 : conscan = systable_beginscan(constrRel, ConstraintRelidTypidNameIndexId, true,
871 : NULL, 1, &skey);
872 :
873 16566 : while (HeapTupleIsValid(htup = systable_getnext(conscan)))
874 : {
875 4462 : Form_pg_constraint conForm = (Form_pg_constraint) GETSTRUCT(htup);
876 : AttrNumber colnum;
877 :
878 4462 : if (conForm->contype != CONSTRAINT_NOTNULL)
879 3172 : continue;
880 1290 : if (conForm->connoinherit)
881 48 : continue;
882 :
883 1242 : colnum = extractNotNullColumn(htup);
884 :
885 1242 : if (cooked)
886 : {
887 : CookedConstraint *cooked;
888 :
889 1040 : cooked = (CookedConstraint *) palloc(sizeof(CookedConstraint));
890 :
891 1040 : cooked->contype = CONSTR_NOTNULL;
892 1040 : cooked->name = pstrdup(NameStr(conForm->conname));
893 1040 : cooked->attnum = colnum;
894 1040 : cooked->expr = NULL;
895 1040 : cooked->skip_validation = false;
896 1040 : cooked->is_local = true;
897 1040 : cooked->inhcount = 0;
898 1040 : cooked->is_no_inherit = conForm->connoinherit;
899 :
900 1040 : notnulls = lappend(notnulls, cooked);
901 : }
902 : else
903 : {
904 : Constraint *constr;
905 :
906 202 : constr = makeNode(Constraint);
907 202 : constr->contype = CONSTR_NOTNULL;
908 202 : constr->conname = pstrdup(NameStr(conForm->conname));
909 202 : constr->deferrable = false;
910 202 : constr->initdeferred = false;
911 202 : constr->location = -1;
912 202 : constr->keys = list_make1(makeString(get_attname(relid, colnum,
913 : false)));
914 202 : constr->skip_validation = false;
915 202 : constr->initially_valid = true;
916 202 : notnulls = lappend(notnulls, constr);
917 : }
918 : }
919 :
920 12104 : systable_endscan(conscan);
921 12104 : table_close(constrRel, AccessShareLock);
922 :
923 12104 : return notnulls;
924 : }
925 :
926 :
927 : /*
928 : * Delete a single constraint record.
929 : */
930 : void
931 19902 : RemoveConstraintById(Oid conId)
932 : {
933 : Relation conDesc;
934 : HeapTuple tup;
935 : Form_pg_constraint con;
936 :
937 19902 : conDesc = table_open(ConstraintRelationId, RowExclusiveLock);
938 :
939 19902 : tup = SearchSysCache1(CONSTROID, ObjectIdGetDatum(conId));
940 19902 : if (!HeapTupleIsValid(tup)) /* should not happen */
941 0 : elog(ERROR, "cache lookup failed for constraint %u", conId);
942 19902 : con = (Form_pg_constraint) GETSTRUCT(tup);
943 :
944 : /*
945 : * Special processing depending on what the constraint is for.
946 : */
947 19902 : if (OidIsValid(con->conrelid))
948 : {
949 : Relation rel;
950 :
951 : /*
952 : * If the constraint is for a relation, open and exclusive-lock the
953 : * relation it's for.
954 : */
955 19568 : rel = table_open(con->conrelid, AccessExclusiveLock);
956 :
957 : /*
958 : * We need to update the relchecks count if it is a check constraint
959 : * being dropped. This update will force backends to rebuild relcache
960 : * entries when we commit.
961 : */
962 19566 : if (con->contype == CONSTRAINT_CHECK)
963 : {
964 : Relation pgrel;
965 : HeapTuple relTup;
966 : Form_pg_class classForm;
967 :
968 1696 : pgrel = table_open(RelationRelationId, RowExclusiveLock);
969 1696 : relTup = SearchSysCacheCopy1(RELOID,
970 : ObjectIdGetDatum(con->conrelid));
971 1696 : if (!HeapTupleIsValid(relTup))
972 0 : elog(ERROR, "cache lookup failed for relation %u",
973 : con->conrelid);
974 1696 : classForm = (Form_pg_class) GETSTRUCT(relTup);
975 :
976 1696 : if (classForm->relchecks == 0) /* should not happen */
977 0 : elog(ERROR, "relation \"%s\" has relchecks = 0",
978 : RelationGetRelationName(rel));
979 1696 : classForm->relchecks--;
980 :
981 1696 : CatalogTupleUpdate(pgrel, &relTup->t_self, relTup);
982 :
983 1696 : heap_freetuple(relTup);
984 :
985 1696 : table_close(pgrel, RowExclusiveLock);
986 : }
987 :
988 : /* Keep lock on constraint's rel until end of xact */
989 19566 : table_close(rel, NoLock);
990 : }
991 334 : else if (OidIsValid(con->contypid))
992 : {
993 : /*
994 : * XXX for now, do nothing special when dropping a domain constraint
995 : *
996 : * Probably there should be some form of locking on the domain type,
997 : * but we have no such concept at the moment.
998 : */
999 : }
1000 : else
1001 0 : elog(ERROR, "constraint %u is not of a known type", conId);
1002 :
1003 : /* Fry the constraint itself */
1004 19900 : CatalogTupleDelete(conDesc, &tup->t_self);
1005 :
1006 : /* Clean up */
1007 19900 : ReleaseSysCache(tup);
1008 19900 : table_close(conDesc, RowExclusiveLock);
1009 19900 : }
1010 :
1011 : /*
1012 : * RenameConstraintById
1013 : * Rename a constraint.
1014 : *
1015 : * Note: this isn't intended to be a user-exposed function; it doesn't check
1016 : * permissions etc. Currently this is only invoked when renaming an index
1017 : * that is associated with a constraint, but it's made a little more general
1018 : * than that with the expectation of someday having ALTER TABLE RENAME
1019 : * CONSTRAINT.
1020 : */
1021 : void
1022 90 : RenameConstraintById(Oid conId, const char *newname)
1023 : {
1024 : Relation conDesc;
1025 : HeapTuple tuple;
1026 : Form_pg_constraint con;
1027 :
1028 90 : conDesc = table_open(ConstraintRelationId, RowExclusiveLock);
1029 :
1030 90 : tuple = SearchSysCacheCopy1(CONSTROID, ObjectIdGetDatum(conId));
1031 90 : if (!HeapTupleIsValid(tuple))
1032 0 : elog(ERROR, "cache lookup failed for constraint %u", conId);
1033 90 : con = (Form_pg_constraint) GETSTRUCT(tuple);
1034 :
1035 : /*
1036 : * For user-friendliness, check whether the name is already in use.
1037 : */
1038 174 : if (OidIsValid(con->conrelid) &&
1039 84 : ConstraintNameIsUsed(CONSTRAINT_RELATION,
1040 : con->conrelid,
1041 : newname))
1042 0 : ereport(ERROR,
1043 : (errcode(ERRCODE_DUPLICATE_OBJECT),
1044 : errmsg("constraint \"%s\" for relation \"%s\" already exists",
1045 : newname, get_rel_name(con->conrelid))));
1046 96 : if (OidIsValid(con->contypid) &&
1047 6 : ConstraintNameIsUsed(CONSTRAINT_DOMAIN,
1048 : con->contypid,
1049 : newname))
1050 0 : ereport(ERROR,
1051 : (errcode(ERRCODE_DUPLICATE_OBJECT),
1052 : errmsg("constraint \"%s\" for domain %s already exists",
1053 : newname, format_type_be(con->contypid))));
1054 :
1055 : /* OK, do the rename --- tuple is a copy, so OK to scribble on it */
1056 90 : namestrcpy(&(con->conname), newname);
1057 :
1058 90 : CatalogTupleUpdate(conDesc, &tuple->t_self, tuple);
1059 :
1060 90 : InvokeObjectPostAlterHook(ConstraintRelationId, conId, 0);
1061 :
1062 90 : heap_freetuple(tuple);
1063 90 : table_close(conDesc, RowExclusiveLock);
1064 90 : }
1065 :
1066 : /*
1067 : * AlterConstraintNamespaces
1068 : * Find any constraints belonging to the specified object,
1069 : * and move them to the specified new namespace.
1070 : *
1071 : * isType indicates whether the owning object is a type or a relation.
1072 : */
1073 : void
1074 102 : AlterConstraintNamespaces(Oid ownerId, Oid oldNspId,
1075 : Oid newNspId, bool isType, ObjectAddresses *objsMoved)
1076 : {
1077 : Relation conRel;
1078 : ScanKeyData key[2];
1079 : SysScanDesc scan;
1080 : HeapTuple tup;
1081 :
1082 102 : conRel = table_open(ConstraintRelationId, RowExclusiveLock);
1083 :
1084 102 : ScanKeyInit(&key[0],
1085 : Anum_pg_constraint_conrelid,
1086 : BTEqualStrategyNumber, F_OIDEQ,
1087 : ObjectIdGetDatum(isType ? InvalidOid : ownerId));
1088 102 : ScanKeyInit(&key[1],
1089 : Anum_pg_constraint_contypid,
1090 : BTEqualStrategyNumber, F_OIDEQ,
1091 : ObjectIdGetDatum(isType ? ownerId : InvalidOid));
1092 :
1093 102 : scan = systable_beginscan(conRel, ConstraintRelidTypidNameIndexId, true,
1094 : NULL, 2, key);
1095 :
1096 236 : while (HeapTupleIsValid((tup = systable_getnext(scan))))
1097 : {
1098 134 : Form_pg_constraint conform = (Form_pg_constraint) GETSTRUCT(tup);
1099 : ObjectAddress thisobj;
1100 :
1101 134 : ObjectAddressSet(thisobj, ConstraintRelationId, conform->oid);
1102 :
1103 134 : if (object_address_present(&thisobj, objsMoved))
1104 0 : continue;
1105 :
1106 : /* Don't update if the object is already part of the namespace */
1107 134 : if (conform->connamespace == oldNspId && oldNspId != newNspId)
1108 : {
1109 104 : tup = heap_copytuple(tup);
1110 104 : conform = (Form_pg_constraint) GETSTRUCT(tup);
1111 :
1112 104 : conform->connamespace = newNspId;
1113 :
1114 104 : CatalogTupleUpdate(conRel, &tup->t_self, tup);
1115 :
1116 : /*
1117 : * Note: currently, the constraint will not have its own
1118 : * dependency on the namespace, so we don't need to do
1119 : * changeDependencyFor().
1120 : */
1121 : }
1122 :
1123 134 : InvokeObjectPostAlterHook(ConstraintRelationId, thisobj.objectId, 0);
1124 :
1125 134 : add_exact_object_address(&thisobj, objsMoved);
1126 : }
1127 :
1128 102 : systable_endscan(scan);
1129 :
1130 102 : table_close(conRel, RowExclusiveLock);
1131 102 : }
1132 :
1133 : /*
1134 : * ConstraintSetParentConstraint
1135 : * Set a partition's constraint as child of its parent constraint,
1136 : * or remove the linkage if parentConstrId is InvalidOid.
1137 : *
1138 : * This updates the constraint's pg_constraint row to show it as inherited, and
1139 : * adds PARTITION dependencies to prevent the constraint from being deleted
1140 : * on its own. Alternatively, reverse that.
1141 : */
1142 : void
1143 540 : ConstraintSetParentConstraint(Oid childConstrId,
1144 : Oid parentConstrId,
1145 : Oid childTableId)
1146 : {
1147 : Relation constrRel;
1148 : Form_pg_constraint constrForm;
1149 : HeapTuple tuple,
1150 : newtup;
1151 : ObjectAddress depender;
1152 : ObjectAddress referenced;
1153 :
1154 540 : constrRel = table_open(ConstraintRelationId, RowExclusiveLock);
1155 540 : tuple = SearchSysCache1(CONSTROID, ObjectIdGetDatum(childConstrId));
1156 540 : if (!HeapTupleIsValid(tuple))
1157 0 : elog(ERROR, "cache lookup failed for constraint %u", childConstrId);
1158 540 : newtup = heap_copytuple(tuple);
1159 540 : constrForm = (Form_pg_constraint) GETSTRUCT(newtup);
1160 540 : if (OidIsValid(parentConstrId))
1161 : {
1162 : /* don't allow setting parent for a constraint that already has one */
1163 : Assert(constrForm->coninhcount == 0);
1164 310 : if (constrForm->conparentid != InvalidOid)
1165 0 : elog(ERROR, "constraint %u already has a parent constraint",
1166 : childConstrId);
1167 :
1168 310 : constrForm->conislocal = false;
1169 310 : constrForm->coninhcount++;
1170 310 : if (constrForm->coninhcount < 0)
1171 0 : ereport(ERROR,
1172 : errcode(ERRCODE_PROGRAM_LIMIT_EXCEEDED),
1173 : errmsg("too many inheritance parents"));
1174 310 : constrForm->conparentid = parentConstrId;
1175 :
1176 310 : CatalogTupleUpdate(constrRel, &tuple->t_self, newtup);
1177 :
1178 310 : ObjectAddressSet(depender, ConstraintRelationId, childConstrId);
1179 :
1180 310 : ObjectAddressSet(referenced, ConstraintRelationId, parentConstrId);
1181 310 : recordDependencyOn(&depender, &referenced, DEPENDENCY_PARTITION_PRI);
1182 :
1183 310 : ObjectAddressSet(referenced, RelationRelationId, childTableId);
1184 310 : recordDependencyOn(&depender, &referenced, DEPENDENCY_PARTITION_SEC);
1185 : }
1186 : else
1187 : {
1188 230 : constrForm->coninhcount--;
1189 230 : constrForm->conislocal = true;
1190 230 : constrForm->conparentid = InvalidOid;
1191 :
1192 : /* Make sure there's no further inheritance. */
1193 : Assert(constrForm->coninhcount == 0);
1194 :
1195 230 : CatalogTupleUpdate(constrRel, &tuple->t_self, newtup);
1196 :
1197 230 : deleteDependencyRecordsForClass(ConstraintRelationId, childConstrId,
1198 : ConstraintRelationId,
1199 : DEPENDENCY_PARTITION_PRI);
1200 230 : deleteDependencyRecordsForClass(ConstraintRelationId, childConstrId,
1201 : RelationRelationId,
1202 : DEPENDENCY_PARTITION_SEC);
1203 : }
1204 :
1205 540 : ReleaseSysCache(tuple);
1206 540 : table_close(constrRel, RowExclusiveLock);
1207 540 : }
1208 :
1209 :
1210 : /*
1211 : * get_relation_constraint_oid
1212 : * Find a constraint on the specified relation with the specified name.
1213 : * Returns constraint's OID.
1214 : */
1215 : Oid
1216 338 : get_relation_constraint_oid(Oid relid, const char *conname, bool missing_ok)
1217 : {
1218 : Relation pg_constraint;
1219 : HeapTuple tuple;
1220 : SysScanDesc scan;
1221 : ScanKeyData skey[3];
1222 338 : Oid conOid = InvalidOid;
1223 :
1224 338 : pg_constraint = table_open(ConstraintRelationId, AccessShareLock);
1225 :
1226 338 : ScanKeyInit(&skey[0],
1227 : Anum_pg_constraint_conrelid,
1228 : BTEqualStrategyNumber, F_OIDEQ,
1229 : ObjectIdGetDatum(relid));
1230 338 : ScanKeyInit(&skey[1],
1231 : Anum_pg_constraint_contypid,
1232 : BTEqualStrategyNumber, F_OIDEQ,
1233 : ObjectIdGetDatum(InvalidOid));
1234 338 : ScanKeyInit(&skey[2],
1235 : Anum_pg_constraint_conname,
1236 : BTEqualStrategyNumber, F_NAMEEQ,
1237 : CStringGetDatum(conname));
1238 :
1239 338 : scan = systable_beginscan(pg_constraint, ConstraintRelidTypidNameIndexId, true,
1240 : NULL, 3, skey);
1241 :
1242 : /* There can be at most one matching row */
1243 338 : if (HeapTupleIsValid(tuple = systable_getnext(scan)))
1244 326 : conOid = ((Form_pg_constraint) GETSTRUCT(tuple))->oid;
1245 :
1246 338 : systable_endscan(scan);
1247 :
1248 : /* If no such constraint exists, complain */
1249 338 : if (!OidIsValid(conOid) && !missing_ok)
1250 12 : ereport(ERROR,
1251 : (errcode(ERRCODE_UNDEFINED_OBJECT),
1252 : errmsg("constraint \"%s\" for table \"%s\" does not exist",
1253 : conname, get_rel_name(relid))));
1254 :
1255 326 : table_close(pg_constraint, AccessShareLock);
1256 :
1257 326 : return conOid;
1258 : }
1259 :
1260 : /*
1261 : * get_relation_constraint_attnos
1262 : * Find a constraint on the specified relation with the specified name
1263 : * and return the constrained columns.
1264 : *
1265 : * Returns a Bitmapset of the column attnos of the constrained columns, with
1266 : * attnos being offset by FirstLowInvalidHeapAttributeNumber so that system
1267 : * columns can be represented.
1268 : *
1269 : * *constraintOid is set to the OID of the constraint, or InvalidOid on
1270 : * failure.
1271 : */
1272 : Bitmapset *
1273 48 : get_relation_constraint_attnos(Oid relid, const char *conname,
1274 : bool missing_ok, Oid *constraintOid)
1275 : {
1276 48 : Bitmapset *conattnos = NULL;
1277 : Relation pg_constraint;
1278 : HeapTuple tuple;
1279 : SysScanDesc scan;
1280 : ScanKeyData skey[3];
1281 :
1282 : /* Set *constraintOid, to avoid complaints about uninitialized vars */
1283 48 : *constraintOid = InvalidOid;
1284 :
1285 48 : pg_constraint = table_open(ConstraintRelationId, AccessShareLock);
1286 :
1287 48 : ScanKeyInit(&skey[0],
1288 : Anum_pg_constraint_conrelid,
1289 : BTEqualStrategyNumber, F_OIDEQ,
1290 : ObjectIdGetDatum(relid));
1291 48 : ScanKeyInit(&skey[1],
1292 : Anum_pg_constraint_contypid,
1293 : BTEqualStrategyNumber, F_OIDEQ,
1294 : ObjectIdGetDatum(InvalidOid));
1295 48 : ScanKeyInit(&skey[2],
1296 : Anum_pg_constraint_conname,
1297 : BTEqualStrategyNumber, F_NAMEEQ,
1298 : CStringGetDatum(conname));
1299 :
1300 48 : scan = systable_beginscan(pg_constraint, ConstraintRelidTypidNameIndexId, true,
1301 : NULL, 3, skey);
1302 :
1303 : /* There can be at most one matching row */
1304 48 : if (HeapTupleIsValid(tuple = systable_getnext(scan)))
1305 : {
1306 : Datum adatum;
1307 : bool isNull;
1308 :
1309 48 : *constraintOid = ((Form_pg_constraint) GETSTRUCT(tuple))->oid;
1310 :
1311 : /* Extract the conkey array, ie, attnums of constrained columns */
1312 48 : adatum = heap_getattr(tuple, Anum_pg_constraint_conkey,
1313 : RelationGetDescr(pg_constraint), &isNull);
1314 48 : if (!isNull)
1315 : {
1316 : ArrayType *arr;
1317 : int numcols;
1318 : int16 *attnums;
1319 : int i;
1320 :
1321 48 : arr = DatumGetArrayTypeP(adatum); /* ensure not toasted */
1322 48 : numcols = ARR_DIMS(arr)[0];
1323 48 : if (ARR_NDIM(arr) != 1 ||
1324 48 : numcols < 0 ||
1325 48 : ARR_HASNULL(arr) ||
1326 48 : ARR_ELEMTYPE(arr) != INT2OID)
1327 0 : elog(ERROR, "conkey is not a 1-D smallint array");
1328 48 : attnums = (int16 *) ARR_DATA_PTR(arr);
1329 :
1330 : /* Construct the result value */
1331 108 : for (i = 0; i < numcols; i++)
1332 : {
1333 60 : conattnos = bms_add_member(conattnos,
1334 60 : attnums[i] - FirstLowInvalidHeapAttributeNumber);
1335 : }
1336 : }
1337 : }
1338 :
1339 48 : systable_endscan(scan);
1340 :
1341 : /* If no such constraint exists, complain */
1342 48 : if (!OidIsValid(*constraintOid) && !missing_ok)
1343 0 : ereport(ERROR,
1344 : (errcode(ERRCODE_UNDEFINED_OBJECT),
1345 : errmsg("constraint \"%s\" for table \"%s\" does not exist",
1346 : conname, get_rel_name(relid))));
1347 :
1348 48 : table_close(pg_constraint, AccessShareLock);
1349 :
1350 48 : return conattnos;
1351 : }
1352 :
1353 : /*
1354 : * Return the OID of the constraint enforced by the given index in the
1355 : * given relation; or InvalidOid if no such index is cataloged.
1356 : *
1357 : * Much like get_constraint_index, this function is concerned only with the
1358 : * one constraint that "owns" the given index. Therefore, constraints of
1359 : * types other than unique, primary-key, and exclusion are ignored.
1360 : */
1361 : Oid
1362 1490 : get_relation_idx_constraint_oid(Oid relationId, Oid indexId)
1363 : {
1364 : Relation pg_constraint;
1365 : SysScanDesc scan;
1366 : ScanKeyData key;
1367 : HeapTuple tuple;
1368 1490 : Oid constraintId = InvalidOid;
1369 :
1370 1490 : pg_constraint = table_open(ConstraintRelationId, AccessShareLock);
1371 :
1372 1490 : ScanKeyInit(&key,
1373 : Anum_pg_constraint_conrelid,
1374 : BTEqualStrategyNumber,
1375 : F_OIDEQ,
1376 : ObjectIdGetDatum(relationId));
1377 1490 : scan = systable_beginscan(pg_constraint, ConstraintRelidTypidNameIndexId,
1378 : true, NULL, 1, &key);
1379 1802 : while ((tuple = systable_getnext(scan)) != NULL)
1380 : {
1381 : Form_pg_constraint constrForm;
1382 :
1383 1134 : constrForm = (Form_pg_constraint) GETSTRUCT(tuple);
1384 :
1385 : /* See above */
1386 1134 : if (constrForm->contype != CONSTRAINT_PRIMARY &&
1387 396 : constrForm->contype != CONSTRAINT_UNIQUE &&
1388 306 : constrForm->contype != CONSTRAINT_EXCLUSION)
1389 306 : continue;
1390 :
1391 828 : if (constrForm->conindid == indexId)
1392 : {
1393 822 : constraintId = constrForm->oid;
1394 822 : break;
1395 : }
1396 : }
1397 1490 : systable_endscan(scan);
1398 :
1399 1490 : table_close(pg_constraint, AccessShareLock);
1400 1490 : return constraintId;
1401 : }
1402 :
1403 : /*
1404 : * get_domain_constraint_oid
1405 : * Find a constraint on the specified domain with the specified name.
1406 : * Returns constraint's OID.
1407 : */
1408 : Oid
1409 62 : get_domain_constraint_oid(Oid typid, const char *conname, bool missing_ok)
1410 : {
1411 : Relation pg_constraint;
1412 : HeapTuple tuple;
1413 : SysScanDesc scan;
1414 : ScanKeyData skey[3];
1415 62 : Oid conOid = InvalidOid;
1416 :
1417 62 : pg_constraint = table_open(ConstraintRelationId, AccessShareLock);
1418 :
1419 62 : ScanKeyInit(&skey[0],
1420 : Anum_pg_constraint_conrelid,
1421 : BTEqualStrategyNumber, F_OIDEQ,
1422 : ObjectIdGetDatum(InvalidOid));
1423 62 : ScanKeyInit(&skey[1],
1424 : Anum_pg_constraint_contypid,
1425 : BTEqualStrategyNumber, F_OIDEQ,
1426 : ObjectIdGetDatum(typid));
1427 62 : ScanKeyInit(&skey[2],
1428 : Anum_pg_constraint_conname,
1429 : BTEqualStrategyNumber, F_NAMEEQ,
1430 : CStringGetDatum(conname));
1431 :
1432 62 : scan = systable_beginscan(pg_constraint, ConstraintRelidTypidNameIndexId, true,
1433 : NULL, 3, skey);
1434 :
1435 : /* There can be at most one matching row */
1436 62 : if (HeapTupleIsValid(tuple = systable_getnext(scan)))
1437 56 : conOid = ((Form_pg_constraint) GETSTRUCT(tuple))->oid;
1438 :
1439 62 : systable_endscan(scan);
1440 :
1441 : /* If no such constraint exists, complain */
1442 62 : if (!OidIsValid(conOid) && !missing_ok)
1443 6 : ereport(ERROR,
1444 : (errcode(ERRCODE_UNDEFINED_OBJECT),
1445 : errmsg("constraint \"%s\" for domain %s does not exist",
1446 : conname, format_type_be(typid))));
1447 :
1448 56 : table_close(pg_constraint, AccessShareLock);
1449 :
1450 56 : return conOid;
1451 : }
1452 :
1453 : /*
1454 : * get_primary_key_attnos
1455 : * Identify the columns in a relation's primary key, if any.
1456 : *
1457 : * Returns a Bitmapset of the column attnos of the primary key's columns,
1458 : * with attnos being offset by FirstLowInvalidHeapAttributeNumber so that
1459 : * system columns can be represented.
1460 : *
1461 : * If there is no primary key, return NULL. We also return NULL if the pkey
1462 : * constraint is deferrable and deferrableOk is false.
1463 : *
1464 : * *constraintOid is set to the OID of the pkey constraint, or InvalidOid
1465 : * on failure.
1466 : */
1467 : Bitmapset *
1468 966 : get_primary_key_attnos(Oid relid, bool deferrableOk, Oid *constraintOid)
1469 : {
1470 966 : Bitmapset *pkattnos = NULL;
1471 : Relation pg_constraint;
1472 : HeapTuple tuple;
1473 : SysScanDesc scan;
1474 : ScanKeyData skey[1];
1475 :
1476 : /* Set *constraintOid, to avoid complaints about uninitialized vars */
1477 966 : *constraintOid = InvalidOid;
1478 :
1479 : /* Scan pg_constraint for constraints of the target rel */
1480 966 : pg_constraint = table_open(ConstraintRelationId, AccessShareLock);
1481 :
1482 966 : ScanKeyInit(&skey[0],
1483 : Anum_pg_constraint_conrelid,
1484 : BTEqualStrategyNumber, F_OIDEQ,
1485 : ObjectIdGetDatum(relid));
1486 :
1487 966 : scan = systable_beginscan(pg_constraint, ConstraintRelidTypidNameIndexId, true,
1488 : NULL, 1, skey);
1489 :
1490 1128 : while (HeapTupleIsValid(tuple = systable_getnext(scan)))
1491 : {
1492 514 : Form_pg_constraint con = (Form_pg_constraint) GETSTRUCT(tuple);
1493 : Datum adatum;
1494 : bool isNull;
1495 : ArrayType *arr;
1496 : int16 *attnums;
1497 : int numkeys;
1498 : int i;
1499 :
1500 : /* Skip constraints that are not PRIMARY KEYs */
1501 514 : if (con->contype != CONSTRAINT_PRIMARY)
1502 162 : continue;
1503 :
1504 : /*
1505 : * If the primary key is deferrable, but we've been instructed to
1506 : * ignore deferrable constraints, then we might as well give up
1507 : * searching, since there can only be a single primary key on a table.
1508 : */
1509 352 : if (con->condeferrable && !deferrableOk)
1510 352 : break;
1511 :
1512 : /* Extract the conkey array, ie, attnums of PK's columns */
1513 346 : adatum = heap_getattr(tuple, Anum_pg_constraint_conkey,
1514 : RelationGetDescr(pg_constraint), &isNull);
1515 346 : if (isNull)
1516 0 : elog(ERROR, "null conkey for constraint %u",
1517 : ((Form_pg_constraint) GETSTRUCT(tuple))->oid);
1518 346 : arr = DatumGetArrayTypeP(adatum); /* ensure not toasted */
1519 346 : numkeys = ARR_DIMS(arr)[0];
1520 346 : if (ARR_NDIM(arr) != 1 ||
1521 346 : numkeys < 0 ||
1522 346 : ARR_HASNULL(arr) ||
1523 346 : ARR_ELEMTYPE(arr) != INT2OID)
1524 0 : elog(ERROR, "conkey is not a 1-D smallint array");
1525 346 : attnums = (int16 *) ARR_DATA_PTR(arr);
1526 :
1527 : /* Construct the result value */
1528 764 : for (i = 0; i < numkeys; i++)
1529 : {
1530 418 : pkattnos = bms_add_member(pkattnos,
1531 418 : attnums[i] - FirstLowInvalidHeapAttributeNumber);
1532 : }
1533 346 : *constraintOid = ((Form_pg_constraint) GETSTRUCT(tuple))->oid;
1534 :
1535 : /* No need to search further */
1536 346 : break;
1537 : }
1538 :
1539 966 : systable_endscan(scan);
1540 :
1541 966 : table_close(pg_constraint, AccessShareLock);
1542 :
1543 966 : return pkattnos;
1544 : }
1545 :
1546 : /*
1547 : * Extract data from the pg_constraint tuple of a foreign-key constraint.
1548 : *
1549 : * All arguments save the first are output arguments. All output arguments
1550 : * other than numfks, conkey and confkey can be passed as NULL if caller
1551 : * doesn't need them.
1552 : */
1553 : void
1554 7412 : DeconstructFkConstraintRow(HeapTuple tuple, int *numfks,
1555 : AttrNumber *conkey, AttrNumber *confkey,
1556 : Oid *pf_eq_oprs, Oid *pp_eq_oprs, Oid *ff_eq_oprs,
1557 : int *num_fk_del_set_cols, AttrNumber *fk_del_set_cols)
1558 : {
1559 : Datum adatum;
1560 : bool isNull;
1561 : ArrayType *arr;
1562 : int numkeys;
1563 :
1564 : /*
1565 : * We expect the arrays to be 1-D arrays of the right types; verify that.
1566 : * We don't need to use deconstruct_array() since the array data is just
1567 : * going to look like a C array of values.
1568 : */
1569 7412 : adatum = SysCacheGetAttrNotNull(CONSTROID, tuple,
1570 : Anum_pg_constraint_conkey);
1571 7412 : arr = DatumGetArrayTypeP(adatum); /* ensure not toasted */
1572 7412 : if (ARR_NDIM(arr) != 1 ||
1573 7412 : ARR_HASNULL(arr) ||
1574 7412 : ARR_ELEMTYPE(arr) != INT2OID)
1575 0 : elog(ERROR, "conkey is not a 1-D smallint array");
1576 7412 : numkeys = ARR_DIMS(arr)[0];
1577 7412 : if (numkeys <= 0 || numkeys > INDEX_MAX_KEYS)
1578 0 : elog(ERROR, "foreign key constraint cannot have %d columns", numkeys);
1579 7412 : memcpy(conkey, ARR_DATA_PTR(arr), numkeys * sizeof(int16));
1580 7412 : if ((Pointer) arr != DatumGetPointer(adatum))
1581 7412 : pfree(arr); /* free de-toasted copy, if any */
1582 :
1583 7412 : adatum = SysCacheGetAttrNotNull(CONSTROID, tuple,
1584 : Anum_pg_constraint_confkey);
1585 7412 : arr = DatumGetArrayTypeP(adatum); /* ensure not toasted */
1586 7412 : if (ARR_NDIM(arr) != 1 ||
1587 7412 : ARR_DIMS(arr)[0] != numkeys ||
1588 7412 : ARR_HASNULL(arr) ||
1589 7412 : ARR_ELEMTYPE(arr) != INT2OID)
1590 0 : elog(ERROR, "confkey is not a 1-D smallint array");
1591 7412 : memcpy(confkey, ARR_DATA_PTR(arr), numkeys * sizeof(int16));
1592 7412 : if ((Pointer) arr != DatumGetPointer(adatum))
1593 7412 : pfree(arr); /* free de-toasted copy, if any */
1594 :
1595 7412 : if (pf_eq_oprs)
1596 : {
1597 7412 : adatum = SysCacheGetAttrNotNull(CONSTROID, tuple,
1598 : Anum_pg_constraint_conpfeqop);
1599 7412 : arr = DatumGetArrayTypeP(adatum); /* ensure not toasted */
1600 : /* see TryReuseForeignKey if you change the test below */
1601 7412 : if (ARR_NDIM(arr) != 1 ||
1602 7412 : ARR_DIMS(arr)[0] != numkeys ||
1603 7412 : ARR_HASNULL(arr) ||
1604 7412 : ARR_ELEMTYPE(arr) != OIDOID)
1605 0 : elog(ERROR, "conpfeqop is not a 1-D Oid array");
1606 7412 : memcpy(pf_eq_oprs, ARR_DATA_PTR(arr), numkeys * sizeof(Oid));
1607 7412 : if ((Pointer) arr != DatumGetPointer(adatum))
1608 7412 : pfree(arr); /* free de-toasted copy, if any */
1609 : }
1610 :
1611 7412 : if (pp_eq_oprs)
1612 : {
1613 4706 : adatum = SysCacheGetAttrNotNull(CONSTROID, tuple,
1614 : Anum_pg_constraint_conppeqop);
1615 4706 : arr = DatumGetArrayTypeP(adatum); /* ensure not toasted */
1616 4706 : if (ARR_NDIM(arr) != 1 ||
1617 4706 : ARR_DIMS(arr)[0] != numkeys ||
1618 4706 : ARR_HASNULL(arr) ||
1619 4706 : ARR_ELEMTYPE(arr) != OIDOID)
1620 0 : elog(ERROR, "conppeqop is not a 1-D Oid array");
1621 4706 : memcpy(pp_eq_oprs, ARR_DATA_PTR(arr), numkeys * sizeof(Oid));
1622 4706 : if ((Pointer) arr != DatumGetPointer(adatum))
1623 4706 : pfree(arr); /* free de-toasted copy, if any */
1624 : }
1625 :
1626 7412 : if (ff_eq_oprs)
1627 : {
1628 4706 : adatum = SysCacheGetAttrNotNull(CONSTROID, tuple,
1629 : Anum_pg_constraint_conffeqop);
1630 4706 : arr = DatumGetArrayTypeP(adatum); /* ensure not toasted */
1631 4706 : if (ARR_NDIM(arr) != 1 ||
1632 4706 : ARR_DIMS(arr)[0] != numkeys ||
1633 4706 : ARR_HASNULL(arr) ||
1634 4706 : ARR_ELEMTYPE(arr) != OIDOID)
1635 0 : elog(ERROR, "conffeqop is not a 1-D Oid array");
1636 4706 : memcpy(ff_eq_oprs, ARR_DATA_PTR(arr), numkeys * sizeof(Oid));
1637 4706 : if ((Pointer) arr != DatumGetPointer(adatum))
1638 4706 : pfree(arr); /* free de-toasted copy, if any */
1639 : }
1640 :
1641 7412 : if (fk_del_set_cols)
1642 : {
1643 4706 : adatum = SysCacheGetAttr(CONSTROID, tuple,
1644 : Anum_pg_constraint_confdelsetcols, &isNull);
1645 4706 : if (isNull)
1646 : {
1647 4628 : *num_fk_del_set_cols = 0;
1648 : }
1649 : else
1650 : {
1651 : int num_delete_cols;
1652 :
1653 78 : arr = DatumGetArrayTypeP(adatum); /* ensure not toasted */
1654 78 : if (ARR_NDIM(arr) != 1 ||
1655 78 : ARR_HASNULL(arr) ||
1656 78 : ARR_ELEMTYPE(arr) != INT2OID)
1657 0 : elog(ERROR, "confdelsetcols is not a 1-D smallint array");
1658 78 : num_delete_cols = ARR_DIMS(arr)[0];
1659 78 : memcpy(fk_del_set_cols, ARR_DATA_PTR(arr), num_delete_cols * sizeof(int16));
1660 78 : if ((Pointer) arr != DatumGetPointer(adatum))
1661 78 : pfree(arr); /* free de-toasted copy, if any */
1662 :
1663 78 : *num_fk_del_set_cols = num_delete_cols;
1664 : }
1665 : }
1666 :
1667 7412 : *numfks = numkeys;
1668 7412 : }
1669 :
1670 : /*
1671 : * FindFKPeriodOpers -
1672 : *
1673 : * Looks up the operator oids used for the PERIOD part of a temporal foreign key.
1674 : * The opclass should be the opclass of that PERIOD element.
1675 : * Everything else is an output: containedbyoperoid is the ContainedBy operator for
1676 : * types matching the PERIOD element.
1677 : * aggedcontainedbyoperoid is also a ContainedBy operator,
1678 : * but one whose rhs is a multirange.
1679 : * That way foreign keys can compare fkattr <@ range_agg(pkattr).
1680 : */
1681 : void
1682 408 : FindFKPeriodOpers(Oid opclass,
1683 : Oid *containedbyoperoid,
1684 : Oid *aggedcontainedbyoperoid)
1685 : {
1686 408 : Oid opfamily = InvalidOid;
1687 408 : Oid opcintype = InvalidOid;
1688 : StrategyNumber strat;
1689 :
1690 : /* Make sure we have a range or multirange. */
1691 408 : if (get_opclass_opfamily_and_input_type(opclass, &opfamily, &opcintype))
1692 : {
1693 408 : if (opcintype != ANYRANGEOID && opcintype != ANYMULTIRANGEOID)
1694 6 : ereport(ERROR,
1695 : errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
1696 : errmsg("invalid type for PERIOD part of foreign key"),
1697 : errdetail("Only range and multirange are supported."));
1698 :
1699 : }
1700 : else
1701 0 : elog(ERROR, "cache lookup failed for opclass %u", opclass);
1702 :
1703 : /*
1704 : * Look up the ContainedBy operator whose lhs and rhs are the opclass's
1705 : * type. We use this to optimize RI checks: if the new value includes all
1706 : * of the old value, then we can treat the attribute as if it didn't
1707 : * change, and skip the RI check.
1708 : */
1709 402 : strat = RTContainedByStrategyNumber;
1710 402 : GetOperatorFromWellKnownStrategy(opclass,
1711 : InvalidOid,
1712 : containedbyoperoid,
1713 : &strat);
1714 :
1715 : /*
1716 : * Now look up the ContainedBy operator. Its left arg must be the type of
1717 : * the column (or rather of the opclass). Its right arg must match the
1718 : * return type of the support proc.
1719 : */
1720 402 : strat = RTContainedByStrategyNumber;
1721 402 : GetOperatorFromWellKnownStrategy(opclass,
1722 : ANYMULTIRANGEOID,
1723 : aggedcontainedbyoperoid,
1724 : &strat);
1725 402 : }
1726 :
1727 : /*
1728 : * Determine whether a relation can be proven functionally dependent on
1729 : * a set of grouping columns. If so, return true and add the pg_constraint
1730 : * OIDs of the constraints needed for the proof to the *constraintDeps list.
1731 : *
1732 : * grouping_columns is a list of grouping expressions, in which columns of
1733 : * the rel of interest are Vars with the indicated varno/varlevelsup.
1734 : *
1735 : * Currently we only check to see if the rel has a primary key that is a
1736 : * subset of the grouping_columns. We could also use plain unique constraints
1737 : * if all their columns are known not null, but there's a problem: we need
1738 : * to be able to represent the not-null-ness as part of the constraints added
1739 : * to *constraintDeps. FIXME whenever not-null constraints get represented
1740 : * in pg_constraint.
1741 : */
1742 : bool
1743 206 : check_functional_grouping(Oid relid,
1744 : Index varno, Index varlevelsup,
1745 : List *grouping_columns,
1746 : List **constraintDeps)
1747 : {
1748 : Bitmapset *pkattnos;
1749 : Bitmapset *groupbyattnos;
1750 : Oid constraintOid;
1751 : ListCell *gl;
1752 :
1753 : /* If the rel has no PK, then we can't prove functional dependency */
1754 206 : pkattnos = get_primary_key_attnos(relid, false, &constraintOid);
1755 206 : if (pkattnos == NULL)
1756 42 : return false;
1757 :
1758 : /* Identify all the rel's columns that appear in grouping_columns */
1759 164 : groupbyattnos = NULL;
1760 370 : foreach(gl, grouping_columns)
1761 : {
1762 206 : Var *gvar = (Var *) lfirst(gl);
1763 :
1764 206 : if (IsA(gvar, Var) &&
1765 206 : gvar->varno == varno &&
1766 164 : gvar->varlevelsup == varlevelsup)
1767 164 : groupbyattnos = bms_add_member(groupbyattnos,
1768 164 : gvar->varattno - FirstLowInvalidHeapAttributeNumber);
1769 : }
1770 :
1771 164 : if (bms_is_subset(pkattnos, groupbyattnos))
1772 : {
1773 : /* The PK is a subset of grouping_columns, so we win */
1774 122 : *constraintDeps = lappend_oid(*constraintDeps, constraintOid);
1775 122 : return true;
1776 : }
1777 :
1778 42 : return false;
1779 : }
|