LCOV - code coverage report
Current view: top level - src/backend/access/brin - brin_validate.c (source / functions) Hit Total Coverage
Test: PostgreSQL 17devel Lines: 73 94 77.7 %
Date: 2024-04-19 09:12:35 Functions: 1 1 100.0 %
Legend: Lines: hit not hit

          Line data    Source code
       1             : /*-------------------------------------------------------------------------
       2             :  *
       3             :  * brin_validate.c
       4             :  *    Opclass validator for BRIN.
       5             :  *
       6             :  * Portions Copyright (c) 1996-2024, PostgreSQL Global Development Group
       7             :  * Portions Copyright (c) 1994, Regents of the University of California
       8             :  *
       9             :  * IDENTIFICATION
      10             :  *    src/backend/access/brin/brin_validate.c
      11             :  *
      12             :  *-------------------------------------------------------------------------
      13             :  */
      14             : #include "postgres.h"
      15             : 
      16             : #include "access/amvalidate.h"
      17             : #include "access/brin_internal.h"
      18             : #include "access/htup_details.h"
      19             : #include "catalog/pg_amop.h"
      20             : #include "catalog/pg_amproc.h"
      21             : #include "catalog/pg_opclass.h"
      22             : #include "catalog/pg_opfamily.h"
      23             : #include "catalog/pg_type.h"
      24             : #include "utils/builtins.h"
      25             : #include "utils/regproc.h"
      26             : #include "utils/syscache.h"
      27             : 
      28             : /*
      29             :  * Validator for a BRIN opclass.
      30             :  *
      31             :  * Some of the checks done here cover the whole opfamily, and therefore are
      32             :  * redundant when checking each opclass in a family.  But they don't run long
      33             :  * enough to be much of a problem, so we accept the duplication rather than
      34             :  * complicate the amvalidate API.
      35             :  */
      36             : bool
      37         432 : brinvalidate(Oid opclassoid)
      38             : {
      39         432 :     bool        result = true;
      40             :     HeapTuple   classtup;
      41             :     Form_pg_opclass classform;
      42             :     Oid         opfamilyoid;
      43             :     Oid         opcintype;
      44             :     char       *opclassname;
      45             :     HeapTuple   familytup;
      46             :     Form_pg_opfamily familyform;
      47             :     char       *opfamilyname;
      48             :     CatCList   *proclist,
      49             :                *oprlist;
      50         432 :     uint64      allfuncs = 0;
      51         432 :     uint64      allops = 0;
      52             :     List       *grouplist;
      53             :     OpFamilyOpFuncGroup *opclassgroup;
      54             :     int         i;
      55             :     ListCell   *lc;
      56             : 
      57             :     /* Fetch opclass information */
      58         432 :     classtup = SearchSysCache1(CLAOID, ObjectIdGetDatum(opclassoid));
      59         432 :     if (!HeapTupleIsValid(classtup))
      60           0 :         elog(ERROR, "cache lookup failed for operator class %u", opclassoid);
      61         432 :     classform = (Form_pg_opclass) GETSTRUCT(classtup);
      62             : 
      63         432 :     opfamilyoid = classform->opcfamily;
      64         432 :     opcintype = classform->opcintype;
      65         432 :     opclassname = NameStr(classform->opcname);
      66             : 
      67             :     /* Fetch opfamily information */
      68         432 :     familytup = SearchSysCache1(OPFAMILYOID, ObjectIdGetDatum(opfamilyoid));
      69         432 :     if (!HeapTupleIsValid(familytup))
      70           0 :         elog(ERROR, "cache lookup failed for operator family %u", opfamilyoid);
      71         432 :     familyform = (Form_pg_opfamily) GETSTRUCT(familytup);
      72             : 
      73         432 :     opfamilyname = NameStr(familyform->opfname);
      74             : 
      75             :     /* Fetch all operators and support functions of the opfamily */
      76         432 :     oprlist = SearchSysCacheList1(AMOPSTRATEGY, ObjectIdGetDatum(opfamilyoid));
      77         432 :     proclist = SearchSysCacheList1(AMPROCNUM, ObjectIdGetDatum(opfamilyoid));
      78             : 
      79             :     /* Check individual support functions */
      80        4068 :     for (i = 0; i < proclist->n_members; i++)
      81             :     {
      82        3636 :         HeapTuple   proctup = &proclist->members[i]->tuple;
      83        3636 :         Form_pg_amproc procform = (Form_pg_amproc) GETSTRUCT(proctup);
      84             :         bool        ok;
      85             : 
      86             :         /* Check procedure numbers and function signatures */
      87        3636 :         switch (procform->amprocnum)
      88             :         {
      89         684 :             case BRIN_PROCNUM_OPCINFO:
      90         684 :                 ok = check_amproc_signature(procform->amproc, INTERNALOID, true,
      91             :                                             1, 1, INTERNALOID);
      92         684 :                 break;
      93         684 :             case BRIN_PROCNUM_ADDVALUE:
      94         684 :                 ok = check_amproc_signature(procform->amproc, BOOLOID, true,
      95             :                                             4, 4, INTERNALOID, INTERNALOID,
      96             :                                             INTERNALOID, INTERNALOID);
      97         684 :                 break;
      98         684 :             case BRIN_PROCNUM_CONSISTENT:
      99         684 :                 ok = check_amproc_signature(procform->amproc, BOOLOID, true,
     100             :                                             3, 4, INTERNALOID, INTERNALOID,
     101             :                                             INTERNALOID, INT4OID);
     102         684 :                 break;
     103         684 :             case BRIN_PROCNUM_UNION:
     104         684 :                 ok = check_amproc_signature(procform->amproc, BOOLOID, true,
     105             :                                             3, 3, INTERNALOID, INTERNALOID,
     106             :                                             INTERNALOID);
     107         684 :                 break;
     108         426 :             case BRIN_PROCNUM_OPTIONS:
     109         426 :                 ok = check_amoptsproc_signature(procform->amproc);
     110         426 :                 break;
     111         474 :             default:
     112             :                 /* Complain if it's not a valid optional proc number */
     113         474 :                 if (procform->amprocnum < BRIN_FIRST_OPTIONAL_PROCNUM ||
     114         474 :                     procform->amprocnum > BRIN_LAST_OPTIONAL_PROCNUM)
     115             :                 {
     116           0 :                     ereport(INFO,
     117             :                             (errcode(ERRCODE_INVALID_OBJECT_DEFINITION),
     118             :                              errmsg("operator family \"%s\" of access method %s contains function %s with invalid support number %d",
     119             :                                     opfamilyname, "brin",
     120             :                                     format_procedure(procform->amproc),
     121             :                                     procform->amprocnum)));
     122           0 :                     result = false;
     123           0 :                     continue;   /* omit bad proc numbers from allfuncs */
     124             :                 }
     125             :                 /* Can't check signatures of optional procs, so assume OK */
     126         474 :                 ok = true;
     127         474 :                 break;
     128             :         }
     129             : 
     130        3636 :         if (!ok)
     131             :         {
     132           0 :             ereport(INFO,
     133             :                     (errcode(ERRCODE_INVALID_OBJECT_DEFINITION),
     134             :                      errmsg("operator family \"%s\" of access method %s contains function %s with wrong signature for support number %d",
     135             :                             opfamilyname, "brin",
     136             :                             format_procedure(procform->amproc),
     137             :                             procform->amprocnum)));
     138           0 :             result = false;
     139             :         }
     140             : 
     141             :         /* Track all valid procedure numbers seen in opfamily */
     142        3636 :         allfuncs |= ((uint64) 1) << procform->amprocnum;
     143             :     }
     144             : 
     145             :     /* Check individual operators */
     146        5448 :     for (i = 0; i < oprlist->n_members; i++)
     147             :     {
     148        5016 :         HeapTuple   oprtup = &oprlist->members[i]->tuple;
     149        5016 :         Form_pg_amop oprform = (Form_pg_amop) GETSTRUCT(oprtup);
     150             : 
     151             :         /* Check that only allowed strategy numbers exist */
     152        5016 :         if (oprform->amopstrategy < 1 || oprform->amopstrategy > 63)
     153             :         {
     154           0 :             ereport(INFO,
     155             :                     (errcode(ERRCODE_INVALID_OBJECT_DEFINITION),
     156             :                      errmsg("operator family \"%s\" of access method %s contains operator %s with invalid strategy number %d",
     157             :                             opfamilyname, "brin",
     158             :                             format_operator(oprform->amopopr),
     159             :                             oprform->amopstrategy)));
     160           0 :             result = false;
     161             :         }
     162             :         else
     163             :         {
     164             :             /*
     165             :              * The set of operators supplied varies across BRIN opfamilies.
     166             :              * Our plan is to identify all operator strategy numbers used in
     167             :              * the opfamily and then complain about datatype combinations that
     168             :              * are missing any operator(s).  However, consider only numbers
     169             :              * that appear in some non-cross-type case, since cross-type
     170             :              * operators may have unique strategies.  (This is not a great
     171             :              * heuristic, in particular an erroneous number used in a
     172             :              * cross-type operator will not get noticed; but the core BRIN
     173             :              * opfamilies are messy enough to make it necessary.)
     174             :              */
     175        5016 :             if (oprform->amoplefttype == oprform->amoprighttype)
     176        2604 :                 allops |= ((uint64) 1) << oprform->amopstrategy;
     177             :         }
     178             : 
     179             :         /* brin doesn't support ORDER BY operators */
     180        5016 :         if (oprform->amoppurpose != AMOP_SEARCH ||
     181        5016 :             OidIsValid(oprform->amopsortfamily))
     182             :         {
     183           0 :             ereport(INFO,
     184             :                     (errcode(ERRCODE_INVALID_OBJECT_DEFINITION),
     185             :                      errmsg("operator family \"%s\" of access method %s contains invalid ORDER BY specification for operator %s",
     186             :                             opfamilyname, "brin",
     187             :                             format_operator(oprform->amopopr))));
     188           0 :             result = false;
     189             :         }
     190             : 
     191             :         /* Check operator signature --- same for all brin strategies */
     192        5016 :         if (!check_amop_signature(oprform->amopopr, BOOLOID,
     193             :                                   oprform->amoplefttype,
     194             :                                   oprform->amoprighttype))
     195             :         {
     196           0 :             ereport(INFO,
     197             :                     (errcode(ERRCODE_INVALID_OBJECT_DEFINITION),
     198             :                      errmsg("operator family \"%s\" of access method %s contains operator %s with wrong signature",
     199             :                             opfamilyname, "brin",
     200             :                             format_operator(oprform->amopopr))));
     201           0 :             result = false;
     202             :         }
     203             :     }
     204             : 
     205             :     /* Now check for inconsistent groups of operators/functions */
     206         432 :     grouplist = identify_opfamily_groups(oprlist, proclist);
     207         432 :     opclassgroup = NULL;
     208        1608 :     foreach(lc, grouplist)
     209             :     {
     210        1176 :         OpFamilyOpFuncGroup *thisgroup = (OpFamilyOpFuncGroup *) lfirst(lc);
     211             : 
     212             :         /* Remember the group exactly matching the test opclass */
     213        1176 :         if (thisgroup->lefttype == opcintype &&
     214         612 :             thisgroup->righttype == opcintype)
     215         432 :             opclassgroup = thisgroup;
     216             : 
     217             :         /*
     218             :          * Some BRIN opfamilies expect cross-type support functions to exist,
     219             :          * and some don't.  We don't know exactly which are which, so if we
     220             :          * find a cross-type operator for which there are no support functions
     221             :          * at all, let it pass.  (Don't expect that all operators exist for
     222             :          * such cross-type cases, either.)
     223             :          */
     224        1176 :         if (thisgroup->functionset == 0 &&
     225         492 :             thisgroup->lefttype != thisgroup->righttype)
     226         492 :             continue;
     227             : 
     228             :         /*
     229             :          * Else complain if there seems to be an incomplete set of either
     230             :          * operators or support functions for this datatype pair.
     231             :          */
     232         684 :         if (thisgroup->operatorset != allops)
     233             :         {
     234           0 :             ereport(INFO,
     235             :                     (errcode(ERRCODE_INVALID_OBJECT_DEFINITION),
     236             :                      errmsg("operator family \"%s\" of access method %s is missing operator(s) for types %s and %s",
     237             :                             opfamilyname, "brin",
     238             :                             format_type_be(thisgroup->lefttype),
     239             :                             format_type_be(thisgroup->righttype))));
     240           0 :             result = false;
     241             :         }
     242         684 :         if (thisgroup->functionset != allfuncs)
     243             :         {
     244           0 :             ereport(INFO,
     245             :                     (errcode(ERRCODE_INVALID_OBJECT_DEFINITION),
     246             :                      errmsg("operator family \"%s\" of access method %s is missing support function(s) for types %s and %s",
     247             :                             opfamilyname, "brin",
     248             :                             format_type_be(thisgroup->lefttype),
     249             :                             format_type_be(thisgroup->righttype))));
     250           0 :             result = false;
     251             :         }
     252             :     }
     253             : 
     254             :     /* Check that the originally-named opclass is complete */
     255         432 :     if (!opclassgroup || opclassgroup->operatorset != allops)
     256             :     {
     257           0 :         ereport(INFO,
     258             :                 (errcode(ERRCODE_INVALID_OBJECT_DEFINITION),
     259             :                  errmsg("operator class \"%s\" of access method %s is missing operator(s)",
     260             :                         opclassname, "brin")));
     261           0 :         result = false;
     262             :     }
     263        2160 :     for (i = 1; i <= BRIN_MANDATORY_NPROCS; i++)
     264             :     {
     265        1728 :         if (opclassgroup &&
     266        1728 :             (opclassgroup->functionset & (((int64) 1) << i)) != 0)
     267        1728 :             continue;           /* got it */
     268           0 :         ereport(INFO,
     269             :                 (errcode(ERRCODE_INVALID_OBJECT_DEFINITION),
     270             :                  errmsg("operator class \"%s\" of access method %s is missing support function %d",
     271             :                         opclassname, "brin", i)));
     272           0 :         result = false;
     273             :     }
     274             : 
     275         432 :     ReleaseCatCacheList(proclist);
     276         432 :     ReleaseCatCacheList(oprlist);
     277         432 :     ReleaseSysCache(familytup);
     278         432 :     ReleaseSysCache(classtup);
     279             : 
     280         432 :     return result;
     281             : }

Generated by: LCOV version 1.14