LCOV - code coverage report
Current view: top level - src/backend/commands - indexcmds.c (source / functions) Hit Total Coverage
Test: PostgreSQL 13beta1 Lines: 925 1037 89.2 %
Date: 2020-06-01 10:07:15 Functions: 22 22 100.0 %
Legend: Lines: hit not hit

          Line data    Source code
       1             : /*-------------------------------------------------------------------------
       2             :  *
       3             :  * indexcmds.c
       4             :  *    POSTGRES define and remove index code.
       5             :  *
       6             :  * Portions Copyright (c) 1996-2020, PostgreSQL Global Development Group
       7             :  * Portions Copyright (c) 1994, Regents of the University of California
       8             :  *
       9             :  *
      10             :  * IDENTIFICATION
      11             :  *    src/backend/commands/indexcmds.c
      12             :  *
      13             :  *-------------------------------------------------------------------------
      14             :  */
      15             : 
      16             : #include "postgres.h"
      17             : 
      18             : #include "access/amapi.h"
      19             : #include "access/heapam.h"
      20             : #include "access/htup_details.h"
      21             : #include "access/reloptions.h"
      22             : #include "access/sysattr.h"
      23             : #include "access/tableam.h"
      24             : #include "access/xact.h"
      25             : #include "catalog/catalog.h"
      26             : #include "catalog/index.h"
      27             : #include "catalog/indexing.h"
      28             : #include "catalog/pg_am.h"
      29             : #include "catalog/pg_constraint.h"
      30             : #include "catalog/pg_inherits.h"
      31             : #include "catalog/pg_opclass.h"
      32             : #include "catalog/pg_opfamily.h"
      33             : #include "catalog/pg_tablespace.h"
      34             : #include "catalog/pg_type.h"
      35             : #include "commands/comment.h"
      36             : #include "commands/dbcommands.h"
      37             : #include "commands/defrem.h"
      38             : #include "commands/event_trigger.h"
      39             : #include "commands/progress.h"
      40             : #include "commands/tablecmds.h"
      41             : #include "commands/tablespace.h"
      42             : #include "mb/pg_wchar.h"
      43             : #include "miscadmin.h"
      44             : #include "nodes/makefuncs.h"
      45             : #include "nodes/nodeFuncs.h"
      46             : #include "optimizer/optimizer.h"
      47             : #include "parser/parse_coerce.h"
      48             : #include "parser/parse_func.h"
      49             : #include "parser/parse_oper.h"
      50             : #include "partitioning/partdesc.h"
      51             : #include "pgstat.h"
      52             : #include "rewrite/rewriteManip.h"
      53             : #include "storage/lmgr.h"
      54             : #include "storage/proc.h"
      55             : #include "storage/procarray.h"
      56             : #include "storage/sinvaladt.h"
      57             : #include "utils/acl.h"
      58             : #include "utils/builtins.h"
      59             : #include "utils/fmgroids.h"
      60             : #include "utils/inval.h"
      61             : #include "utils/lsyscache.h"
      62             : #include "utils/memutils.h"
      63             : #include "utils/partcache.h"
      64             : #include "utils/pg_rusage.h"
      65             : #include "utils/regproc.h"
      66             : #include "utils/snapmgr.h"
      67             : #include "utils/syscache.h"
      68             : 
      69             : 
      70             : /* non-export function prototypes */
      71             : static void CheckPredicate(Expr *predicate);
      72             : static void ComputeIndexAttrs(IndexInfo *indexInfo,
      73             :                               Oid *typeOidP,
      74             :                               Oid *collationOidP,
      75             :                               Oid *classOidP,
      76             :                               int16 *colOptionP,
      77             :                               List *attList,
      78             :                               List *exclusionOpNames,
      79             :                               Oid relId,
      80             :                               const char *accessMethodName, Oid accessMethodId,
      81             :                               bool amcanorder,
      82             :                               bool isconstraint);
      83             : static char *ChooseIndexName(const char *tabname, Oid namespaceId,
      84             :                              List *colnames, List *exclusionOpNames,
      85             :                              bool primary, bool isconstraint);
      86             : static char *ChooseIndexNameAddition(List *colnames);
      87             : static List *ChooseIndexColumnNames(List *indexElems);
      88             : static void RangeVarCallbackForReindexIndex(const RangeVar *relation,
      89             :                                             Oid relId, Oid oldRelId, void *arg);
      90             : static bool ReindexRelationConcurrently(Oid relationOid, int options);
      91             : static void ReindexPartitionedIndex(Relation parentIdx);
      92             : static void update_relispartition(Oid relationId, bool newval);
      93             : static bool CompareOpclassOptions(Datum *opts1, Datum *opts2, int natts);
      94             : 
      95             : /*
      96             :  * callback argument type for RangeVarCallbackForReindexIndex()
      97             :  */
      98             : struct ReindexIndexCallbackState
      99             : {
     100             :     bool        concurrent;     /* flag from statement */
     101             :     Oid         locked_table_oid;   /* tracks previously locked table */
     102             : };
     103             : 
     104             : /*
     105             :  * CheckIndexCompatible
     106             :  *      Determine whether an existing index definition is compatible with a
     107             :  *      prospective index definition, such that the existing index storage
     108             :  *      could become the storage of the new index, avoiding a rebuild.
     109             :  *
     110             :  * 'heapRelation': the relation the index would apply to.
     111             :  * 'accessMethodName': name of the AM to use.
     112             :  * 'attributeList': a list of IndexElem specifying columns and expressions
     113             :  *      to index on.
     114             :  * 'exclusionOpNames': list of names of exclusion-constraint operators,
     115             :  *      or NIL if not an exclusion constraint.
     116             :  *
     117             :  * This is tailored to the needs of ALTER TABLE ALTER TYPE, which recreates
     118             :  * any indexes that depended on a changing column from their pg_get_indexdef
     119             :  * or pg_get_constraintdef definitions.  We omit some of the sanity checks of
     120             :  * DefineIndex.  We assume that the old and new indexes have the same number
     121             :  * of columns and that if one has an expression column or predicate, both do.
     122             :  * Errors arising from the attribute list still apply.
     123             :  *
     124             :  * Most column type changes that can skip a table rewrite do not invalidate
     125             :  * indexes.  We acknowledge this when all operator classes, collations and
     126             :  * exclusion operators match.  Though we could further permit intra-opfamily
     127             :  * changes for btree and hash indexes, that adds subtle complexity with no
     128             :  * concrete benefit for core types. Note, that INCLUDE columns aren't
     129             :  * checked by this function, for them it's enough that table rewrite is
     130             :  * skipped.
     131             :  *
     132             :  * When a comparison or exclusion operator has a polymorphic input type, the
     133             :  * actual input types must also match.  This defends against the possibility
     134             :  * that operators could vary behavior in response to get_fn_expr_argtype().
     135             :  * At present, this hazard is theoretical: check_exclusion_constraint() and
     136             :  * all core index access methods decline to set fn_expr for such calls.
     137             :  *
     138             :  * We do not yet implement a test to verify compatibility of expression
     139             :  * columns or predicates, so assume any such index is incompatible.
     140             :  */
     141             : bool
     142          68 : CheckIndexCompatible(Oid oldId,
     143             :                      const char *accessMethodName,
     144             :                      List *attributeList,
     145             :                      List *exclusionOpNames)
     146             : {
     147             :     bool        isconstraint;
     148             :     Oid        *typeObjectId;
     149             :     Oid        *collationObjectId;
     150             :     Oid        *classObjectId;
     151             :     Oid         accessMethodId;
     152             :     Oid         relationId;
     153             :     HeapTuple   tuple;
     154             :     Form_pg_index indexForm;
     155             :     Form_pg_am  accessMethodForm;
     156             :     IndexAmRoutine *amRoutine;
     157             :     bool        amcanorder;
     158             :     int16      *coloptions;
     159             :     IndexInfo  *indexInfo;
     160             :     int         numberOfAttributes;
     161             :     int         old_natts;
     162             :     bool        isnull;
     163          68 :     bool        ret = true;
     164             :     oidvector  *old_indclass;
     165             :     oidvector  *old_indcollation;
     166             :     Relation    irel;
     167             :     int         i;
     168             :     Datum       d;
     169             : 
     170             :     /* Caller should already have the relation locked in some way. */
     171          68 :     relationId = IndexGetRelation(oldId, false);
     172             : 
     173             :     /*
     174             :      * We can pretend isconstraint = false unconditionally.  It only serves to
     175             :      * decide the text of an error message that should never happen for us.
     176             :      */
     177          68 :     isconstraint = false;
     178             : 
     179          68 :     numberOfAttributes = list_length(attributeList);
     180             :     Assert(numberOfAttributes > 0);
     181             :     Assert(numberOfAttributes <= INDEX_MAX_KEYS);
     182             : 
     183             :     /* look up the access method */
     184          68 :     tuple = SearchSysCache1(AMNAME, PointerGetDatum(accessMethodName));
     185          68 :     if (!HeapTupleIsValid(tuple))
     186           0 :         ereport(ERROR,
     187             :                 (errcode(ERRCODE_UNDEFINED_OBJECT),
     188             :                  errmsg("access method \"%s\" does not exist",
     189             :                         accessMethodName)));
     190          68 :     accessMethodForm = (Form_pg_am) GETSTRUCT(tuple);
     191          68 :     accessMethodId = accessMethodForm->oid;
     192          68 :     amRoutine = GetIndexAmRoutine(accessMethodForm->amhandler);
     193          68 :     ReleaseSysCache(tuple);
     194             : 
     195          68 :     amcanorder = amRoutine->amcanorder;
     196             : 
     197             :     /*
     198             :      * Compute the operator classes, collations, and exclusion operators for
     199             :      * the new index, so we can test whether it's compatible with the existing
     200             :      * one.  Note that ComputeIndexAttrs might fail here, but that's OK:
     201             :      * DefineIndex would have called this function with the same arguments
     202             :      * later on, and it would have failed then anyway.  Our attributeList
     203             :      * contains only key attributes, thus we're filling ii_NumIndexAttrs and
     204             :      * ii_NumIndexKeyAttrs with same value.
     205             :      */
     206          68 :     indexInfo = makeIndexInfo(numberOfAttributes, numberOfAttributes,
     207             :                               accessMethodId, NIL, NIL, false, false, false);
     208          68 :     typeObjectId = (Oid *) palloc(numberOfAttributes * sizeof(Oid));
     209          68 :     collationObjectId = (Oid *) palloc(numberOfAttributes * sizeof(Oid));
     210          68 :     classObjectId = (Oid *) palloc(numberOfAttributes * sizeof(Oid));
     211          68 :     coloptions = (int16 *) palloc(numberOfAttributes * sizeof(int16));
     212          68 :     ComputeIndexAttrs(indexInfo,
     213             :                       typeObjectId, collationObjectId, classObjectId,
     214             :                       coloptions, attributeList,
     215             :                       exclusionOpNames, relationId,
     216             :                       accessMethodName, accessMethodId,
     217             :                       amcanorder, isconstraint);
     218             : 
     219             : 
     220             :     /* Get the soon-obsolete pg_index tuple. */
     221          68 :     tuple = SearchSysCache1(INDEXRELID, ObjectIdGetDatum(oldId));
     222          68 :     if (!HeapTupleIsValid(tuple))
     223           0 :         elog(ERROR, "cache lookup failed for index %u", oldId);
     224          68 :     indexForm = (Form_pg_index) GETSTRUCT(tuple);
     225             : 
     226             :     /*
     227             :      * We don't assess expressions or predicates; assume incompatibility.
     228             :      * Also, if the index is invalid for any reason, treat it as incompatible.
     229             :      */
     230         136 :     if (!(heap_attisnull(tuple, Anum_pg_index_indpred, NULL) &&
     231          68 :           heap_attisnull(tuple, Anum_pg_index_indexprs, NULL) &&
     232          68 :           indexForm->indisvalid))
     233             :     {
     234           0 :         ReleaseSysCache(tuple);
     235           0 :         return false;
     236             :     }
     237             : 
     238             :     /* Any change in operator class or collation breaks compatibility. */
     239          68 :     old_natts = indexForm->indnkeyatts;
     240             :     Assert(old_natts == numberOfAttributes);
     241             : 
     242          68 :     d = SysCacheGetAttr(INDEXRELID, tuple, Anum_pg_index_indcollation, &isnull);
     243             :     Assert(!isnull);
     244          68 :     old_indcollation = (oidvector *) DatumGetPointer(d);
     245             : 
     246          68 :     d = SysCacheGetAttr(INDEXRELID, tuple, Anum_pg_index_indclass, &isnull);
     247             :     Assert(!isnull);
     248          68 :     old_indclass = (oidvector *) DatumGetPointer(d);
     249             : 
     250         136 :     ret = (memcmp(old_indclass->values, classObjectId,
     251         136 :                   old_natts * sizeof(Oid)) == 0 &&
     252          68 :            memcmp(old_indcollation->values, collationObjectId,
     253             :                   old_natts * sizeof(Oid)) == 0);
     254             : 
     255          68 :     ReleaseSysCache(tuple);
     256             : 
     257          68 :     if (!ret)
     258           0 :         return false;
     259             : 
     260             :     /* For polymorphic opcintype, column type changes break compatibility. */
     261          68 :     irel = index_open(oldId, AccessShareLock);  /* caller probably has a lock */
     262         140 :     for (i = 0; i < old_natts; i++)
     263             :     {
     264          72 :         if (IsPolymorphicType(get_opclass_input_type(classObjectId[i])) &&
     265           0 :             TupleDescAttr(irel->rd_att, i)->atttypid != typeObjectId[i])
     266             :         {
     267           0 :             ret = false;
     268           0 :             break;
     269             :         }
     270             :     }
     271             : 
     272             :     /* Any change in opclass options break compatibility. */
     273          68 :     if (ret)
     274             :     {
     275          68 :         Datum      *opclassOptions = RelationGetIndexRawAttOptions(irel);
     276             : 
     277          68 :         ret = CompareOpclassOptions(opclassOptions,
     278             :                                     indexInfo->ii_OpclassOptions, old_natts);
     279             : 
     280          68 :         if (opclassOptions)
     281           0 :             pfree(opclassOptions);
     282             :     }
     283             : 
     284             :     /* Any change in exclusion operator selections breaks compatibility. */
     285          68 :     if (ret && indexInfo->ii_ExclusionOps != NULL)
     286             :     {
     287             :         Oid        *old_operators,
     288             :                    *old_procs;
     289             :         uint16     *old_strats;
     290             : 
     291           0 :         RelationGetExclusionInfo(irel, &old_operators, &old_procs, &old_strats);
     292           0 :         ret = memcmp(old_operators, indexInfo->ii_ExclusionOps,
     293             :                      old_natts * sizeof(Oid)) == 0;
     294             : 
     295             :         /* Require an exact input type match for polymorphic operators. */
     296           0 :         if (ret)
     297             :         {
     298           0 :             for (i = 0; i < old_natts && ret; i++)
     299             :             {
     300             :                 Oid         left,
     301             :                             right;
     302             : 
     303           0 :                 op_input_types(indexInfo->ii_ExclusionOps[i], &left, &right);
     304           0 :                 if ((IsPolymorphicType(left) || IsPolymorphicType(right)) &&
     305           0 :                     TupleDescAttr(irel->rd_att, i)->atttypid != typeObjectId[i])
     306             :                 {
     307           0 :                     ret = false;
     308           0 :                     break;
     309             :                 }
     310             :             }
     311             :         }
     312             :     }
     313             : 
     314          68 :     index_close(irel, NoLock);
     315          68 :     return ret;
     316             : }
     317             : 
     318             : /*
     319             :  * CompareOpclassOptions
     320             :  *
     321             :  * Compare per-column opclass options which are represented by arrays of text[]
     322             :  * datums.  Both elements of arrays and array themselves can be NULL.
     323             :  */
     324             : static bool
     325          68 : CompareOpclassOptions(Datum *opts1, Datum *opts2, int natts)
     326             : {
     327             :     int         i;
     328             : 
     329          68 :     if (!opts1 && !opts2)
     330          68 :         return true;
     331             : 
     332           0 :     for (i = 0; i < natts; i++)
     333             :     {
     334           0 :         Datum       opt1 = opts1 ? opts1[i] : (Datum) 0;
     335           0 :         Datum       opt2 = opts2 ? opts2[i] : (Datum) 0;
     336             : 
     337           0 :         if (opt1 == (Datum) 0)
     338             :         {
     339           0 :             if (opt2 == (Datum) 0)
     340           0 :                 continue;
     341             :             else
     342           0 :                 return false;
     343             :         }
     344           0 :         else if (opt2 == (Datum) 0)
     345           0 :             return false;
     346             : 
     347             :         /* Compare non-NULL text[] datums. */
     348           0 :         if (!DatumGetBool(DirectFunctionCall2(array_eq, opt1, opt2)))
     349           0 :             return false;
     350             :     }
     351             : 
     352           0 :     return true;
     353             : }
     354             : 
     355             : /*
     356             :  * WaitForOlderSnapshots
     357             :  *
     358             :  * Wait for transactions that might have an older snapshot than the given xmin
     359             :  * limit, because it might not contain tuples deleted just before it has
     360             :  * been taken. Obtain a list of VXIDs of such transactions, and wait for them
     361             :  * individually. This is used when building an index concurrently.
     362             :  *
     363             :  * We can exclude any running transactions that have xmin > the xmin given;
     364             :  * their oldest snapshot must be newer than our xmin limit.
     365             :  * We can also exclude any transactions that have xmin = zero, since they
     366             :  * evidently have no live snapshot at all (and any one they might be in
     367             :  * process of taking is certainly newer than ours).  Transactions in other
     368             :  * DBs can be ignored too, since they'll never even be able to see the
     369             :  * index being worked on.
     370             :  *
     371             :  * We can also exclude autovacuum processes and processes running manual
     372             :  * lazy VACUUMs, because they won't be fazed by missing index entries
     373             :  * either.  (Manual ANALYZEs, however, can't be excluded because they
     374             :  * might be within transactions that are going to do arbitrary operations
     375             :  * later.)
     376             :  *
     377             :  * Also, GetCurrentVirtualXIDs never reports our own vxid, so we need not
     378             :  * check for that.
     379             :  *
     380             :  * If a process goes idle-in-transaction with xmin zero, we do not need to
     381             :  * wait for it anymore, per the above argument.  We do not have the
     382             :  * infrastructure right now to stop waiting if that happens, but we can at
     383             :  * least avoid the folly of waiting when it is idle at the time we would
     384             :  * begin to wait.  We do this by repeatedly rechecking the output of
     385             :  * GetCurrentVirtualXIDs.  If, during any iteration, a particular vxid
     386             :  * doesn't show up in the output, we know we can forget about it.
     387             :  */
     388             : static void
     389         164 : WaitForOlderSnapshots(TransactionId limitXmin, bool progress)
     390             : {
     391             :     int         n_old_snapshots;
     392             :     int         i;
     393             :     VirtualTransactionId *old_snapshots;
     394             : 
     395         164 :     old_snapshots = GetCurrentVirtualXIDs(limitXmin, true, false,
     396             :                                           PROC_IS_AUTOVACUUM | PROC_IN_VACUUM,
     397             :                                           &n_old_snapshots);
     398         164 :     if (progress)
     399         164 :         pgstat_progress_update_param(PROGRESS_WAITFOR_TOTAL, n_old_snapshots);
     400             : 
     401         204 :     for (i = 0; i < n_old_snapshots; i++)
     402             :     {
     403          40 :         if (!VirtualTransactionIdIsValid(old_snapshots[i]))
     404           2 :             continue;           /* found uninteresting in previous cycle */
     405             : 
     406          38 :         if (i > 0)
     407             :         {
     408             :             /* see if anything's changed ... */
     409             :             VirtualTransactionId *newer_snapshots;
     410             :             int         n_newer_snapshots;
     411             :             int         j;
     412             :             int         k;
     413             : 
     414          12 :             newer_snapshots = GetCurrentVirtualXIDs(limitXmin,
     415             :                                                     true, false,
     416             :                                                     PROC_IS_AUTOVACUUM | PROC_IN_VACUUM,
     417             :                                                     &n_newer_snapshots);
     418          34 :             for (j = i; j < n_old_snapshots; j++)
     419             :             {
     420          22 :                 if (!VirtualTransactionIdIsValid(old_snapshots[j]))
     421           0 :                     continue;   /* found uninteresting in previous cycle */
     422          50 :                 for (k = 0; k < n_newer_snapshots; k++)
     423             :                 {
     424          42 :                     if (VirtualTransactionIdEquals(old_snapshots[j],
     425             :                                                    newer_snapshots[k]))
     426          14 :                         break;
     427             :                 }
     428          22 :                 if (k >= n_newer_snapshots) /* not there anymore */
     429           8 :                     SetInvalidVirtualTransactionId(old_snapshots[j]);
     430             :             }
     431          12 :             pfree(newer_snapshots);
     432             :         }
     433             : 
     434          38 :         if (VirtualTransactionIdIsValid(old_snapshots[i]))
     435             :         {
     436             :             /* If requested, publish who we're going to wait for. */
     437          32 :             if (progress)
     438             :             {
     439          32 :                 PGPROC     *holder = BackendIdGetProc(old_snapshots[i].backendId);
     440             : 
     441          32 :                 if (holder)
     442          32 :                     pgstat_progress_update_param(PROGRESS_WAITFOR_CURRENT_PID,
     443          32 :                                                  holder->pid);
     444             :             }
     445          32 :             VirtualXactLock(old_snapshots[i], true);
     446             :         }
     447             : 
     448          38 :         if (progress)
     449          38 :             pgstat_progress_update_param(PROGRESS_WAITFOR_DONE, i + 1);
     450             :     }
     451         164 : }
     452             : 
     453             : 
     454             : /*
     455             :  * DefineIndex
     456             :  *      Creates a new index.
     457             :  *
     458             :  * 'relationId': the OID of the heap relation on which the index is to be
     459             :  *      created
     460             :  * 'stmt': IndexStmt describing the properties of the new index.
     461             :  * 'indexRelationId': normally InvalidOid, but during bootstrap can be
     462             :  *      nonzero to specify a preselected OID for the index.
     463             :  * 'parentIndexId': the OID of the parent index; InvalidOid if not the child
     464             :  *      of a partitioned index.
     465             :  * 'parentConstraintId': the OID of the parent constraint; InvalidOid if not
     466             :  *      the child of a constraint (only used when recursing)
     467             :  * 'is_alter_table': this is due to an ALTER rather than a CREATE operation.
     468             :  * 'check_rights': check for CREATE rights in namespace and tablespace.  (This
     469             :  *      should be true except when ALTER is deleting/recreating an index.)
     470             :  * 'check_not_in_use': check for table not already in use in current session.
     471             :  *      This should be true unless caller is holding the table open, in which
     472             :  *      case the caller had better have checked it earlier.
     473             :  * 'skip_build': make the catalog entries but don't create the index files
     474             :  * 'quiet': suppress the NOTICE chatter ordinarily provided for constraints.
     475             :  *
     476             :  * Returns the object address of the created index.
     477             :  */
     478             : ObjectAddress
     479       50868 : DefineIndex(Oid relationId,
     480             :             IndexStmt *stmt,
     481             :             Oid indexRelationId,
     482             :             Oid parentIndexId,
     483             :             Oid parentConstraintId,
     484             :             bool is_alter_table,
     485             :             bool check_rights,
     486             :             bool check_not_in_use,
     487             :             bool skip_build,
     488             :             bool quiet)
     489             : {
     490             :     bool        concurrent;
     491             :     char       *indexRelationName;
     492             :     char       *accessMethodName;
     493             :     Oid        *typeObjectId;
     494             :     Oid        *collationObjectId;
     495             :     Oid        *classObjectId;
     496             :     Oid         accessMethodId;
     497             :     Oid         namespaceId;
     498             :     Oid         tablespaceId;
     499       50868 :     Oid         createdConstraintId = InvalidOid;
     500             :     List       *indexColNames;
     501             :     List       *allIndexParams;
     502             :     Relation    rel;
     503             :     HeapTuple   tuple;
     504             :     Form_pg_am  accessMethodForm;
     505             :     IndexAmRoutine *amRoutine;
     506             :     bool        amcanorder;
     507             :     amoptions_function amoptions;
     508             :     bool        partitioned;
     509             :     Datum       reloptions;
     510             :     int16      *coloptions;
     511             :     IndexInfo  *indexInfo;
     512             :     bits16      flags;
     513             :     bits16      constr_flags;
     514             :     int         numberOfAttributes;
     515             :     int         numberOfKeyAttributes;
     516             :     TransactionId limitXmin;
     517             :     ObjectAddress address;
     518             :     LockRelId   heaprelid;
     519             :     LOCKTAG     heaplocktag;
     520             :     LOCKMODE    lockmode;
     521             :     Snapshot    snapshot;
     522       50868 :     int         save_nestlevel = -1;
     523             :     int         i;
     524             : 
     525             :     /*
     526             :      * Some callers need us to run with an empty default_tablespace; this is a
     527             :      * necessary hack to be able to reproduce catalog state accurately when
     528             :      * recreating indexes after table-rewriting ALTER TABLE.
     529             :      */
     530       50868 :     if (stmt->reset_default_tblspc)
     531             :     {
     532         304 :         save_nestlevel = NewGUCNestLevel();
     533         304 :         (void) set_config_option("default_tablespace", "",
     534             :                                  PGC_USERSET, PGC_S_SESSION,
     535             :                                  GUC_ACTION_SAVE, true, 0, false);
     536             :     }
     537             : 
     538             :     /*
     539             :      * Force non-concurrent build on temporary relations, even if CONCURRENTLY
     540             :      * was requested.  Other backends can't access a temporary relation, so
     541             :      * there's no harm in grabbing a stronger lock, and a non-concurrent DROP
     542             :      * is more efficient.  Do this before any use of the concurrent option is
     543             :      * done.
     544             :      */
     545       50868 :     if (stmt->concurrent && get_rel_persistence(relationId) != RELPERSISTENCE_TEMP)
     546          52 :         concurrent = true;
     547             :     else
     548       50816 :         concurrent = false;
     549             : 
     550             :     /*
     551             :      * Start progress report.  If we're building a partition, this was already
     552             :      * done.
     553             :      */
     554       50868 :     if (!OidIsValid(parentIndexId))
     555             :     {
     556       49844 :         pgstat_progress_start_command(PROGRESS_COMMAND_CREATE_INDEX,
     557             :                                       relationId);
     558       49844 :         pgstat_progress_update_param(PROGRESS_CREATEIDX_COMMAND,
     559             :                                      concurrent ?
     560             :                                      PROGRESS_CREATEIDX_COMMAND_CREATE_CONCURRENTLY :
     561             :                                      PROGRESS_CREATEIDX_COMMAND_CREATE);
     562             :     }
     563             : 
     564             :     /*
     565             :      * No index OID to report yet
     566             :      */
     567       50868 :     pgstat_progress_update_param(PROGRESS_CREATEIDX_INDEX_OID,
     568             :                                  InvalidOid);
     569             : 
     570             :     /*
     571             :      * count key attributes in index
     572             :      */
     573       50868 :     numberOfKeyAttributes = list_length(stmt->indexParams);
     574             : 
     575             :     /*
     576             :      * Calculate the new list of index columns including both key columns and
     577             :      * INCLUDE columns.  Later we can determine which of these are key
     578             :      * columns, and which are just part of the INCLUDE list by checking the
     579             :      * list position.  A list item in a position less than ii_NumIndexKeyAttrs
     580             :      * is part of the key columns, and anything equal to and over is part of
     581             :      * the INCLUDE columns.
     582             :      */
     583       50868 :     allIndexParams = list_concat_copy(stmt->indexParams,
     584       50868 :                                       stmt->indexIncludingParams);
     585       50868 :     numberOfAttributes = list_length(allIndexParams);
     586             : 
     587       50868 :     if (numberOfAttributes <= 0)
     588           0 :         ereport(ERROR,
     589             :                 (errcode(ERRCODE_INVALID_OBJECT_DEFINITION),
     590             :                  errmsg("must specify at least one column")));
     591       50868 :     if (numberOfAttributes > INDEX_MAX_KEYS)
     592           0 :         ereport(ERROR,
     593             :                 (errcode(ERRCODE_TOO_MANY_COLUMNS),
     594             :                  errmsg("cannot use more than %d columns in an index",
     595             :                         INDEX_MAX_KEYS)));
     596             : 
     597             :     /*
     598             :      * Only SELECT ... FOR UPDATE/SHARE are allowed while doing a standard
     599             :      * index build; but for concurrent builds we allow INSERT/UPDATE/DELETE
     600             :      * (but not VACUUM).
     601             :      *
     602             :      * NB: Caller is responsible for making sure that relationId refers to the
     603             :      * relation on which the index should be built; except in bootstrap mode,
     604             :      * this will typically require the caller to have already locked the
     605             :      * relation.  To avoid lock upgrade hazards, that lock should be at least
     606             :      * as strong as the one we take here.
     607             :      *
     608             :      * NB: If the lock strength here ever changes, code that is run by
     609             :      * parallel workers under the control of certain particular ambuild
     610             :      * functions will need to be updated, too.
     611             :      */
     612       50868 :     lockmode = concurrent ? ShareUpdateExclusiveLock : ShareLock;
     613       50868 :     rel = table_open(relationId, lockmode);
     614             : 
     615       50868 :     namespaceId = RelationGetNamespace(rel);
     616             : 
     617             :     /* Ensure that it makes sense to index this kind of relation */
     618       50868 :     switch (rel->rd_rel->relkind)
     619             :     {
     620       50864 :         case RELKIND_RELATION:
     621             :         case RELKIND_MATVIEW:
     622             :         case RELKIND_PARTITIONED_TABLE:
     623             :             /* OK */
     624       50864 :             break;
     625           4 :         case RELKIND_FOREIGN_TABLE:
     626             : 
     627             :             /*
     628             :              * Custom error message for FOREIGN TABLE since the term is close
     629             :              * to a regular table and can confuse the user.
     630             :              */
     631           4 :             ereport(ERROR,
     632             :                     (errcode(ERRCODE_WRONG_OBJECT_TYPE),
     633             :                      errmsg("cannot create index on foreign table \"%s\"",
     634             :                             RelationGetRelationName(rel))));
     635             :             break;
     636           0 :         default:
     637           0 :             ereport(ERROR,
     638             :                     (errcode(ERRCODE_WRONG_OBJECT_TYPE),
     639             :                      errmsg("\"%s\" is not a table or materialized view",
     640             :                             RelationGetRelationName(rel))));
     641             :             break;
     642             :     }
     643             : 
     644             :     /*
     645             :      * Establish behavior for partitioned tables, and verify sanity of
     646             :      * parameters.
     647             :      *
     648             :      * We do not build an actual index in this case; we only create a few
     649             :      * catalog entries.  The actual indexes are built by recursing for each
     650             :      * partition.
     651             :      */
     652       50864 :     partitioned = rel->rd_rel->relkind == RELKIND_PARTITIONED_TABLE;
     653       50864 :     if (partitioned)
     654             :     {
     655             :         /*
     656             :          * Note: we check 'stmt->concurrent' rather than 'concurrent', so that
     657             :          * the error is thrown also for temporary tables.  Seems better to be
     658             :          * consistent, even though we could do it on temporary table because
     659             :          * we're not actually doing it concurrently.
     660             :          */
     661         888 :         if (stmt->concurrent)
     662           4 :             ereport(ERROR,
     663             :                     (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
     664             :                      errmsg("cannot create index on partitioned table \"%s\" concurrently",
     665             :                             RelationGetRelationName(rel))));
     666         884 :         if (stmt->excludeOpNames)
     667           0 :             ereport(ERROR,
     668             :                     (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
     669             :                      errmsg("cannot create exclusion constraints on partitioned table \"%s\"",
     670             :                             RelationGetRelationName(rel))));
     671             :     }
     672             : 
     673             :     /*
     674             :      * Don't try to CREATE INDEX on temp tables of other backends.
     675             :      */
     676       50860 :     if (RELATION_IS_OTHER_TEMP(rel))
     677           0 :         ereport(ERROR,
     678             :                 (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
     679             :                  errmsg("cannot create indexes on temporary tables of other sessions")));
     680             : 
     681             :     /*
     682             :      * Unless our caller vouches for having checked this already, insist that
     683             :      * the table not be in use by our own session, either.  Otherwise we might
     684             :      * fail to make entries in the new index (for instance, if an INSERT or
     685             :      * UPDATE is in progress and has already made its list of target indexes).
     686             :      */
     687       50860 :     if (check_not_in_use)
     688        7800 :         CheckTableNotInUse(rel, "CREATE INDEX");
     689             : 
     690             :     /*
     691             :      * Verify we (still) have CREATE rights in the rel's namespace.
     692             :      * (Presumably we did when the rel was created, but maybe not anymore.)
     693             :      * Skip check if caller doesn't want it.  Also skip check if
     694             :      * bootstrapping, since permissions machinery may not be working yet.
     695             :      */
     696       50856 :     if (check_rights && !IsBootstrapProcessingMode())
     697             :     {
     698             :         AclResult   aclresult;
     699             : 
     700        8360 :         aclresult = pg_namespace_aclcheck(namespaceId, GetUserId(),
     701             :                                           ACL_CREATE);
     702        8360 :         if (aclresult != ACLCHECK_OK)
     703           0 :             aclcheck_error(aclresult, OBJECT_SCHEMA,
     704           0 :                            get_namespace_name(namespaceId));
     705             :     }
     706             : 
     707             :     /*
     708             :      * Select tablespace to use.  If not specified, use default tablespace
     709             :      * (which may in turn default to database's default).
     710             :      */
     711       50856 :     if (stmt->tableSpace)
     712             :     {
     713         130 :         tablespaceId = get_tablespace_oid(stmt->tableSpace, false);
     714         130 :         if (partitioned && tablespaceId == MyDatabaseTableSpace)
     715           4 :             ereport(ERROR,
     716             :                     (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
     717             :                      errmsg("cannot specify default tablespace for partitioned relations")));
     718             :     }
     719             :     else
     720             :     {
     721       50726 :         tablespaceId = GetDefaultTablespace(rel->rd_rel->relpersistence,
     722             :                                             partitioned);
     723             :         /* note InvalidOid is OK in this case */
     724             :     }
     725             : 
     726             :     /* Check tablespace permissions */
     727       50848 :     if (check_rights &&
     728          66 :         OidIsValid(tablespaceId) && tablespaceId != MyDatabaseTableSpace)
     729             :     {
     730             :         AclResult   aclresult;
     731             : 
     732          66 :         aclresult = pg_tablespace_aclcheck(tablespaceId, GetUserId(),
     733             :                                            ACL_CREATE);
     734          66 :         if (aclresult != ACLCHECK_OK)
     735           0 :             aclcheck_error(aclresult, OBJECT_TABLESPACE,
     736           0 :                            get_tablespace_name(tablespaceId));
     737             :     }
     738             : 
     739             :     /*
     740             :      * Force shared indexes into the pg_global tablespace.  This is a bit of a
     741             :      * hack but seems simpler than marking them in the BKI commands.  On the
     742             :      * other hand, if it's not shared, don't allow it to be placed there.
     743             :      */
     744       50848 :     if (rel->rd_rel->relisshared)
     745        6086 :         tablespaceId = GLOBALTABLESPACE_OID;
     746       44762 :     else if (tablespaceId == GLOBALTABLESPACE_OID)
     747           0 :         ereport(ERROR,
     748             :                 (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
     749             :                  errmsg("only shared relations can be placed in pg_global tablespace")));
     750             : 
     751             :     /*
     752             :      * Choose the index column names.
     753             :      */
     754       50848 :     indexColNames = ChooseIndexColumnNames(allIndexParams);
     755             : 
     756             :     /*
     757             :      * Select name for index if caller didn't specify
     758             :      */
     759       50848 :     indexRelationName = stmt->idxname;
     760       50848 :     if (indexRelationName == NULL)
     761       12256 :         indexRelationName = ChooseIndexName(RelationGetRelationName(rel),
     762             :                                             namespaceId,
     763             :                                             indexColNames,
     764             :                                             stmt->excludeOpNames,
     765        6128 :                                             stmt->primary,
     766        6128 :                                             stmt->isconstraint);
     767             : 
     768             :     /*
     769             :      * look up the access method, verify it can handle the requested features
     770             :      */
     771       50848 :     accessMethodName = stmt->accessMethod;
     772       50848 :     tuple = SearchSysCache1(AMNAME, PointerGetDatum(accessMethodName));
     773       50848 :     if (!HeapTupleIsValid(tuple))
     774             :     {
     775             :         /*
     776             :          * Hack to provide more-or-less-transparent updating of old RTREE
     777             :          * indexes to GiST: if RTREE is requested and not found, use GIST.
     778             :          */
     779           4 :         if (strcmp(accessMethodName, "rtree") == 0)
     780             :         {
     781           4 :             ereport(NOTICE,
     782             :                     (errmsg("substituting access method \"gist\" for obsolete method \"rtree\"")));
     783           4 :             accessMethodName = "gist";
     784           4 :             tuple = SearchSysCache1(AMNAME, PointerGetDatum(accessMethodName));
     785             :         }
     786             : 
     787           4 :         if (!HeapTupleIsValid(tuple))
     788           0 :             ereport(ERROR,
     789             :                     (errcode(ERRCODE_UNDEFINED_OBJECT),
     790             :                      errmsg("access method \"%s\" does not exist",
     791             :                             accessMethodName)));
     792             :     }
     793       50848 :     accessMethodForm = (Form_pg_am) GETSTRUCT(tuple);
     794       50848 :     accessMethodId = accessMethodForm->oid;
     795       50848 :     amRoutine = GetIndexAmRoutine(accessMethodForm->amhandler);
     796             : 
     797       50848 :     pgstat_progress_update_param(PROGRESS_CREATEIDX_ACCESS_METHOD_OID,
     798             :                                  accessMethodId);
     799             : 
     800       50848 :     if (stmt->unique && !amRoutine->amcanunique)
     801           0 :         ereport(ERROR,
     802             :                 (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
     803             :                  errmsg("access method \"%s\" does not support unique indexes",
     804             :                         accessMethodName)));
     805       50848 :     if (stmt->indexIncludingParams != NIL && !amRoutine->amcaninclude)
     806          16 :         ereport(ERROR,
     807             :                 (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
     808             :                  errmsg("access method \"%s\" does not support included columns",
     809             :                         accessMethodName)));
     810       50832 :     if (numberOfAttributes > 1 && !amRoutine->amcanmulticol)
     811           0 :         ereport(ERROR,
     812             :                 (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
     813             :                  errmsg("access method \"%s\" does not support multicolumn indexes",
     814             :                         accessMethodName)));
     815       50832 :     if (stmt->excludeOpNames && amRoutine->amgettuple == NULL)
     816           0 :         ereport(ERROR,
     817             :                 (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
     818             :                  errmsg("access method \"%s\" does not support exclusion constraints",
     819             :                         accessMethodName)));
     820             : 
     821       50832 :     amcanorder = amRoutine->amcanorder;
     822       50832 :     amoptions = amRoutine->amoptions;
     823             : 
     824       50832 :     pfree(amRoutine);
     825       50832 :     ReleaseSysCache(tuple);
     826             : 
     827             :     /*
     828             :      * Validate predicate, if given
     829             :      */
     830       50832 :     if (stmt->whereClause)
     831         226 :         CheckPredicate((Expr *) stmt->whereClause);
     832             : 
     833             :     /*
     834             :      * Parse AM-specific options, convert to text array form, validate.
     835             :      */
     836       50832 :     reloptions = transformRelOptions((Datum) 0, stmt->options,
     837             :                                      NULL, NULL, false, false);
     838             : 
     839       50828 :     (void) index_reloptions(amoptions, reloptions, true);
     840             : 
     841             :     /*
     842             :      * Prepare arguments for index_create, primarily an IndexInfo structure.
     843             :      * Note that predicates must be in implicit-AND format.  In a concurrent
     844             :      * build, mark it not-ready-for-inserts.
     845             :      */
     846       50772 :     indexInfo = makeIndexInfo(numberOfAttributes,
     847             :                               numberOfKeyAttributes,
     848             :                               accessMethodId,
     849             :                               NIL,  /* expressions, NIL for now */
     850       50772 :                               make_ands_implicit((Expr *) stmt->whereClause),
     851       50772 :                               stmt->unique,
     852       50772 :                               !concurrent,
     853       50772 :                               concurrent);
     854             : 
     855       50772 :     typeObjectId = (Oid *) palloc(numberOfAttributes * sizeof(Oid));
     856       50772 :     collationObjectId = (Oid *) palloc(numberOfAttributes * sizeof(Oid));
     857       50772 :     classObjectId = (Oid *) palloc(numberOfAttributes * sizeof(Oid));
     858       50772 :     coloptions = (int16 *) palloc(numberOfAttributes * sizeof(int16));
     859       50772 :     ComputeIndexAttrs(indexInfo,
     860             :                       typeObjectId, collationObjectId, classObjectId,
     861             :                       coloptions, allIndexParams,
     862             :                       stmt->excludeOpNames, relationId,
     863             :                       accessMethodName, accessMethodId,
     864       50772 :                       amcanorder, stmt->isconstraint);
     865             : 
     866             :     /*
     867             :      * Extra checks when creating a PRIMARY KEY index.
     868             :      */
     869       50744 :     if (stmt->primary)
     870        4978 :         index_check_primary_key(rel, indexInfo, is_alter_table, stmt);
     871             : 
     872             :     /*
     873             :      * If this table is partitioned and we're creating a unique index or a
     874             :      * primary key, make sure that the partition key is a subset of the
     875             :      * index's columns.  Otherwise it would be possible to violate uniqueness
     876             :      * by putting values that ought to be unique in different partitions.
     877             :      *
     878             :      * We could lift this limitation if we had global indexes, but those have
     879             :      * their own problems, so this is a useful feature combination.
     880             :      */
     881       50724 :     if (partitioned && (stmt->unique || stmt->primary))
     882             :     {
     883         472 :         PartitionKey key = RelationGetPartitionKey(rel);
     884             :         const char *constraint_type;
     885             :         int         i;
     886             : 
     887         472 :         if (stmt->primary)
     888         362 :             constraint_type = "PRIMARY KEY";
     889         110 :         else if (stmt->unique)
     890         110 :             constraint_type = "UNIQUE";
     891           0 :         else if (stmt->excludeOpNames != NIL)
     892           0 :             constraint_type = "EXCLUDE";
     893             :         else
     894             :         {
     895           0 :             elog(ERROR, "unknown constraint type");
     896             :             constraint_type = NULL; /* keep compiler quiet */
     897             :         }
     898             : 
     899             :         /*
     900             :          * Verify that all the columns in the partition key appear in the
     901             :          * unique key definition, with the same notion of equality.
     902             :          */
     903         944 :         for (i = 0; i < key->partnatts; i++)
     904             :         {
     905         516 :             bool        found = false;
     906             :             int         eq_strategy;
     907             :             Oid         ptkey_eqop;
     908             :             int         j;
     909             : 
     910             :             /*
     911             :              * Identify the equality operator associated with this partkey
     912             :              * column.  For list and range partitioning, partkeys use btree
     913             :              * operator classes; hash partitioning uses hash operator classes.
     914             :              * (Keep this in sync with ComputePartitionAttrs!)
     915             :              */
     916         516 :             if (key->strategy == PARTITION_STRATEGY_HASH)
     917          20 :                 eq_strategy = HTEqualStrategyNumber;
     918             :             else
     919         496 :                 eq_strategy = BTEqualStrategyNumber;
     920             : 
     921        1548 :             ptkey_eqop = get_opfamily_member(key->partopfamily[i],
     922         516 :                                              key->partopcintype[i],
     923         516 :                                              key->partopcintype[i],
     924             :                                              eq_strategy);
     925         516 :             if (!OidIsValid(ptkey_eqop))
     926           0 :                 elog(ERROR, "missing operator %d(%u,%u) in partition opfamily %u",
     927             :                      eq_strategy, key->partopcintype[i], key->partopcintype[i],
     928             :                      key->partopfamily[i]);
     929             : 
     930             :             /*
     931             :              * We'll need to be able to identify the equality operators
     932             :              * associated with index columns, too.  We know what to do with
     933             :              * btree opclasses; if there are ever any other index types that
     934             :              * support unique indexes, this logic will need extension.
     935             :              */
     936         516 :             if (accessMethodId == BTREE_AM_OID)
     937         516 :                 eq_strategy = BTEqualStrategyNumber;
     938             :             else
     939           0 :                 ereport(ERROR,
     940             :                         (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
     941             :                          errmsg("cannot match partition key to an index using access method \"%s\"",
     942             :                                 accessMethodName)));
     943             : 
     944             :             /*
     945             :              * It may be possible to support UNIQUE constraints when partition
     946             :              * keys are expressions, but is it worth it?  Give up for now.
     947             :              */
     948         516 :             if (key->partattrs[i] == 0)
     949           8 :                 ereport(ERROR,
     950             :                         (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
     951             :                          errmsg("unsupported %s constraint with partition key definition",
     952             :                                 constraint_type),
     953             :                          errdetail("%s constraints cannot be used when partition keys include expressions.",
     954             :                                    constraint_type)));
     955             : 
     956             :             /* Search the index column(s) for a match */
     957         582 :             for (j = 0; j < indexInfo->ii_NumIndexKeyAttrs; j++)
     958             :             {
     959         546 :                 if (key->partattrs[i] == indexInfo->ii_IndexAttrNumbers[j])
     960             :                 {
     961             :                     /* Matched the column, now what about the equality op? */
     962             :                     Oid         idx_opfamily;
     963             :                     Oid         idx_opcintype;
     964             : 
     965         472 :                     if (get_opclass_opfamily_and_input_type(classObjectId[j],
     966             :                                                             &idx_opfamily,
     967             :                                                             &idx_opcintype))
     968             :                     {
     969             :                         Oid         idx_eqop;
     970             : 
     971         472 :                         idx_eqop = get_opfamily_member(idx_opfamily,
     972             :                                                        idx_opcintype,
     973             :                                                        idx_opcintype,
     974             :                                                        eq_strategy);
     975         472 :                         if (ptkey_eqop == idx_eqop)
     976             :                         {
     977         472 :                             found = true;
     978         472 :                             break;
     979             :                         }
     980             :                     }
     981             :                 }
     982             :             }
     983             : 
     984         508 :             if (!found)
     985             :             {
     986             :                 Form_pg_attribute att;
     987             : 
     988          36 :                 att = TupleDescAttr(RelationGetDescr(rel),
     989             :                                     key->partattrs[i] - 1);
     990          36 :                 ereport(ERROR,
     991             :                         (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
     992             :                          errmsg("insufficient columns in %s constraint definition",
     993             :                                 constraint_type),
     994             :                          errdetail("%s constraint on table \"%s\" lacks column \"%s\" which is part of the partition key.",
     995             :                                    constraint_type, RelationGetRelationName(rel),
     996             :                                    NameStr(att->attname))));
     997             :             }
     998             :         }
     999             :     }
    1000             : 
    1001             : 
    1002             :     /*
    1003             :      * We disallow indexes on system columns.  They would not necessarily get
    1004             :      * updated correctly, and they don't seem useful anyway.
    1005             :      */
    1006      130846 :     for (i = 0; i < indexInfo->ii_NumIndexAttrs; i++)
    1007             :     {
    1008       80166 :         AttrNumber  attno = indexInfo->ii_IndexAttrNumbers[i];
    1009             : 
    1010       80166 :         if (attno < 0)
    1011           0 :             ereport(ERROR,
    1012             :                     (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
    1013             :                      errmsg("index creation on system columns is not supported")));
    1014             :     }
    1015             : 
    1016             :     /*
    1017             :      * Also check for system columns used in expressions or predicates.
    1018             :      */
    1019       50680 :     if (indexInfo->ii_Expressions || indexInfo->ii_Predicate)
    1020             :     {
    1021         580 :         Bitmapset  *indexattrs = NULL;
    1022             : 
    1023         580 :         pull_varattnos((Node *) indexInfo->ii_Expressions, 1, &indexattrs);
    1024         580 :         pull_varattnos((Node *) indexInfo->ii_Predicate, 1, &indexattrs);
    1025             : 
    1026        4052 :         for (i = FirstLowInvalidHeapAttributeNumber + 1; i < 0; i++)
    1027             :         {
    1028        3480 :             if (bms_is_member(i - FirstLowInvalidHeapAttributeNumber,
    1029             :                               indexattrs))
    1030           8 :                 ereport(ERROR,
    1031             :                         (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
    1032             :                          errmsg("index creation on system columns is not supported")));
    1033             :         }
    1034             :     }
    1035             : 
    1036             :     /*
    1037             :      * Report index creation if appropriate (delay this till after most of the
    1038             :      * error checks)
    1039             :      */
    1040       50672 :     if (stmt->isconstraint && !quiet)
    1041             :     {
    1042             :         const char *constraint_type;
    1043             : 
    1044        5370 :         if (stmt->primary)
    1045        4878 :             constraint_type = "PRIMARY KEY";
    1046         492 :         else if (stmt->unique)
    1047         422 :             constraint_type = "UNIQUE";
    1048          70 :         else if (stmt->excludeOpNames != NIL)
    1049          70 :             constraint_type = "EXCLUDE";
    1050             :         else
    1051             :         {
    1052           0 :             elog(ERROR, "unknown constraint type");
    1053             :             constraint_type = NULL; /* keep compiler quiet */
    1054             :         }
    1055             : 
    1056        5370 :         ereport(DEBUG1,
    1057             :                 (errmsg("%s %s will create implicit index \"%s\" for table \"%s\"",
    1058             :                         is_alter_table ? "ALTER TABLE / ADD" : "CREATE TABLE /",
    1059             :                         constraint_type,
    1060             :                         indexRelationName, RelationGetRelationName(rel))));
    1061             :     }
    1062             : 
    1063             :     /*
    1064             :      * A valid stmt->oldNode implies that we already have a built form of the
    1065             :      * index.  The caller should also decline any index build.
    1066             :      */
    1067             :     Assert(!OidIsValid(stmt->oldNode) || (skip_build && !concurrent));
    1068             : 
    1069             :     /*
    1070             :      * Make the catalog entries for the index, including constraints. This
    1071             :      * step also actually builds the index, except if caller requested not to
    1072             :      * or in concurrent mode, in which case it'll be done later, or doing a
    1073             :      * partitioned index (because those don't have storage).
    1074             :      */
    1075       50672 :     flags = constr_flags = 0;
    1076       50672 :     if (stmt->isconstraint)
    1077        5502 :         flags |= INDEX_CREATE_ADD_CONSTRAINT;
    1078       50672 :     if (skip_build || concurrent || partitioned)
    1079       42656 :         flags |= INDEX_CREATE_SKIP_BUILD;
    1080       50672 :     if (stmt->if_not_exists)
    1081          12 :         flags |= INDEX_CREATE_IF_NOT_EXISTS;
    1082       50672 :     if (concurrent)
    1083          48 :         flags |= INDEX_CREATE_CONCURRENT;
    1084       50672 :     if (partitioned)
    1085         832 :         flags |= INDEX_CREATE_PARTITIONED;
    1086       50672 :     if (stmt->primary)
    1087        4938 :         flags |= INDEX_CREATE_IS_PRIMARY;
    1088             : 
    1089             :     /*
    1090             :      * If the table is partitioned, and recursion was declined but partitions
    1091             :      * exist, mark the index as invalid.
    1092             :      */
    1093       50672 :     if (partitioned && stmt->relation && !stmt->relation->inh)
    1094             :     {
    1095         120 :         PartitionDesc pd = RelationGetPartitionDesc(rel);
    1096             : 
    1097         120 :         if (pd->nparts != 0)
    1098         116 :             flags |= INDEX_CREATE_INVALID;
    1099             :     }
    1100             : 
    1101       50672 :     if (stmt->deferrable)
    1102          54 :         constr_flags |= INDEX_CONSTR_CREATE_DEFERRABLE;
    1103       50672 :     if (stmt->initdeferred)
    1104          10 :         constr_flags |= INDEX_CONSTR_CREATE_INIT_DEFERRED;
    1105             : 
    1106             :     indexRelationId =
    1107       50672 :         index_create(rel, indexRelationName, indexRelationId, parentIndexId,
    1108             :                      parentConstraintId,
    1109             :                      stmt->oldNode, indexInfo, indexColNames,
    1110             :                      accessMethodId, tablespaceId,
    1111             :                      collationObjectId, classObjectId,
    1112             :                      coloptions, reloptions,
    1113             :                      flags, constr_flags,
    1114       50672 :                      allowSystemTableMods, !check_rights,
    1115       50672 :                      &createdConstraintId);
    1116             : 
    1117       50584 :     ObjectAddressSet(address, RelationRelationId, indexRelationId);
    1118             : 
    1119             :     /*
    1120             :      * Revert to original default_tablespace.  Must do this before any return
    1121             :      * from this function, but after index_create, so this is a good time.
    1122             :      */
    1123       50584 :     if (save_nestlevel >= 0)
    1124         304 :         AtEOXact_GUC(true, save_nestlevel);
    1125             : 
    1126       50584 :     if (!OidIsValid(indexRelationId))
    1127             :     {
    1128          12 :         table_close(rel, NoLock);
    1129             : 
    1130             :         /* If this is the top-level index, we're done */
    1131          12 :         if (!OidIsValid(parentIndexId))
    1132          12 :             pgstat_progress_end_command();
    1133             : 
    1134          12 :         return address;
    1135             :     }
    1136             : 
    1137             :     /* Add any requested comment */
    1138       50572 :     if (stmt->idxcomment != NULL)
    1139          44 :         CreateComments(indexRelationId, RelationRelationId, 0,
    1140          44 :                        stmt->idxcomment);
    1141             : 
    1142       50572 :     if (partitioned)
    1143             :     {
    1144             :         /*
    1145             :          * Unless caller specified to skip this step (via ONLY), process each
    1146             :          * partition to make sure they all contain a corresponding index.
    1147             :          *
    1148             :          * If we're called internally (no stmt->relation), recurse always.
    1149             :          */
    1150         832 :         if (!stmt->relation || stmt->relation->inh)
    1151             :         {
    1152         712 :             PartitionDesc partdesc = RelationGetPartitionDesc(rel);
    1153         712 :             int         nparts = partdesc->nparts;
    1154         712 :             Oid        *part_oids = palloc(sizeof(Oid) * nparts);
    1155         712 :             bool        invalidate_parent = false;
    1156             :             TupleDesc   parentDesc;
    1157             :             Oid        *opfamOids;
    1158             : 
    1159         712 :             pgstat_progress_update_param(PROGRESS_CREATEIDX_PARTITIONS_TOTAL,
    1160             :                                          nparts);
    1161             : 
    1162         712 :             memcpy(part_oids, partdesc->oids, sizeof(Oid) * nparts);
    1163             : 
    1164         712 :             parentDesc = RelationGetDescr(rel);
    1165         712 :             opfamOids = palloc(sizeof(Oid) * numberOfKeyAttributes);
    1166        1560 :             for (i = 0; i < numberOfKeyAttributes; i++)
    1167         848 :                 opfamOids[i] = get_opclass_family(classObjectId[i]);
    1168             : 
    1169             :             /*
    1170             :              * For each partition, scan all existing indexes; if one matches
    1171             :              * our index definition and is not already attached to some other
    1172             :              * parent index, attach it to the one we just created.
    1173             :              *
    1174             :              * If none matches, build a new index by calling ourselves
    1175             :              * recursively with the same options (except for the index name).
    1176             :              */
    1177        1136 :             for (i = 0; i < nparts; i++)
    1178             :             {
    1179         440 :                 Oid         childRelid = part_oids[i];
    1180             :                 Relation    childrel;
    1181             :                 List       *childidxs;
    1182             :                 ListCell   *cell;
    1183             :                 AttrMap    *attmap;
    1184         440 :                 bool        found = false;
    1185             : 
    1186         440 :                 childrel = table_open(childRelid, lockmode);
    1187             : 
    1188             :                 /*
    1189             :                  * Don't try to create indexes on foreign tables, though. Skip
    1190             :                  * those if a regular index, or fail if trying to create a
    1191             :                  * constraint index.
    1192             :                  */
    1193         440 :                 if (childrel->rd_rel->relkind == RELKIND_FOREIGN_TABLE)
    1194             :                 {
    1195          12 :                     if (stmt->unique || stmt->primary)
    1196           8 :                         ereport(ERROR,
    1197             :                                 (errcode(ERRCODE_WRONG_OBJECT_TYPE),
    1198             :                                  errmsg("cannot create unique index on partitioned table \"%s\"",
    1199             :                                         RelationGetRelationName(rel)),
    1200             :                                  errdetail("Table \"%s\" contains partitions that are foreign tables.",
    1201             :                                            RelationGetRelationName(rel))));
    1202             : 
    1203           4 :                     table_close(childrel, lockmode);
    1204           4 :                     continue;
    1205             :                 }
    1206             : 
    1207         428 :                 childidxs = RelationGetIndexList(childrel);
    1208             :                 attmap =
    1209         428 :                     build_attrmap_by_name(RelationGetDescr(childrel),
    1210             :                                           parentDesc);
    1211             : 
    1212         622 :                 foreach(cell, childidxs)
    1213             :                 {
    1214         226 :                     Oid         cldidxid = lfirst_oid(cell);
    1215             :                     Relation    cldidx;
    1216             :                     IndexInfo  *cldIdxInfo;
    1217             : 
    1218             :                     /* this index is already partition of another one */
    1219         226 :                     if (has_superclass(cldidxid))
    1220         178 :                         continue;
    1221             : 
    1222          48 :                     cldidx = index_open(cldidxid, lockmode);
    1223          48 :                     cldIdxInfo = BuildIndexInfo(cldidx);
    1224          48 :                     if (CompareIndexInfo(cldIdxInfo, indexInfo,
    1225             :                                          cldidx->rd_indcollation,
    1226             :                                          collationObjectId,
    1227             :                                          cldidx->rd_opfamily,
    1228             :                                          opfamOids,
    1229             :                                          attmap))
    1230             :                     {
    1231          32 :                         Oid         cldConstrOid = InvalidOid;
    1232             : 
    1233             :                         /*
    1234             :                          * Found a match.
    1235             :                          *
    1236             :                          * If this index is being created in the parent
    1237             :                          * because of a constraint, then the child needs to
    1238             :                          * have a constraint also, so look for one.  If there
    1239             :                          * is no such constraint, this index is no good, so
    1240             :                          * keep looking.
    1241             :                          */
    1242          32 :                         if (createdConstraintId != InvalidOid)
    1243             :                         {
    1244             :                             cldConstrOid =
    1245           8 :                                 get_relation_idx_constraint_oid(childRelid,
    1246             :                                                                 cldidxid);
    1247           8 :                             if (cldConstrOid == InvalidOid)
    1248             :                             {
    1249           0 :                                 index_close(cldidx, lockmode);
    1250           0 :                                 continue;
    1251             :                             }
    1252             :                         }
    1253             : 
    1254             :                         /* Attach index to parent and we're done. */
    1255          32 :                         IndexSetParentIndex(cldidx, indexRelationId);
    1256          32 :                         if (createdConstraintId != InvalidOid)
    1257           8 :                             ConstraintSetParentConstraint(cldConstrOid,
    1258             :                                                           createdConstraintId,
    1259             :                                                           childRelid);
    1260             : 
    1261          32 :                         if (!cldidx->rd_index->indisvalid)
    1262           8 :                             invalidate_parent = true;
    1263             : 
    1264          32 :                         found = true;
    1265             :                         /* keep lock till commit */
    1266          32 :                         index_close(cldidx, NoLock);
    1267          32 :                         break;
    1268             :                     }
    1269             : 
    1270          16 :                     index_close(cldidx, lockmode);
    1271             :                 }
    1272             : 
    1273         428 :                 list_free(childidxs);
    1274         428 :                 table_close(childrel, NoLock);
    1275             : 
    1276             :                 /*
    1277             :                  * If no matching index was found, create our own.
    1278             :                  */
    1279         428 :                 if (!found)
    1280             :                 {
    1281         396 :                     IndexStmt  *childStmt = copyObject(stmt);
    1282             :                     bool        found_whole_row;
    1283             :                     ListCell   *lc;
    1284             : 
    1285             :                     /*
    1286             :                      * We can't use the same index name for the child index,
    1287             :                      * so clear idxname to let the recursive invocation choose
    1288             :                      * a new name.  Likewise, the existing target relation
    1289             :                      * field is wrong, and if indexOid or oldNode are set,
    1290             :                      * they mustn't be applied to the child either.
    1291             :                      */
    1292         396 :                     childStmt->idxname = NULL;
    1293         396 :                     childStmt->relation = NULL;
    1294         396 :                     childStmt->indexOid = InvalidOid;
    1295         396 :                     childStmt->oldNode = InvalidOid;
    1296         396 :                     childStmt->oldCreateSubid = InvalidSubTransactionId;
    1297         396 :                     childStmt->oldFirstRelfilenodeSubid = InvalidSubTransactionId;
    1298             : 
    1299             :                     /*
    1300             :                      * Adjust any Vars (both in expressions and in the index's
    1301             :                      * WHERE clause) to match the partition's column numbering
    1302             :                      * in case it's different from the parent's.
    1303             :                      */
    1304         920 :                     foreach(lc, childStmt->indexParams)
    1305             :                     {
    1306         524 :                         IndexElem  *ielem = lfirst(lc);
    1307             : 
    1308             :                         /*
    1309             :                          * If the index parameter is an expression, we must
    1310             :                          * translate it to contain child Vars.
    1311             :                          */
    1312         524 :                         if (ielem->expr)
    1313             :                         {
    1314          40 :                             ielem->expr =
    1315          40 :                                 map_variable_attnos((Node *) ielem->expr,
    1316             :                                                     1, 0, attmap,
    1317             :                                                     InvalidOid,
    1318             :                                                     &found_whole_row);
    1319          40 :                             if (found_whole_row)
    1320           0 :                                 elog(ERROR, "cannot convert whole-row table reference");
    1321             :                         }
    1322             :                     }
    1323         396 :                     childStmt->whereClause =
    1324         396 :                         map_variable_attnos(stmt->whereClause, 1, 0,
    1325             :                                             attmap,
    1326             :                                             InvalidOid, &found_whole_row);
    1327         396 :                     if (found_whole_row)
    1328           0 :                         elog(ERROR, "cannot convert whole-row table reference");
    1329             : 
    1330         396 :                     DefineIndex(childRelid, childStmt,
    1331             :                                 InvalidOid, /* no predefined OID */
    1332             :                                 indexRelationId,    /* this is our child */
    1333             :                                 createdConstraintId,
    1334             :                                 is_alter_table, check_rights, check_not_in_use,
    1335             :                                 skip_build, quiet);
    1336             :                 }
    1337             : 
    1338         420 :                 pgstat_progress_update_param(PROGRESS_CREATEIDX_PARTITIONS_DONE,
    1339         420 :                                              i + 1);
    1340         420 :                 free_attrmap(attmap);
    1341             :             }
    1342             : 
    1343             :             /*
    1344             :              * The pg_index row we inserted for this index was marked
    1345             :              * indisvalid=true.  But if we attached an existing index that is
    1346             :              * invalid, this is incorrect, so update our row to invalid too.
    1347             :              */
    1348         696 :             if (invalidate_parent)
    1349             :             {
    1350           8 :                 Relation    pg_index = table_open(IndexRelationId, RowExclusiveLock);
    1351             :                 HeapTuple   tup,
    1352             :                             newtup;
    1353             : 
    1354           8 :                 tup = SearchSysCache1(INDEXRELID,
    1355             :                                       ObjectIdGetDatum(indexRelationId));
    1356           8 :                 if (!HeapTupleIsValid(tup))
    1357           0 :                     elog(ERROR, "cache lookup failed for index %u",
    1358             :                          indexRelationId);
    1359           8 :                 newtup = heap_copytuple(tup);
    1360           8 :                 ((Form_pg_index) GETSTRUCT(newtup))->indisvalid = false;
    1361           8 :                 CatalogTupleUpdate(pg_index, &tup->t_self, newtup);
    1362           8 :                 ReleaseSysCache(tup);
    1363           8 :                 table_close(pg_index, RowExclusiveLock);
    1364           8 :                 heap_freetuple(newtup);
    1365             :             }
    1366             :         }
    1367             : 
    1368             :         /*
    1369             :          * Indexes on partitioned tables are not themselves built, so we're
    1370             :          * done here.
    1371             :          */
    1372         816 :         table_close(rel, NoLock);
    1373         816 :         if (!OidIsValid(parentIndexId))
    1374         694 :             pgstat_progress_end_command();
    1375         816 :         return address;
    1376             :     }
    1377             : 
    1378       49740 :     if (!concurrent)
    1379             :     {
    1380             :         /* Close the heap and we're done, in the non-concurrent case */
    1381       49700 :         table_close(rel, NoLock);
    1382             : 
    1383             :         /* If this is the top-level index, we're done. */
    1384       49700 :         if (!OidIsValid(parentIndexId))
    1385       48822 :             pgstat_progress_end_command();
    1386             : 
    1387       49700 :         return address;
    1388             :     }
    1389             : 
    1390             :     /* save lockrelid and locktag for below, then close rel */
    1391          40 :     heaprelid = rel->rd_lockInfo.lockRelId;
    1392          40 :     SET_LOCKTAG_RELATION(heaplocktag, heaprelid.dbId, heaprelid.relId);
    1393          40 :     table_close(rel, NoLock);
    1394             : 
    1395             :     /*
    1396             :      * For a concurrent build, it's important to make the catalog entries
    1397             :      * visible to other transactions before we start to build the index. That
    1398             :      * will prevent them from making incompatible HOT updates.  The new index
    1399             :      * will be marked not indisready and not indisvalid, so that no one else
    1400             :      * tries to either insert into it or use it for queries.
    1401             :      *
    1402             :      * We must commit our current transaction so that the index becomes
    1403             :      * visible; then start another.  Note that all the data structures we just
    1404             :      * built are lost in the commit.  The only data we keep past here are the
    1405             :      * relation IDs.
    1406             :      *
    1407             :      * Before committing, get a session-level lock on the table, to ensure
    1408             :      * that neither it nor the index can be dropped before we finish. This
    1409             :      * cannot block, even if someone else is waiting for access, because we
    1410             :      * already have the same lock within our transaction.
    1411             :      *
    1412             :      * Note: we don't currently bother with a session lock on the index,
    1413             :      * because there are no operations that could change its state while we
    1414             :      * hold lock on the parent table.  This might need to change later.
    1415             :      */
    1416          40 :     LockRelationIdForSession(&heaprelid, ShareUpdateExclusiveLock);
    1417             : 
    1418          40 :     PopActiveSnapshot();
    1419          40 :     CommitTransactionCommand();
    1420          40 :     StartTransactionCommand();
    1421             : 
    1422             :     /*
    1423             :      * The index is now visible, so we can report the OID.
    1424             :      */
    1425          40 :     pgstat_progress_update_param(PROGRESS_CREATEIDX_INDEX_OID,
    1426             :                                  indexRelationId);
    1427             : 
    1428             :     /*
    1429             :      * Phase 2 of concurrent index build (see comments for validate_index()
    1430             :      * for an overview of how this works)
    1431             :      *
    1432             :      * Now we must wait until no running transaction could have the table open
    1433             :      * with the old list of indexes.  Use ShareLock to consider running
    1434             :      * transactions that hold locks that permit writing to the table.  Note we
    1435             :      * do not need to worry about xacts that open the table for writing after
    1436             :      * this point; they will see the new index when they open it.
    1437             :      *
    1438             :      * Note: the reason we use actual lock acquisition here, rather than just
    1439             :      * checking the ProcArray and sleeping, is that deadlock is possible if
    1440             :      * one of the transactions in question is blocked trying to acquire an
    1441             :      * exclusive lock on our table.  The lock code will detect deadlock and
    1442             :      * error out properly.
    1443             :      */
    1444          40 :     pgstat_progress_update_param(PROGRESS_CREATEIDX_PHASE,
    1445             :                                  PROGRESS_CREATEIDX_PHASE_WAIT_1);
    1446          40 :     WaitForLockers(heaplocktag, ShareLock, true);
    1447             : 
    1448             :     /*
    1449             :      * At this moment we are sure that there are no transactions with the
    1450             :      * table open for write that don't have this new index in their list of
    1451             :      * indexes.  We have waited out all the existing transactions and any new
    1452             :      * transaction will have the new index in its list, but the index is still
    1453             :      * marked as "not-ready-for-inserts".  The index is consulted while
    1454             :      * deciding HOT-safety though.  This arrangement ensures that no new HOT
    1455             :      * chains can be created where the new tuple and the old tuple in the
    1456             :      * chain have different index keys.
    1457             :      *
    1458             :      * We now take a new snapshot, and build the index using all tuples that
    1459             :      * are visible in this snapshot.  We can be sure that any HOT updates to
    1460             :      * these tuples will be compatible with the index, since any updates made
    1461             :      * by transactions that didn't know about the index are now committed or
    1462             :      * rolled back.  Thus, each visible tuple is either the end of its
    1463             :      * HOT-chain or the extension of the chain is HOT-safe for this index.
    1464             :      */
    1465             : 
    1466             :     /* Set ActiveSnapshot since functions in the indexes may need it */
    1467          40 :     PushActiveSnapshot(GetTransactionSnapshot());
    1468             : 
    1469             :     /* Perform concurrent build of index */
    1470          40 :     index_concurrently_build(relationId, indexRelationId);
    1471             : 
    1472             :     /* we can do away with our snapshot */
    1473          32 :     PopActiveSnapshot();
    1474             : 
    1475             :     /*
    1476             :      * Commit this transaction to make the indisready update visible.
    1477             :      */
    1478          32 :     CommitTransactionCommand();
    1479          32 :     StartTransactionCommand();
    1480             : 
    1481             :     /*
    1482             :      * Phase 3 of concurrent index build
    1483             :      *
    1484             :      * We once again wait until no transaction can have the table open with
    1485             :      * the index marked as read-only for updates.
    1486             :      */
    1487          32 :     pgstat_progress_update_param(PROGRESS_CREATEIDX_PHASE,
    1488             :                                  PROGRESS_CREATEIDX_PHASE_WAIT_2);
    1489          32 :     WaitForLockers(heaplocktag, ShareLock, true);
    1490             : 
    1491             :     /*
    1492             :      * Now take the "reference snapshot" that will be used by validate_index()
    1493             :      * to filter candidate tuples.  Beware!  There might still be snapshots in
    1494             :      * use that treat some transaction as in-progress that our reference
    1495             :      * snapshot treats as committed.  If such a recently-committed transaction
    1496             :      * deleted tuples in the table, we will not include them in the index; yet
    1497             :      * those transactions which see the deleting one as still-in-progress will
    1498             :      * expect such tuples to be there once we mark the index as valid.
    1499             :      *
    1500             :      * We solve this by waiting for all endangered transactions to exit before
    1501             :      * we mark the index as valid.
    1502             :      *
    1503             :      * We also set ActiveSnapshot to this snap, since functions in indexes may
    1504             :      * need a snapshot.
    1505             :      */
    1506          32 :     snapshot = RegisterSnapshot(GetTransactionSnapshot());
    1507          32 :     PushActiveSnapshot(snapshot);
    1508             : 
    1509             :     /*
    1510             :      * Scan the index and the heap, insert any missing index entries.
    1511             :      */
    1512          32 :     validate_index(relationId, indexRelationId, snapshot);
    1513             : 
    1514             :     /*
    1515             :      * Drop the reference snapshot.  We must do this before waiting out other
    1516             :      * snapshot holders, else we will deadlock against other processes also
    1517             :      * doing CREATE INDEX CONCURRENTLY, which would see our snapshot as one
    1518             :      * they must wait for.  But first, save the snapshot's xmin to use as
    1519             :      * limitXmin for GetCurrentVirtualXIDs().
    1520             :      */
    1521          32 :     limitXmin = snapshot->xmin;
    1522             : 
    1523          32 :     PopActiveSnapshot();
    1524          32 :     UnregisterSnapshot(snapshot);
    1525             : 
    1526             :     /*
    1527             :      * The snapshot subsystem could still contain registered snapshots that
    1528             :      * are holding back our process's advertised xmin; in particular, if
    1529             :      * default_transaction_isolation = serializable, there is a transaction
    1530             :      * snapshot that is still active.  The CatalogSnapshot is likewise a
    1531             :      * hazard.  To ensure no deadlocks, we must commit and start yet another
    1532             :      * transaction, and do our wait before any snapshot has been taken in it.
    1533             :      */
    1534          32 :     CommitTransactionCommand();
    1535          32 :     StartTransactionCommand();
    1536             : 
    1537             :     /* We should now definitely not be advertising any xmin. */
    1538             :     Assert(MyPgXact->xmin == InvalidTransactionId);
    1539             : 
    1540             :     /*
    1541             :      * The index is now valid in the sense that it contains all currently
    1542             :      * interesting tuples.  But since it might not contain tuples deleted just
    1543             :      * before the reference snap was taken, we have to wait out any
    1544             :      * transactions that might have older snapshots.
    1545             :      */
    1546          32 :     pgstat_progress_update_param(PROGRESS_CREATEIDX_PHASE,
    1547             :                                  PROGRESS_CREATEIDX_PHASE_WAIT_3);
    1548          32 :     WaitForOlderSnapshots(limitXmin, true);
    1549             : 
    1550             :     /*
    1551             :      * Index can now be marked valid -- update its pg_index entry
    1552             :      */
    1553          32 :     index_set_state_flags(indexRelationId, INDEX_CREATE_SET_VALID);
    1554             : 
    1555             :     /*
    1556             :      * The pg_index update will cause backends (including this one) to update
    1557             :      * relcache entries for the index itself, but we should also send a
    1558             :      * relcache inval on the parent table to force replanning of cached plans.
    1559             :      * Otherwise existing sessions might fail to use the new index where it
    1560             :      * would be useful.  (Note that our earlier commits did not create reasons
    1561             :      * to replan; so relcache flush on the index itself was sufficient.)
    1562             :      */
    1563          32 :     CacheInvalidateRelcacheByRelid(heaprelid.relId);
    1564             : 
    1565             :     /*
    1566             :      * Last thing to do is release the session-level lock on the parent table.
    1567             :      */
    1568          32 :     UnlockRelationIdForSession(&heaprelid, ShareUpdateExclusiveLock);
    1569             : 
    1570          32 :     pgstat_progress_end_command();
    1571             : 
    1572          32 :     return address;
    1573             : }
    1574             : 
    1575             : 
    1576             : /*
    1577             :  * CheckMutability
    1578             :  *      Test whether given expression is mutable
    1579             :  */
    1580             : static bool
    1581         628 : CheckMutability(Expr *expr)
    1582             : {
    1583             :     /*
    1584             :      * First run the expression through the planner.  This has a couple of
    1585             :      * important consequences.  First, function default arguments will get
    1586             :      * inserted, which may affect volatility (consider "default now()").
    1587             :      * Second, inline-able functions will get inlined, which may allow us to
    1588             :      * conclude that the function is really less volatile than it's marked. As
    1589             :      * an example, polymorphic functions must be marked with the most volatile
    1590             :      * behavior that they have for any input type, but once we inline the
    1591             :      * function we may be able to conclude that it's not so volatile for the
    1592             :      * particular input type we're dealing with.
    1593             :      *
    1594             :      * We assume here that expression_planner() won't scribble on its input.
    1595             :      */
    1596         628 :     expr = expression_planner(expr);
    1597             : 
    1598             :     /* Now we can search for non-immutable functions */
    1599         628 :     return contain_mutable_functions((Node *) expr);
    1600             : }
    1601             : 
    1602             : 
    1603             : /*
    1604             :  * CheckPredicate
    1605             :  *      Checks that the given partial-index predicate is valid.
    1606             :  *
    1607             :  * This used to also constrain the form of the predicate to forms that
    1608             :  * indxpath.c could do something with.  However, that seems overly
    1609             :  * restrictive.  One useful application of partial indexes is to apply
    1610             :  * a UNIQUE constraint across a subset of a table, and in that scenario
    1611             :  * any evaluable predicate will work.  So accept any predicate here
    1612             :  * (except ones requiring a plan), and let indxpath.c fend for itself.
    1613             :  */
    1614             : static void
    1615         226 : CheckPredicate(Expr *predicate)
    1616             : {
    1617             :     /*
    1618             :      * transformExpr() should have already rejected subqueries, aggregates,
    1619             :      * and window functions, based on the EXPR_KIND_ for a predicate.
    1620             :      */
    1621             : 
    1622             :     /*
    1623             :      * A predicate using mutable functions is probably wrong, for the same
    1624             :      * reasons that we don't allow an index expression to use one.
    1625             :      */
    1626         226 :     if (CheckMutability(predicate))
    1627           0 :         ereport(ERROR,
    1628             :                 (errcode(ERRCODE_INVALID_OBJECT_DEFINITION),
    1629             :                  errmsg("functions in index predicate must be marked IMMUTABLE")));
    1630         226 : }
    1631             : 
    1632             : /*
    1633             :  * Compute per-index-column information, including indexed column numbers
    1634             :  * or index expressions, opclasses and their options. Note, all output vectors
    1635             :  * should be allocated for all columns, including "including" ones.
    1636             :  */
    1637             : static void
    1638       50840 : ComputeIndexAttrs(IndexInfo *indexInfo,
    1639             :                   Oid *typeOidP,
    1640             :                   Oid *collationOidP,
    1641             :                   Oid *classOidP,
    1642             :                   int16 *colOptionP,
    1643             :                   List *attList,    /* list of IndexElem's */
    1644             :                   List *exclusionOpNames,
    1645             :                   Oid relId,
    1646             :                   const char *accessMethodName,
    1647             :                   Oid accessMethodId,
    1648             :                   bool amcanorder,
    1649             :                   bool isconstraint)
    1650             : {
    1651             :     ListCell   *nextExclOp;
    1652             :     ListCell   *lc;
    1653             :     int         attn;
    1654       50840 :     int         nkeycols = indexInfo->ii_NumIndexKeyAttrs;
    1655             : 
    1656             :     /* Allocate space for exclusion operator info, if needed */
    1657       50840 :     if (exclusionOpNames)
    1658             :     {
    1659             :         Assert(list_length(exclusionOpNames) == nkeycols);
    1660          86 :         indexInfo->ii_ExclusionOps = (Oid *) palloc(sizeof(Oid) * nkeycols);
    1661          86 :         indexInfo->ii_ExclusionProcs = (Oid *) palloc(sizeof(Oid) * nkeycols);
    1662          86 :         indexInfo->ii_ExclusionStrats = (uint16 *) palloc(sizeof(uint16) * nkeycols);
    1663          86 :         nextExclOp = list_head(exclusionOpNames);
    1664             :     }
    1665             :     else
    1666       50754 :         nextExclOp = NULL;
    1667             : 
    1668             :     /*
    1669             :      * process attributeList
    1670             :      */
    1671       50840 :     attn = 0;
    1672      131146 :     foreach(lc, attList)
    1673             :     {
    1674       80334 :         IndexElem  *attribute = (IndexElem *) lfirst(lc);
    1675             :         Oid         atttype;
    1676             :         Oid         attcollation;
    1677             : 
    1678             :         /*
    1679             :          * Process the column-or-expression to be indexed.
    1680             :          */
    1681       80334 :         if (attribute->name != NULL)
    1682             :         {
    1683             :             /* Simple index attribute */
    1684             :             HeapTuple   atttuple;
    1685             :             Form_pg_attribute attform;
    1686             : 
    1687             :             Assert(attribute->expr == NULL);
    1688       79928 :             atttuple = SearchSysCacheAttName(relId, attribute->name);
    1689       79928 :             if (!HeapTupleIsValid(atttuple))
    1690             :             {
    1691             :                 /* difference in error message spellings is historical */
    1692          20 :                 if (isconstraint)
    1693          12 :                     ereport(ERROR,
    1694             :                             (errcode(ERRCODE_UNDEFINED_COLUMN),
    1695             :                              errmsg("column \"%s\" named in key does not exist",
    1696             :                                     attribute->name)));
    1697             :                 else
    1698           8 :                     ereport(ERROR,
    1699             :                             (errcode(ERRCODE_UNDEFINED_COLUMN),
    1700             :                              errmsg("column \"%s\" does not exist",
    1701             :                                     attribute->name)));
    1702             :             }
    1703       79908 :             attform = (Form_pg_attribute) GETSTRUCT(atttuple);
    1704       79908 :             indexInfo->ii_IndexAttrNumbers[attn] = attform->attnum;
    1705       79908 :             atttype = attform->atttypid;
    1706       79908 :             attcollation = attform->attcollation;
    1707       79908 :             ReleaseSysCache(atttuple);
    1708             :         }
    1709             :         else
    1710             :         {
    1711             :             /* Index expression */
    1712         406 :             Node       *expr = attribute->expr;
    1713             : 
    1714             :             Assert(expr != NULL);
    1715             : 
    1716         406 :             if (attn >= nkeycols)
    1717           0 :                 ereport(ERROR,
    1718             :                         (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
    1719             :                          errmsg("expressions are not supported in included columns")));
    1720         406 :             atttype = exprType(expr);
    1721         406 :             attcollation = exprCollation(expr);
    1722             : 
    1723             :             /*
    1724             :              * Strip any top-level COLLATE clause.  This ensures that we treat
    1725             :              * "x COLLATE y" and "(x COLLATE y)" alike.
    1726             :              */
    1727         418 :             while (IsA(expr, CollateExpr))
    1728          12 :                 expr = (Node *) ((CollateExpr *) expr)->arg;
    1729             : 
    1730         406 :             if (IsA(expr, Var) &&
    1731           4 :                 ((Var *) expr)->varattno != InvalidAttrNumber)
    1732             :             {
    1733             :                 /*
    1734             :                  * User wrote "(column)" or "(column COLLATE something)".
    1735             :                  * Treat it like simple attribute anyway.
    1736             :                  */
    1737           4 :                 indexInfo->ii_IndexAttrNumbers[attn] = ((Var *) expr)->varattno;
    1738             :             }
    1739             :             else
    1740             :             {
    1741         402 :                 indexInfo->ii_IndexAttrNumbers[attn] = 0;    /* marks expression */
    1742         402 :                 indexInfo->ii_Expressions = lappend(indexInfo->ii_Expressions,
    1743             :                                                     expr);
    1744             : 
    1745             :                 /*
    1746             :                  * transformExpr() should have already rejected subqueries,
    1747             :                  * aggregates, and window functions, based on the EXPR_KIND_
    1748             :                  * for an index expression.
    1749             :                  */
    1750             : 
    1751             :                 /*
    1752             :                  * An expression using mutable functions is probably wrong,
    1753             :                  * since if you aren't going to get the same result for the
    1754             :                  * same data every time, it's not clear what the index entries
    1755             :                  * mean at all.
    1756             :                  */
    1757         402 :                 if (CheckMutability((Expr *) expr))
    1758           0 :                     ereport(ERROR,
    1759             :                             (errcode(ERRCODE_INVALID_OBJECT_DEFINITION),
    1760             :                              errmsg("functions in index expression must be marked IMMUTABLE")));
    1761             :             }
    1762             :         }
    1763             : 
    1764       80314 :         typeOidP[attn] = atttype;
    1765             : 
    1766             :         /*
    1767             :          * Included columns have no collation, no opclass and no ordering
    1768             :          * options.
    1769             :          */
    1770       80314 :         if (attn >= nkeycols)
    1771             :         {
    1772         444 :             if (attribute->collation)
    1773           0 :                 ereport(ERROR,
    1774             :                         (errcode(ERRCODE_INVALID_OBJECT_DEFINITION),
    1775             :                          errmsg("including column does not support a collation")));
    1776         444 :             if (attribute->opclass)
    1777           0 :                 ereport(ERROR,
    1778             :                         (errcode(ERRCODE_INVALID_OBJECT_DEFINITION),
    1779             :                          errmsg("including column does not support an operator class")));
    1780         444 :             if (attribute->ordering != SORTBY_DEFAULT)
    1781           0 :                 ereport(ERROR,
    1782             :                         (errcode(ERRCODE_INVALID_OBJECT_DEFINITION),
    1783             :                          errmsg("including column does not support ASC/DESC options")));
    1784         444 :             if (attribute->nulls_ordering != SORTBY_NULLS_DEFAULT)
    1785           0 :                 ereport(ERROR,
    1786             :                         (errcode(ERRCODE_INVALID_OBJECT_DEFINITION),
    1787             :                          errmsg("including column does not support NULLS FIRST/LAST options")));
    1788             : 
    1789         444 :             classOidP[attn] = InvalidOid;
    1790         444 :             colOptionP[attn] = 0;
    1791         444 :             collationOidP[attn] = InvalidOid;
    1792         444 :             attn++;
    1793             : 
    1794         444 :             continue;
    1795             :         }
    1796             : 
    1797             :         /*
    1798             :          * Apply collation override if any
    1799             :          */
    1800       79870 :         if (attribute->collation)
    1801          56 :             attcollation = get_collation_oid(attribute->collation, false);
    1802             : 
    1803             :         /*
    1804             :          * Check we have a collation iff it's a collatable type.  The only
    1805             :          * expected failures here are (1) COLLATE applied to a noncollatable
    1806             :          * type, or (2) index expression had an unresolved collation.  But we
    1807             :          * might as well code this to be a complete consistency check.
    1808             :          */
    1809       79870 :         if (type_is_collatable(atttype))
    1810             :         {
    1811       14394 :             if (!OidIsValid(attcollation))
    1812           0 :                 ereport(ERROR,
    1813             :                         (errcode(ERRCODE_INDETERMINATE_COLLATION),
    1814             :                          errmsg("could not determine which collation to use for index expression"),
    1815             :                          errhint("Use the COLLATE clause to set the collation explicitly.")));
    1816             :         }
    1817             :         else
    1818             :         {
    1819       65476 :             if (OidIsValid(attcollation))
    1820           4 :                 ereport(ERROR,
    1821             :                         (errcode(ERRCODE_DATATYPE_MISMATCH),
    1822             :                          errmsg("collations are not supported by type %s",
    1823             :                                 format_type_be(atttype))));
    1824             :         }
    1825             : 
    1826       79866 :         collationOidP[attn] = attcollation;
    1827             : 
    1828             :         /*
    1829             :          * Identify the opclass to use.
    1830             :          */
    1831       79866 :         classOidP[attn] = ResolveOpClass(attribute->opclass,
    1832             :                                          atttype,
    1833             :                                          accessMethodName,
    1834             :                                          accessMethodId);
    1835             : 
    1836             :         /*
    1837             :          * Identify the exclusion operator, if any.
    1838             :          */
    1839       79862 :         if (nextExclOp)
    1840             :         {
    1841         108 :             List       *opname = (List *) lfirst(nextExclOp);
    1842             :             Oid         opid;
    1843             :             Oid         opfamily;
    1844             :             int         strat;
    1845             : 
    1846             :             /*
    1847             :              * Find the operator --- it must accept the column datatype
    1848             :              * without runtime coercion (but binary compatibility is OK)
    1849             :              */
    1850         108 :             opid = compatible_oper_opid(opname, atttype, atttype, false);
    1851             : 
    1852             :             /*
    1853             :              * Only allow commutative operators to be used in exclusion
    1854             :              * constraints. If X conflicts with Y, but Y does not conflict
    1855             :              * with X, bad things will happen.
    1856             :              */
    1857         108 :             if (get_commutator(opid) != opid)
    1858           0 :                 ereport(ERROR,
    1859             :                         (errcode(ERRCODE_WRONG_OBJECT_TYPE),
    1860             :                          errmsg("operator %s is not commutative",
    1861             :                                 format_operator(opid)),
    1862             :                          errdetail("Only commutative operators can be used in exclusion constraints.")));
    1863             : 
    1864             :             /*
    1865             :              * Operator must be a member of the right opfamily, too
    1866             :              */
    1867         108 :             opfamily = get_opclass_family(classOidP[attn]);
    1868         108 :             strat = get_op_opfamily_strategy(opid, opfamily);
    1869         108 :             if (strat == 0)
    1870             :             {
    1871             :                 HeapTuple   opftuple;
    1872             :                 Form_pg_opfamily opfform;
    1873             : 
    1874             :                 /*
    1875             :                  * attribute->opclass might not explicitly name the opfamily,
    1876             :                  * so fetch the name of the selected opfamily for use in the
    1877             :                  * error message.
    1878             :                  */
    1879           0 :                 opftuple = SearchSysCache1(OPFAMILYOID,
    1880             :                                            ObjectIdGetDatum(opfamily));
    1881           0 :                 if (!HeapTupleIsValid(opftuple))
    1882           0 :                     elog(ERROR, "cache lookup failed for opfamily %u",
    1883             :                          opfamily);
    1884           0 :                 opfform = (Form_pg_opfamily) GETSTRUCT(opftuple);
    1885             : 
    1886           0 :                 ereport(ERROR,
    1887             :                         (errcode(ERRCODE_WRONG_OBJECT_TYPE),
    1888             :                          errmsg("operator %s is not a member of operator family \"%s\"",
    1889             :                                 format_operator(opid),
    1890             :                                 NameStr(opfform->opfname)),
    1891             :                          errdetail("The exclusion operator must be related to the index operator class for the constraint.")));
    1892             :             }
    1893             : 
    1894         108 :             indexInfo->ii_ExclusionOps[attn] = opid;
    1895         108 :             indexInfo->ii_ExclusionProcs[attn] = get_opcode(opid);
    1896         108 :             indexInfo->ii_ExclusionStrats[attn] = strat;
    1897         108 :             nextExclOp = lnext(exclusionOpNames, nextExclOp);
    1898             :         }
    1899             : 
    1900             :         /*
    1901             :          * Set up the per-column options (indoption field).  For now, this is
    1902             :          * zero for any un-ordered index, while ordered indexes have DESC and
    1903             :          * NULLS FIRST/LAST options.
    1904             :          */
    1905       79862 :         colOptionP[attn] = 0;
    1906       79862 :         if (amcanorder)
    1907             :         {
    1908             :             /* default ordering is ASC */
    1909       78740 :             if (attribute->ordering == SORTBY_DESC)
    1910          28 :                 colOptionP[attn] |= INDOPTION_DESC;
    1911             :             /* default null ordering is LAST for ASC, FIRST for DESC */
    1912       78740 :             if (attribute->nulls_ordering == SORTBY_NULLS_DEFAULT)
    1913             :             {
    1914       78720 :                 if (attribute->ordering == SORTBY_DESC)
    1915          20 :                     colOptionP[attn] |= INDOPTION_NULLS_FIRST;
    1916             :             }
    1917          20 :             else if (attribute->nulls_ordering == SORTBY_NULLS_FIRST)
    1918           8 :                 colOptionP[attn] |= INDOPTION_NULLS_FIRST;
    1919             :         }
    1920             :         else
    1921             :         {
    1922             :             /* index AM does not support ordering */
    1923        1122 :             if (attribute->ordering != SORTBY_DEFAULT)
    1924           0 :                 ereport(ERROR,
    1925             :                         (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
    1926             :                          errmsg("access method \"%s\" does not support ASC/DESC options",
    1927             :                                 accessMethodName)));
    1928        1122 :             if (attribute->nulls_ordering != SORTBY_NULLS_DEFAULT)
    1929           0 :                 ereport(ERROR,
    1930             :                         (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
    1931             :                          errmsg("access method \"%s\" does not support NULLS FIRST/LAST options",
    1932             :                                 accessMethodName)));
    1933             :         }
    1934             : 
    1935             :         /* Set up the per-column opclass options (attoptions field). */
    1936       79862 :         if (attribute->opclassopts)
    1937             :         {
    1938             :             Assert(attn < nkeycols);
    1939             : 
    1940          68 :             if (!indexInfo->ii_OpclassOptions)
    1941          68 :                 indexInfo->ii_OpclassOptions =
    1942          68 :                     palloc0(sizeof(Datum) * indexInfo->ii_NumIndexAttrs);
    1943             : 
    1944          68 :             indexInfo->ii_OpclassOptions[attn] =
    1945          68 :                 transformRelOptions((Datum) 0, attribute->opclassopts,
    1946             :                                     NULL, NULL, false, false);
    1947             :         }
    1948             : 
    1949       79862 :         attn++;
    1950             :     }
    1951       50812 : }
    1952             : 
    1953             : /*
    1954             :  * Resolve possibly-defaulted operator class specification
    1955             :  *
    1956             :  * Note: This is used to resolve operator class specifications in index and
    1957             :  * partition key definitions.
    1958             :  */
    1959             : Oid
    1960       79930 : ResolveOpClass(List *opclass, Oid attrType,
    1961             :                const char *accessMethodName, Oid accessMethodId)
    1962             : {
    1963             :     char       *schemaname;
    1964             :     char       *opcname;
    1965             :     HeapTuple   tuple;
    1966             :     Form_pg_opclass opform;
    1967             :     Oid         opClassId,
    1968             :                 opInputType;
    1969             : 
    1970       79930 :     if (opclass == NIL)
    1971             :     {
    1972             :         /* no operator class specified, so find the default */
    1973       10422 :         opClassId = GetDefaultOpClass(attrType, accessMethodId);
    1974       10422 :         if (!OidIsValid(opClassId))
    1975           4 :             ereport(ERROR,
    1976             :                     (errcode(ERRCODE_UNDEFINED_OBJECT),
    1977             :                      errmsg("data type %s has no default operator class for access method \"%s\"",
    1978             :                             format_type_be(attrType), accessMethodName),
    1979             :                      errhint("You must specify an operator class for the index or define a default operator class for the data type.")));
    1980       10418 :         return opClassId;
    1981             :     }
    1982             : 
    1983             :     /*
    1984             :      * Specific opclass name given, so look up the opclass.
    1985             :      */
    1986             : 
    1987             :     /* deconstruct the name list */
    1988       69508 :     DeconstructQualifiedName(opclass, &schemaname, &opcname);
    1989             : 
    1990       69508 :     if (schemaname)
    1991             :     {
    1992             :         /* Look in specific schema only */
    1993             :         Oid         namespaceId;
    1994             : 
    1995           4 :         namespaceId = LookupExplicitNamespace(schemaname, false);
    1996           4 :         tuple = SearchSysCache3(CLAAMNAMENSP,
    1997             :                                 ObjectIdGetDatum(accessMethodId),
    1998             :                                 PointerGetDatum(opcname),
    1999             :                                 ObjectIdGetDatum(namespaceId));
    2000             :     }
    2001             :     else
    2002             :     {
    2003             :         /* Unqualified opclass name, so search the search path */
    2004       69504 :         opClassId = OpclassnameGetOpcid(accessMethodId, opcname);
    2005       69504 :         if (!OidIsValid(opClassId))
    2006           8 :             ereport(ERROR,
    2007             :                     (errcode(ERRCODE_UNDEFINED_OBJECT),
    2008             :                      errmsg("operator class \"%s\" does not exist for access method \"%s\"",
    2009             :                             opcname, accessMethodName)));
    2010       69496 :         tuple = SearchSysCache1(CLAOID, ObjectIdGetDatum(opClassId));
    2011             :     }
    2012             : 
    2013       69500 :     if (!HeapTupleIsValid(tuple))
    2014           0 :         ereport(ERROR,
    2015             :                 (errcode(ERRCODE_UNDEFINED_OBJECT),
    2016             :                  errmsg("operator class \"%s\" does not exist for access method \"%s\"",
    2017             :                         NameListToString(opclass), accessMethodName)));
    2018             : 
    2019             :     /*
    2020             :      * Verify that the index operator class accepts this datatype.  Note we
    2021             :      * will accept binary compatibility.
    2022             :      */
    2023       69500 :     opform = (Form_pg_opclass) GETSTRUCT(tuple);
    2024       69500 :     opClassId = opform->oid;
    2025       69500 :     opInputType = opform->opcintype;
    2026             : 
    2027       69500 :     if (!IsBinaryCoercible(attrType, opInputType))
    2028           0 :         ereport(ERROR,
    2029             :                 (errcode(ERRCODE_DATATYPE_MISMATCH),
    2030             :                  errmsg("operator class \"%s\" does not accept data type %s",
    2031             :                         NameListToString(opclass), format_type_be(attrType))));
    2032             : 
    2033       69500 :     ReleaseSysCache(tuple);
    2034             : 
    2035       69500 :     return opClassId;
    2036             : }
    2037             : 
    2038             : /*
    2039             :  * GetDefaultOpClass
    2040             :  *
    2041             :  * Given the OIDs of a datatype and an access method, find the default
    2042             :  * operator class, if any.  Returns InvalidOid if there is none.
    2043             :  */
    2044             : Oid
    2045       47590 : GetDefaultOpClass(Oid type_id, Oid am_id)
    2046             : {
    2047       47590 :     Oid         result = InvalidOid;
    2048       47590 :     int         nexact = 0;
    2049       47590 :     int         ncompatible = 0;
    2050       47590 :     int         ncompatiblepreferred = 0;
    2051             :     Relation    rel;
    2052             :     ScanKeyData skey[1];
    2053             :     SysScanDesc scan;
    2054             :     HeapTuple   tup;
    2055             :     TYPCATEGORY tcategory;
    2056             : 
    2057             :     /* If it's a domain, look at the base type instead */
    2058       47590 :     type_id = getBaseType(type_id);
    2059             : 
    2060       47590 :     tcategory = TypeCategory(type_id);
    2061             : 
    2062             :     /*
    2063             :      * We scan through all the opclasses available for the access method,
    2064             :      * looking for one that is marked default and matches the target type
    2065             :      * (either exactly or binary-compatibly, but prefer an exact match).
    2066             :      *
    2067             :      * We could find more than one binary-compatible match.  If just one is
    2068             :      * for a preferred type, use that one; otherwise we fail, forcing the user
    2069             :      * to specify which one he wants.  (The preferred-type special case is a
    2070             :      * kluge for varchar: it's binary-compatible to both text and bpchar, so
    2071             :      * we need a tiebreaker.)  If we find more than one exact match, then
    2072             :      * someone put bogus entries in pg_opclass.
    2073             :      */
    2074       47590 :     rel = table_open(OperatorClassRelationId, AccessShareLock);
    2075             : 
    2076       47590 :     ScanKeyInit(&skey[0],
    2077             :                 Anum_pg_opclass_opcmethod,
    2078             :                 BTEqualStrategyNumber, F_OIDEQ,
    2079             :                 ObjectIdGetDatum(am_id));
    2080             : 
    2081       47590 :     scan = systable_beginscan(rel, OpclassAmNameNspIndexId, true,
    2082             :                               NULL, 1, skey);
    2083             : 
    2084     2033268 :     while (HeapTupleIsValid(tup = systable_getnext(scan)))
    2085             :     {
    2086     1985678 :         Form_pg_opclass opclass = (Form_pg_opclass) GETSTRUCT(tup);
    2087             : 
    2088             :         /* ignore altogether if not a default opclass */
    2089     1985678 :         if (!opclass->opcdefault)
    2090      279558 :             continue;
    2091     1706120 :         if (opclass->opcintype == type_id)
    2092             :         {
    2093       38736 :             nexact++;
    2094       38736 :             result = opclass->oid;
    2095             :         }
    2096     2539082 :         else if (nexact == 0 &&
    2097      871698 :                  IsBinaryCoercible(type_id, opclass->opcintype))
    2098             :         {
    2099       14404 :             if (IsPreferredType(tcategory, opclass->opcintype))
    2100             :             {
    2101        2648 :                 ncompatiblepreferred++;
    2102        2648 :                 result = opclass->oid;
    2103             :             }
    2104       11756 :             else if (ncompatiblepreferred == 0)
    2105             :             {
    2106       11756 :                 ncompatible++;
    2107       11756 :                 result = opclass->oid;
    2108             :             }
    2109             :         }
    2110             :     }
    2111             : 
    2112       47590 :     systable_endscan(scan);
    2113             : 
    2114       47590 :     table_close(rel, AccessShareLock);
    2115             : 
    2116             :     /* raise error if pg_opclass contains inconsistent data */
    2117       47590 :     if (nexact > 1)
    2118           0 :         ereport(ERROR,
    2119             :                 (errcode(ERRCODE_DUPLICATE_OBJECT),
    2120             :                  errmsg("there are multiple default operator classes for data type %s",
    2121             :                         format_type_be(type_id))));
    2122             : 
    2123       47590 :     if (nexact == 1 ||
    2124        6208 :         ncompatiblepreferred == 1 ||
    2125        6208 :         (ncompatiblepreferred == 0 && ncompatible == 1))
    2126       46172 :         return result;
    2127             : 
    2128        1418 :     return InvalidOid;
    2129             : }
    2130             : 
    2131             : /*
    2132             :  *  makeObjectName()
    2133             :  *
    2134             :  *  Create a name for an implicitly created index, sequence, constraint,
    2135             :  *  extended statistics, etc.
    2136             :  *
    2137             :  *  The parameters are typically: the original table name, the original field
    2138             :  *  name, and a "type" string (such as "seq" or "pkey").    The field name
    2139             :  *  and/or type can be NULL if not relevant.
    2140             :  *
    2141             :  *  The result is a palloc'd string.
    2142             :  *
    2143             :  *  The basic result we want is "name1_name2_label", omitting "_name2" or
    2144             :  *  "_label" when those parameters are NULL.  However, we must generate
    2145             :  *  a name with less than NAMEDATALEN characters!  So, we truncate one or
    2146             :  *  both names if necessary to make a short-enough string.  The label part
    2147             :  *  is never truncated (so it had better be reasonably short).
    2148             :  *
    2149             :  *  The caller is responsible for checking uniqueness of the generated
    2150             :  *  name and retrying as needed; retrying will be done by altering the
    2151             :  *  "label" string (which is why we never truncate that part).
    2152             :  */
    2153             : char *
    2154        9408 : makeObjectName(const char *name1, const char *name2, const char *label)
    2155             : {
    2156             :     char       *name;
    2157        9408 :     int         overhead = 0;   /* chars needed for label and underscores */
    2158             :     int         availchars;     /* chars available for name(s) */
    2159             :     int         name1chars;     /* chars allocated to name1 */
    2160             :     int         name2chars;     /* chars allocated to name2 */
    2161             :     int         ndx;
    2162             : 
    2163        9408 :     name1chars = strlen(name1);
    2164        9408 :     if (name2)
    2165             :     {
    2166        4400 :         name2chars = strlen(name2);
    2167        4400 :         overhead++;             /* allow for separating underscore */
    2168             :     }
    2169             :     else
    2170        5008 :         name2chars = 0;
    2171        9408 :     if (label)
    2172        9408 :         overhead += strlen(label) + 1;
    2173             : 
    2174        9408 :     availchars = NAMEDATALEN - 1 - overhead;
    2175             :     Assert(availchars > 0);      /* else caller chose a bad label */
    2176             : 
    2177             :     /*
    2178             :      * If we must truncate,  preferentially truncate the longer name. This
    2179             :      * logic could be expressed without a loop, but it's simple and obvious as
    2180             :      * a loop.
    2181             :      */
    2182        9452 :     while (name1chars + name2chars > availchars)
    2183             :     {
    2184          44 :         if (name1chars > name2chars)
    2185           0 :             name1chars--;
    2186             :         else
    2187          44 :             name2chars--;
    2188             :     }
    2189             : 
    2190        9408 :     name1chars = pg_mbcliplen(name1, name1chars, name1chars);
    2191        9408 :     if (name2)
    2192        4400 :         name2chars = pg_mbcliplen(name2, name2chars, name2chars);
    2193             : 
    2194             :     /* Now construct the string using the chosen lengths */
    2195        9408 :     name = palloc(name1chars + name2chars + overhead + 1);
    2196        9408 :     memcpy(name, name1, name1chars);
    2197        9408 :     ndx = name1chars;
    2198        9408 :     if (name2)
    2199             :     {
    2200        4400 :         name[ndx++] = '_';
    2201        4400 :         memcpy(name + ndx, name2, name2chars);
    2202        4400 :         ndx += name2chars;
    2203             :     }
    2204        9408 :     if (label)
    2205             :     {
    2206        9408 :         name[ndx++] = '_';
    2207        9408 :         strcpy(name + ndx, label);
    2208             :     }
    2209             :     else
    2210           0 :         name[ndx] = '\0';
    2211             : 
    2212        9408 :     return name;
    2213             : }
    2214             : 
    2215             : /*
    2216             :  * Select a nonconflicting name for a new relation.  This is ordinarily
    2217             :  * used to choose index names (which is why it's here) but it can also
    2218             :  * be used for sequences, or any autogenerated relation kind.
    2219             :  *
    2220             :  * name1, name2, and label are used the same way as for makeObjectName(),
    2221             :  * except that the label can't be NULL; digits will be appended to the label
    2222             :  * if needed to create a name that is unique within the specified namespace.
    2223             :  *
    2224             :  * If isconstraint is true, we also avoid choosing a name matching any
    2225             :  * existing constraint in the same namespace.  (This is stricter than what
    2226             :  * Postgres itself requires, but the SQL standard says that constraint names
    2227             :  * should be unique within schemas, so we follow that for autogenerated
    2228             :  * constraint names.)
    2229             :  *
    2230             :  * Note: it is theoretically possible to get a collision anyway, if someone
    2231             :  * else chooses the same name concurrently.  This is fairly unlikely to be
    2232             :  * a problem in practice, especially if one is holding an exclusive lock on
    2233             :  * the relation identified by name1.  However, if choosing multiple names
    2234             :  * within a single command, you'd better create the new object and do
    2235             :  * CommandCounterIncrement before choosing the next one!
    2236             :  *
    2237             :  * Returns a palloc'd string.
    2238             :  */
    2239             : char *
    2240        7046 : ChooseRelationName(const char *name1, const char *name2,
    2241             :                    const char *label, Oid namespaceid,
    2242             :                    bool isconstraint)
    2243             : {
    2244        7046 :     int         pass = 0;
    2245        7046 :     char       *relname = NULL;
    2246             :     char        modlabel[NAMEDATALEN];
    2247             : 
    2248             :     /* try the unmodified label first */
    2249        7046 :     StrNCpy(modlabel, label, sizeof(modlabel));
    2250             : 
    2251             :     for (;;)
    2252             :     {
    2253        7170 :         relname = makeObjectName(name1, name2, modlabel);
    2254             : 
    2255        7170 :         if (!OidIsValid(get_relname_relid(relname, namespaceid)))
    2256             :         {
    2257        7050 :             if (!isconstraint ||
    2258        4998 :                 !ConstraintNameExists(relname, namespaceid))
    2259             :                 break;
    2260             :         }
    2261             : 
    2262             :         /* found a conflict, so try a new name component */
    2263         124 :         pfree(relname);
    2264         124 :         snprintf(modlabel, sizeof(modlabel), "%s%d", label, ++pass);
    2265             :     }
    2266             : 
    2267        7046 :     return relname;
    2268             : }
    2269             : 
    2270             : /*
    2271             :  * Select the name to be used for an index.
    2272             :  *
    2273             :  * The argument list is pretty ad-hoc :-(
    2274             :  */
    2275             : static char *
    2276        6128 : ChooseIndexName(const char *tabname, Oid namespaceId,
    2277             :                 List *colnames, List *exclusionOpNames,
    2278             :                 bool primary, bool isconstraint)
    2279             : {
    2280             :     char       *indexname;
    2281             : 
    2282        6128 :     if (primary)
    2283             :     {
    2284             :         /* the primary key's name does not depend on the specific column(s) */
    2285        4546 :         indexname = ChooseRelationName(tabname,
    2286             :                                        NULL,
    2287             :                                        "pkey",
    2288             :                                        namespaceId,
    2289             :                                        true);
    2290             :     }
    2291        1582 :     else if (exclusionOpNames != NIL)
    2292             :     {
    2293          62 :         indexname = ChooseRelationName(tabname,
    2294          62 :                                        ChooseIndexNameAddition(colnames),
    2295             :                                        "excl",
    2296             :                                        namespaceId,
    2297             :                                        true);
    2298             :     }
    2299        1520 :     else if (isconstraint)
    2300             :     {
    2301         386 :         indexname = ChooseRelationName(tabname,
    2302         386 :                                        ChooseIndexNameAddition(colnames),
    2303             :                                        "key",
    2304             :                                        namespaceId,
    2305             :                                        true);
    2306             :     }
    2307             :     else
    2308             :     {
    2309        1134 :         indexname = ChooseRelationName(tabname,
    2310        1134 :                                        ChooseIndexNameAddition(colnames),
    2311             :                                        "idx",
    2312             :                                        namespaceId,
    2313             :                                        false);
    2314             :     }
    2315             : 
    2316        6128 :     return indexname;
    2317             : }
    2318             : 
    2319             : /*
    2320             :  * Generate "name2" for a new index given the list of column names for it
    2321             :  * (as produced by ChooseIndexColumnNames).  This will be passed to
    2322             :  * ChooseRelationName along with the parent table name and a suitable label.
    2323             :  *
    2324             :  * We know that less than NAMEDATALEN characters will actually be used,
    2325             :  * so we can truncate the result once we've generated that many.
    2326             :  *
    2327             :  * XXX See also ChooseForeignKeyConstraintNameAddition and
    2328             :  * ChooseExtendedStatisticNameAddition.
    2329             :  */
    2330             : static char *
    2331        1582 : ChooseIndexNameAddition(List *colnames)
    2332             : {
    2333             :     char        buf[NAMEDATALEN * 2];
    2334        1582 :     int         buflen = 0;
    2335             :     ListCell   *lc;
    2336             : 
    2337        1582 :     buf[0] = '\0';
    2338        3728 :     foreach(lc, colnames)
    2339             :     {
    2340        2146 :         const char *name = (const char *) lfirst(lc);
    2341             : 
    2342        2146 :         if (buflen > 0)
    2343         564 :             buf[buflen++] = '_';    /* insert _ between names */
    2344             : 
    2345             :         /*
    2346             :          * At this point we have buflen <= NAMEDATALEN.  name should be less
    2347             :          * than NAMEDATALEN already, but use strlcpy for paranoia.
    2348             :          */
    2349        2146 :         strlcpy(buf + buflen, name, NAMEDATALEN);
    2350        2146 :         buflen += strlen(buf + buflen);
    2351        2146 :         if (buflen >= NAMEDATALEN)
    2352           0 :             break;
    2353             :     }
    2354        1582 :     return pstrdup(buf);
    2355             : }
    2356             : 
    2357             : /*
    2358             :  * Select the actual names to be used for the columns of an index, given the
    2359             :  * list of IndexElems for the columns.  This is mostly about ensuring the
    2360             :  * names are unique so we don't get a conflicting-attribute-names error.
    2361             :  *
    2362             :  * Returns a List of plain strings (char *, not String nodes).
    2363             :  */
    2364             : static List *
    2365       50848 : ChooseIndexColumnNames(List *indexElems)
    2366             : {
    2367       50848 :     List       *result = NIL;
    2368             :     ListCell   *lc;
    2369             : 
    2370      131230 :     foreach(lc, indexElems)
    2371             :     {
    2372       80382 :         IndexElem  *ielem = (IndexElem *) lfirst(lc);
    2373             :         const char *origname;
    2374             :         const char *curname;
    2375             :         int         i;
    2376             :         char        buf[NAMEDATALEN];
    2377             : 
    2378             :         /* Get the preliminary name from the IndexElem */
    2379       80382 :         if (ielem->indexcolname)
    2380         996 :             origname = ielem->indexcolname; /* caller-specified name */
    2381       79386 :         else if (ielem->name)
    2382       79200 :             origname = ielem->name; /* simple column reference */
    2383             :         else
    2384         186 :             origname = "expr";    /* default name for expression */
    2385             : 
    2386             :         /* If it conflicts with any previous column, tweak it */
    2387       80382 :         curname = origname;
    2388       80382 :         for (i = 1;; i++)
    2389          40 :         {
    2390             :             ListCell   *lc2;
    2391             :             char        nbuf[32];
    2392             :             int         nlen;
    2393             : 
    2394      124666 :             foreach(lc2, result)
    2395             :             {
    2396       44284 :                 if (strcmp(curname, (char *) lfirst(lc2)) == 0)
    2397          40 :                     break;
    2398             :             }
    2399       80422 :             if (lc2 == NULL)
    2400       80382 :                 break;          /* found nonconflicting name */
    2401             : 
    2402          40 :             sprintf(nbuf, "%d", i);
    2403             : 
    2404             :             /* Ensure generated names are shorter than NAMEDATALEN */
    2405          40 :             nlen = pg_mbcliplen(origname, strlen(origname),
    2406          40 :                                 NAMEDATALEN - 1 - strlen(nbuf));
    2407          40 :             memcpy(buf, origname, nlen);
    2408          40 :             strcpy(buf + nlen, nbuf);
    2409          40 :             curname = buf;
    2410             :         }
    2411             : 
    2412             :         /* And attach to the result list */
    2413       80382 :         result = lappend(result, pstrdup(curname));
    2414             :     }
    2415       50848 :     return result;
    2416             : }
    2417             : 
    2418             : /*
    2419             :  * ReindexIndex
    2420             :  *      Recreate a specific index.
    2421             :  */
    2422             : void
    2423          92 : ReindexIndex(RangeVar *indexRelation, int options, bool concurrent)
    2424             : {
    2425             :     struct ReindexIndexCallbackState state;
    2426             :     Oid         indOid;
    2427             :     Relation    irel;
    2428             :     char        persistence;
    2429             : 
    2430             :     /*
    2431             :      * Find and lock index, and check permissions on table; use callback to
    2432             :      * obtain lock on table first, to avoid deadlock hazard.  The lock level
    2433             :      * used here must match the index lock obtained in reindex_index().
    2434             :      *
    2435             :      * If it's a temporary index, we will perform a non-concurrent reindex,
    2436             :      * even if CONCURRENTLY was requested.  In that case, reindex_index() will
    2437             :      * upgrade the lock, but that's OK, because other sessions can't hold
    2438             :      * locks on our temporary table.
    2439             :      */
    2440          92 :     state.concurrent = concurrent;
    2441          92 :     state.locked_table_oid = InvalidOid;
    2442          92 :     indOid = RangeVarGetRelidExtended(indexRelation,
    2443             :                                       concurrent ? ShareUpdateExclusiveLock : AccessExclusiveLock,
    2444             :                                       0,
    2445             :                                       RangeVarCallbackForReindexIndex,
    2446             :                                       &state);
    2447             : 
    2448             :     /*
    2449             :      * Obtain the current persistence of the existing index.  We already hold
    2450             :      * lock on the index.
    2451             :      */
    2452          84 :     irel = index_open(indOid, NoLock);
    2453             : 
    2454          84 :     if (irel->rd_rel->relkind == RELKIND_PARTITIONED_INDEX)
    2455             :     {
    2456           8 :         ReindexPartitionedIndex(irel);
    2457           0 :         return;
    2458             :     }
    2459             : 
    2460          76 :     persistence = irel->rd_rel->relpersistence;
    2461          76 :     index_close(irel, NoLock);
    2462             : 
    2463          76 :     if (concurrent && persistence != RELPERSISTENCE_TEMP)
    2464          34 :         ReindexRelationConcurrently(indOid, options);
    2465             :     else
    2466          42 :         reindex_index(indOid, false, persistence,
    2467             :                       options | REINDEXOPT_REPORT_PROGRESS);
    2468             : }
    2469             : 
    2470             : /*
    2471             :  * Check permissions on table before acquiring relation lock; also lock
    2472             :  * the heap before the RangeVarGetRelidExtended takes the index lock, to avoid
    2473             :  * deadlocks.
    2474             :  */
    2475             : static void
    2476          94 : RangeVarCallbackForReindexIndex(const RangeVar *relation,
    2477             :                                 Oid relId, Oid oldRelId, void *arg)
    2478             : {
    2479             :     char        relkind;
    2480          94 :     struct ReindexIndexCallbackState *state = arg;
    2481             :     LOCKMODE    table_lockmode;
    2482             : 
    2483             :     /*
    2484             :      * Lock level here should match table lock in reindex_index() for
    2485             :      * non-concurrent case and table locks used by index_concurrently_*() for
    2486             :      * concurrent case.
    2487             :      */
    2488          94 :     table_lockmode = state->concurrent ? ShareUpdateExclusiveLock : ShareLock;
    2489             : 
    2490             :     /*
    2491             :      * If we previously locked some other index's heap, and the name we're
    2492             :      * looking up no longer refers to that relation, release the now-useless
    2493             :      * lock.
    2494             :      */
    2495          94 :     if (relId != oldRelId && OidIsValid(oldRelId))
    2496             :     {
    2497           0 :         UnlockRelationOid(state->locked_table_oid, table_lockmode);
    2498           0 :         state->locked_table_oid = InvalidOid;
    2499             :     }
    2500             : 
    2501             :     /* If the relation does not exist, there's nothing more to do. */
    2502          94 :     if (!OidIsValid(relId))
    2503           4 :         return;
    2504             : 
    2505             :     /*
    2506             :      * If the relation does exist, check whether it's an index.  But note that
    2507             :      * the relation might have been dropped between the time we did the name
    2508             :      * lookup and now.  In that case, there's nothing to do.
    2509             :      */
    2510          90 :     relkind = get_rel_relkind(relId);
    2511          90 :     if (!relkind)
    2512           0 :         return;
    2513          90 :     if (relkind != RELKIND_INDEX &&
    2514             :         relkind != RELKIND_PARTITIONED_INDEX)
    2515           0 :         ereport(ERROR,
    2516             :                 (errcode(ERRCODE_WRONG_OBJECT_TYPE),
    2517             :                  errmsg("\"%s\" is not an index", relation->relname)));
    2518             : 
    2519             :     /* Check permissions */
    2520          90 :     if (!pg_class_ownercheck(relId, GetUserId()))
    2521           4 :         aclcheck_error(ACLCHECK_NOT_OWNER, OBJECT_INDEX, relation->relname);
    2522             : 
    2523             :     /* Lock heap before index to avoid deadlock. */
    2524          86 :     if (relId != oldRelId)
    2525             :     {
    2526          84 :         Oid         table_oid = IndexGetRelation(relId, true);
    2527             : 
    2528             :         /*
    2529             :          * If the OID isn't valid, it means the index was concurrently
    2530             :          * dropped, which is not a problem for us; just return normally.
    2531             :          */
    2532          84 :         if (OidIsValid(table_oid))
    2533             :         {
    2534          84 :             LockRelationOid(table_oid, table_lockmode);
    2535          84 :             state->locked_table_oid = table_oid;
    2536             :         }
    2537             :     }
    2538             : }
    2539             : 
    2540             : /*
    2541             :  * ReindexTable
    2542             :  *      Recreate all indexes of a table (and of its toast table, if any)
    2543             :  */
    2544             : Oid
    2545         130 : ReindexTable(RangeVar *relation, int options, bool concurrent)
    2546             : {
    2547             :     Oid         heapOid;
    2548             :     bool        result;
    2549             : 
    2550             :     /*
    2551             :      * The lock level used here should match reindex_relation().
    2552             :      *
    2553             :      * If it's a temporary table, we will perform a non-concurrent reindex,
    2554             :      * even if CONCURRENTLY was requested.  In that case, reindex_relation()
    2555             :      * will upgrade the lock, but that's OK, because other sessions can't hold
    2556             :      * locks on our temporary table.
    2557             :      */
    2558         130 :     heapOid = RangeVarGetRelidExtended(relation,
    2559             :                                        concurrent ? ShareUpdateExclusiveLock : ShareLock,
    2560             :                                        0,
    2561             :                                        RangeVarCallbackOwnsTable, NULL);
    2562             : 
    2563         126 :     if (concurrent && get_rel_persistence(heapOid) != RELPERSISTENCE_TEMP)
    2564             :     {
    2565          66 :         result = ReindexRelationConcurrently(heapOid, options);
    2566             : 
    2567          70 :         if (!result)
    2568          12 :             ereport(NOTICE,
    2569             :                     (errmsg("table \"%s\" has no indexes that can be reindexed concurrently",
    2570             :                             relation->relname)));
    2571             :     }
    2572             :     else
    2573             :     {
    2574          60 :         result = reindex_relation(heapOid,
    2575             :                                   REINDEX_REL_PROCESS_TOAST |
    2576             :                                   REINDEX_REL_CHECK_CONSTRAINTS,
    2577             :                                   options | REINDEXOPT_REPORT_PROGRESS);
    2578          56 :         if (!result)
    2579           8 :             ereport(NOTICE,
    2580             :                     (errmsg("table \"%s\" has no indexes to reindex",
    2581             :                             relation->relname)));
    2582             :     }
    2583             : 
    2584         114 :     return heapOid;
    2585             : }
    2586             : 
    2587             : /*
    2588             :  * ReindexMultipleTables
    2589             :  *      Recreate indexes of tables selected by objectName/objectKind.
    2590             :  *
    2591             :  * To reduce the probability of deadlocks, each table is reindexed in a
    2592             :  * separate transaction, so we can release the lock on it right away.
    2593             :  * That means this must not be called within a user transaction block!
    2594             :  */
    2595             : void
    2596          68 : ReindexMultipleTables(const char *objectName, ReindexObjectType objectKind,
    2597             :                       int options, bool concurrent)
    2598             : {
    2599             :     Oid         objectOid;
    2600             :     Relation    relationRelation;
    2601             :     TableScanDesc scan;
    2602             :     ScanKeyData scan_keys[1];
    2603             :     HeapTuple   tuple;
    2604             :     MemoryContext private_context;
    2605             :     MemoryContext old;
    2606          68 :     List       *relids = NIL;
    2607             :     ListCell   *l;
    2608             :     int         num_keys;
    2609          68 :     bool        concurrent_warning = false;
    2610             : 
    2611             :     AssertArg(objectName);
    2612             :     Assert(objectKind == REINDEX_OBJECT_SCHEMA ||
    2613             :            objectKind == REINDEX_OBJECT_SYSTEM ||
    2614             :            objectKind == REINDEX_OBJECT_DATABASE);
    2615             : 
    2616          68 :     if (objectKind == REINDEX_OBJECT_SYSTEM && concurrent)
    2617           6 :         ereport(ERROR,
    2618             :                 (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
    2619             :                  errmsg("cannot reindex system catalogs concurrently")));
    2620             : 
    2621             :     /*
    2622             :      * Get OID of object to reindex, being the database currently being used
    2623             :      * by session for a database or for system catalogs, or the schema defined
    2624             :      * by caller. At the same time do permission checks that need different
    2625             :      * processing depending on the object type.
    2626             :      */
    2627          62 :     if (objectKind == REINDEX_OBJECT_SCHEMA)
    2628             :     {
    2629          32 :         objectOid = get_namespace_oid(objectName, false);
    2630             : 
    2631          28 :         if (!pg_namespace_ownercheck(objectOid, GetUserId()))
    2632           4 :             aclcheck_error(ACLCHECK_NOT_OWNER, OBJECT_SCHEMA,
    2633             :                            objectName);
    2634             :     }
    2635             :     else
    2636             :     {
    2637          30 :         objectOid = MyDatabaseId;
    2638             : 
    2639          30 :         if (strcmp(objectName, get_database_name(objectOid)) != 0)
    2640           0 :             ereport(ERROR,
    2641             :                     (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
    2642             :                      errmsg("can only reindex the currently open database")));
    2643          30 :         if (!pg_database_ownercheck(objectOid, GetUserId()))
    2644           0 :             aclcheck_error(ACLCHECK_NOT_OWNER, OBJECT_DATABASE,
    2645             :                            objectName);
    2646             :     }
    2647             : 
    2648             :     /*
    2649             :      * Create a memory context that will survive forced transaction commits we
    2650             :      * do below.  Since it is a child of PortalContext, it will go away
    2651             :      * eventually even if we suffer an error; there's no need for special
    2652             :      * abort cleanup logic.
    2653             :      */
    2654          54 :     private_context = AllocSetContextCreate(PortalContext,
    2655             :                                             "ReindexMultipleTables",
    2656             :                                             ALLOCSET_SMALL_SIZES);
    2657             : 
    2658             :     /*
    2659             :      * Define the search keys to find the objects to reindex. For a schema, we
    2660             :      * select target relations using relnamespace, something not necessary for
    2661             :      * a database-wide operation.
    2662             :      */
    2663          54 :     if (objectKind == REINDEX_OBJECT_SCHEMA)
    2664             :     {
    2665          24 :         num_keys = 1;
    2666          24 :         ScanKeyInit(&scan_keys[0],
    2667             :                     Anum_pg_class_relnamespace,
    2668             :                     BTEqualStrategyNumber, F_OIDEQ,
    2669             :                     ObjectIdGetDatum(objectOid));
    2670             :     }
    2671             :     else
    2672          30 :         num_keys = 0;
    2673             : 
    2674             :     /*
    2675             :      * Scan pg_class to build a list of the relations we need to reindex.
    2676             :      *
    2677             :      * We only consider plain relations and materialized views here (toast
    2678             :      * rels will be processed indirectly by reindex_relation).
    2679             :      */
    2680          54 :     relationRelation = table_open(RelationRelationId, AccessShareLock);
    2681          54 :     scan = table_beginscan_catalog(relationRelation, num_keys, scan_keys);
    2682       13308 :     while ((tuple = heap_getnext(scan, ForwardScanDirection)) != NULL)
    2683             :     {
    2684       13254 :         Form_pg_class classtuple = (Form_pg_class) GETSTRUCT(tuple);
    2685       13254 :         Oid         relid = classtuple->oid;
    2686             : 
    2687             :         /*
    2688             :          * Only regular tables and matviews can have indexes, so ignore any
    2689             :          * other kind of relation.
    2690             :          *
    2691             :          * It is tempting to also consider partitioned tables here, but that
    2692             :          * has the problem that if the children are in the same schema, they
    2693             :          * would be processed twice.  Maybe we could have a separate list of
    2694             :          * partitioned tables, and expand that afterwards into relids,
    2695             :          * ignoring any duplicates.
    2696             :          */
    2697       13254 :         if (classtuple->relkind != RELKIND_RELATION &&
    2698       10826 :             classtuple->relkind != RELKIND_MATVIEW)
    2699       10814 :             continue;
    2700             : 
    2701             :         /* Skip temp tables of other backends; we can't reindex them at all */
    2702        2440 :         if (classtuple->relpersistence == RELPERSISTENCE_TEMP &&
    2703          20 :             !isTempNamespace(classtuple->relnamespace))
    2704           0 :             continue;
    2705             : 
    2706             :         /* Check user/system classification, and optionally skip */
    2707        2440 :         if (objectKind == REINDEX_OBJECT_SYSTEM &&
    2708         404 :             !IsSystemClass(relid, classtuple))
    2709          32 :             continue;
    2710             : 
    2711             :         /*
    2712             :          * The table can be reindexed if the user is superuser, the table
    2713             :          * owner, or the database/schema owner (but in the latter case, only
    2714             :          * if it's not a shared relation).  pg_class_ownercheck includes the
    2715             :          * superuser case, and depending on objectKind we already know that
    2716             :          * the user has permission to run REINDEX on this database or schema
    2717             :          * per the permission checks at the beginning of this routine.
    2718             :          */
    2719        2408 :         if (classtuple->relisshared &&
    2720         360 :             !pg_class_ownercheck(relid, GetUserId()))
    2721           0 :             continue;
    2722             : 
    2723             :         /*
    2724             :          * Skip system tables, since index_create() would reject indexing them
    2725             :          * concurrently (and it would likely fail if we tried).
    2726             :          */
    2727        2832 :         if (concurrent &&
    2728         424 :             IsCatalogRelationOid(relid))
    2729             :         {
    2730         372 :             if (!concurrent_warning)
    2731           6 :                 ereport(WARNING,
    2732             :                         (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
    2733             :                          errmsg("cannot reindex system catalogs concurrently, skipping all")));
    2734         372 :             concurrent_warning = true;
    2735         372 :             continue;
    2736             :         }
    2737             : 
    2738             :         /* Save the list of relation OIDs in private context */
    2739        2036 :         old = MemoryContextSwitchTo(private_context);
    2740             : 
    2741             :         /*
    2742             :          * We always want to reindex pg_class first if it's selected to be
    2743             :          * reindexed.  This ensures that if there is any corruption in
    2744             :          * pg_class' indexes, they will be fixed before we process any other
    2745             :          * tables.  This is critical because reindexing itself will try to
    2746             :          * update pg_class.
    2747             :          */
    2748        2036 :         if (relid == RelationRelationId)
    2749          30 :             relids = lcons_oid(relid, relids);
    2750             :         else
    2751        2006 :             relids = lappend_oid(relids, relid);
    2752             : 
    2753        2036 :         MemoryContextSwitchTo(old);
    2754             :     }
    2755          54 :     table_endscan(scan);
    2756          54 :     table_close(relationRelation, AccessShareLock);
    2757             : 
    2758             :     /* Now reindex each rel in a separate transaction */
    2759          54 :     PopActiveSnapshot();
    2760          54 :     CommitTransactionCommand();
    2761        2090 :     foreach(l, relids)
    2762             :     {
    2763        2036 :         Oid         relid = lfirst_oid(l);
    2764             : 
    2765        2036 :         StartTransactionCommand();
    2766             :         /* functions in indexes may want a snapshot set */
    2767        2036 :         PushActiveSnapshot(GetTransactionSnapshot());
    2768             : 
    2769        2036 :         if (concurrent && get_rel_persistence(relid) != RELPERSISTENCE_TEMP)
    2770             :         {
    2771          32 :             (void) ReindexRelationConcurrently(relid, options);
    2772             :             /* ReindexRelationConcurrently() does the verbose output */
    2773             :         }
    2774             :         else
    2775             :         {
    2776             :             bool        result;
    2777             : 
    2778        2004 :             result = reindex_relation(relid,
    2779             :                                       REINDEX_REL_PROCESS_TOAST |
    2780             :                                       REINDEX_REL_CHECK_CONSTRAINTS,
    2781             :                                       options | REINDEXOPT_REPORT_PROGRESS);
    2782             : 
    2783        2004 :             if (result && (options & REINDEXOPT_VERBOSE))
    2784           0 :                 ereport(INFO,
    2785             :                         (errmsg("table \"%s.%s\" was reindexed",
    2786             :                                 get_namespace_name(get_rel_namespace(relid)),
    2787             :                                 get_rel_name(relid))));
    2788             : 
    2789        2004 :             PopActiveSnapshot();
    2790             :         }
    2791             : 
    2792        2036 :         CommitTransactionCommand();
    2793             :     }
    2794          54 :     StartTransactionCommand();
    2795             : 
    2796          54 :     MemoryContextDelete(private_context);
    2797          54 : }
    2798             : 
    2799             : 
    2800             : /*
    2801             :  * ReindexRelationConcurrently - process REINDEX CONCURRENTLY for given
    2802             :  * relation OID
    2803             :  *
    2804             :  * 'relationOid' can either belong to an index, a table or a materialized
    2805             :  * view.  For tables and materialized views, all its indexes will be rebuilt,
    2806             :  * excluding invalid indexes and any indexes used in exclusion constraints,
    2807             :  * but including its associated toast table indexes.  For indexes, the index
    2808             :  * itself will be rebuilt.  If 'relationOid' belongs to a partitioned table
    2809             :  * then we issue a warning to mention these are not yet supported.
    2810             :  *
    2811             :  * The locks taken on parent tables and involved indexes are kept until the
    2812             :  * transaction is committed, at which point a session lock is taken on each
    2813             :  * relation.  Both of these protect against concurrent schema changes.
    2814             :  *
    2815             :  * Returns true if any indexes have been rebuilt (including toast table's
    2816             :  * indexes, when relevant), otherwise returns false.
    2817             :  *
    2818             :  * NOTE: This cannot be used on temporary relations.  A concurrent build would
    2819             :  * cause issues with ON COMMIT actions triggered by the transactions of the
    2820             :  * concurrent build.  Temporary relations are not subject to concurrent
    2821             :  * concerns, so there's no need for the more complicated concurrent build,
    2822             :  * anyway, and a non-concurrent reindex is more efficient.
    2823             :  */
    2824             : static bool
    2825         132 : ReindexRelationConcurrently(Oid relationOid, int options)
    2826             : {
    2827         132 :     List       *heapRelationIds = NIL;
    2828         132 :     List       *indexIds = NIL;
    2829         132 :     List       *newIndexIds = NIL;
    2830         132 :     List       *relationLocks = NIL;
    2831         132 :     List       *lockTags = NIL;
    2832             :     ListCell   *lc,
    2833             :                *lc2;
    2834             :     MemoryContext private_context;
    2835             :     MemoryContext oldcontext;
    2836             :     char        relkind;
    2837         132 :     char       *relationName = NULL;
    2838         132 :     char       *relationNamespace = NULL;
    2839             :     PGRUsage    ru0;
    2840             : 
    2841             :     /*
    2842             :      * Create a memory context that will survive forced transaction commits we
    2843             :      * do below.  Since it is a child of PortalContext, it will go away
    2844             :      * eventually even if we suffer an error; there's no need for special
    2845             :      * abort cleanup logic.
    2846             :      */
    2847         132 :     private_context = AllocSetContextCreate(PortalContext,
    2848             :                                             "ReindexConcurrent",
    2849             :                                             ALLOCSET_SMALL_SIZES);
    2850             : 
    2851         132 :     if (options & REINDEXOPT_VERBOSE)
    2852             :     {
    2853             :         /* Save data needed by REINDEX VERBOSE in private context */
    2854           0 :         oldcontext = MemoryContextSwitchTo(private_context);
    2855             : 
    2856           0 :         relationName = get_rel_name(relationOid);
    2857           0 :         relationNamespace = get_namespace_name(get_rel_namespace(relationOid));
    2858             : 
    2859           0 :         pg_rusage_init(&ru0);
    2860             : 
    2861           0 :         MemoryContextSwitchTo(oldcontext);
    2862             :     }
    2863             : 
    2864         132 :     relkind = get_rel_relkind(relationOid);
    2865             : 
    2866             :     /*
    2867             :      * Extract the list of indexes that are going to be rebuilt based on the
    2868             :      * relation Oid given by caller.
    2869             :      */
    2870         132 :     switch (relkind)
    2871             :     {
    2872          94 :         case RELKIND_RELATION:
    2873             :         case RELKIND_MATVIEW:
    2874             :         case RELKIND_TOASTVALUE:
    2875             :             {
    2876             :                 /*
    2877             :                  * In the case of a relation, find all its indexes including
    2878             :                  * toast indexes.
    2879             :                  */
    2880             :                 Relation    heapRelation;
    2881             : 
    2882             :                 /* Save the list of relation OIDs in private context */
    2883          94 :                 oldcontext = MemoryContextSwitchTo(private_context);
    2884             : 
    2885             :                 /* Track this relation for session locks */
    2886          94 :                 heapRelationIds = lappend_oid(heapRelationIds, relationOid);
    2887             : 
    2888          94 :                 MemoryContextSwitchTo(oldcontext);
    2889             : 
    2890          94 :                 if (IsCatalogRelationOid(relationOid))
    2891           8 :                     ereport(ERROR,
    2892             :                             (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
    2893             :                              errmsg("cannot reindex system catalogs concurrently")));
    2894             : 
    2895             :                 /* Open relation to get its indexes */
    2896          86 :                 heapRelation = table_open(relationOid, ShareUpdateExclusiveLock);
    2897             : 
    2898             :                 /* Add all the valid indexes of relation to list */
    2899         172 :                 foreach(lc, RelationGetIndexList(heapRelation))
    2900             :                 {
    2901          86 :                     Oid         cellOid = lfirst_oid(lc);
    2902          86 :                     Relation    indexRelation = index_open(cellOid,
    2903             :                                                            ShareUpdateExclusiveLock);
    2904             : 
    2905          86 :                     if (!indexRelation->rd_index->indisvalid)
    2906           4 :                         ereport(WARNING,
    2907             :                                 (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
    2908             :                                  errmsg("cannot reindex invalid index \"%s.%s\" concurrently, skipping",
    2909             :                                         get_namespace_name(get_rel_namespace(cellOid)),
    2910             :                                         get_rel_name(cellOid))));
    2911          82 :                     else if (indexRelation->rd_index->indisexclusion)
    2912           4 :                         ereport(WARNING,
    2913             :                                 (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
    2914             :                                  errmsg("cannot reindex exclusion constraint index \"%s.%s\" concurrently, skipping",
    2915             :                                         get_namespace_name(get_rel_namespace(cellOid)),
    2916             :                                         get_rel_name(cellOid))));
    2917             :                     else
    2918             :                     {
    2919             :                         /* Save the list of relation OIDs in private context */
    2920          78 :                         oldcontext = MemoryContextSwitchTo(private_context);
    2921             : 
    2922          78 :                         indexIds = lappend_oid(indexIds, cellOid);
    2923             : 
    2924          78 :                         MemoryContextSwitchTo(oldcontext);
    2925             :                     }
    2926             : 
    2927          86 :                     index_close(indexRelation, NoLock);
    2928             :                 }
    2929             : 
    2930             :                 /* Also add the toast indexes */
    2931          86 :                 if (OidIsValid(heapRelation->rd_rel->reltoastrelid))
    2932             :                 {
    2933          36 :                     Oid         toastOid = heapRelation->rd_rel->reltoastrelid;
    2934          36 :                     Relation    toastRelation = table_open(toastOid,
    2935             :                                                            ShareUpdateExclusiveLock);
    2936             : 
    2937             :                     /* Save the list of relation OIDs in private context */
    2938          36 :                     oldcontext = MemoryContextSwitchTo(private_context);
    2939             : 
    2940             :                     /* Track this relation for session locks */
    2941          36 :                     heapRelationIds = lappend_oid(heapRelationIds, toastOid);
    2942             : 
    2943          36 :                     MemoryContextSwitchTo(oldcontext);
    2944             : 
    2945          72 :                     foreach(lc2, RelationGetIndexList(toastRelation))
    2946             :                     {
    2947          36 :                         Oid         cellOid = lfirst_oid(lc2);
    2948          36 :                         Relation    indexRelation = index_open(cellOid,
    2949             :                                                                ShareUpdateExclusiveLock);
    2950             : 
    2951          36 :                         if (!indexRelation->rd_index->indisvalid)
    2952           0 :                             ereport(WARNING,
    2953             :                                     (errcode(ERRCODE_INDEX_CORRUPTED),
    2954             :                                      errmsg("cannot reindex invalid index \"%s.%s\" concurrently, skipping",
    2955             :                                             get_namespace_name(get_rel_namespace(cellOid)),
    2956             :                                             get_rel_name(cellOid))));
    2957             :                         else
    2958             :                         {
    2959             :                             /*
    2960             :                              * Save the list of relation OIDs in private
    2961             :                              * context
    2962             :                              */
    2963          36 :                             oldcontext = MemoryContextSwitchTo(private_context);
    2964             : 
    2965          36 :                             indexIds = lappend_oid(indexIds, cellOid);
    2966             : 
    2967          36 :                             MemoryContextSwitchTo(oldcontext);
    2968             :                         }
    2969             : 
    2970          36 :                         index_close(indexRelation, NoLock);
    2971             :                     }
    2972             : 
    2973          36 :                     table_close(toastRelation, NoLock);
    2974             :                 }
    2975             : 
    2976          86 :                 table_close(heapRelation, NoLock);
    2977          86 :                 break;
    2978             :             }
    2979          34 :         case RELKIND_INDEX:
    2980             :             {
    2981          34 :                 Oid         heapId = IndexGetRelation(relationOid, false);
    2982             : 
    2983          34 :                 if (IsCatalogRelationOid(heapId))
    2984           8 :                     ereport(ERROR,
    2985             :                             (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
    2986             :                              errmsg("cannot reindex system catalogs concurrently")));
    2987             : 
    2988             :                 /*
    2989             :                  * Don't allow reindex for an invalid index on TOAST table, as
    2990             :                  * if rebuilt it would not be possible to drop it.
    2991             :                  */
    2992          26 :                 if (IsToastNamespace(get_rel_namespace(relationOid)) &&
    2993           0 :                     !get_index_isvalid(relationOid))
    2994           0 :                     ereport(ERROR,
    2995             :                             (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
    2996             :                              errmsg("cannot reindex invalid index on TOAST table concurrently")));
    2997             : 
    2998             :                 /* Save the list of relation OIDs in private context */
    2999          26 :                 oldcontext = MemoryContextSwitchTo(private_context);
    3000             : 
    3001             :                 /* Track the heap relation of this index for session locks */
    3002          26 :                 heapRelationIds = list_make1_oid(heapId);
    3003             : 
    3004             :                 /*
    3005             :                  * Save the list of relation OIDs in private context.  Note
    3006             :                  * that invalid indexes are allowed here.
    3007             :                  */
    3008          26 :                 indexIds = lappend_oid(indexIds, relationOid);
    3009             : 
    3010          26 :                 MemoryContextSwitchTo(oldcontext);
    3011          26 :                 break;
    3012             :             }
    3013           4 :         case RELKIND_PARTITIONED_TABLE:
    3014             :             /* see reindex_relation() */
    3015           4 :             ereport(WARNING,
    3016             :                     (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
    3017             :                      errmsg("REINDEX of partitioned tables is not yet implemented, skipping \"%s\"",
    3018             :                             get_rel_name(relationOid))));
    3019           4 :             return false;
    3020           0 :         default:
    3021             :             /* Return error if type of relation is not supported */
    3022           0 :             ereport(ERROR,
    3023             :                     (errcode(ERRCODE_WRONG_OBJECT_TYPE),
    3024             :                      errmsg("cannot reindex this type of relation concurrently")));
    3025             :             break;
    3026             :     }
    3027             : 
    3028             :     /* Definitely no indexes, so leave */
    3029         112 :     if (indexIds == NIL)
    3030             :     {
    3031          16 :         PopActiveSnapshot();
    3032          16 :         return false;
    3033             :     }
    3034             : 
    3035             :     Assert(heapRelationIds != NIL);
    3036             : 
    3037             :     /*-----
    3038             :      * Now we have all the indexes we want to process in indexIds.
    3039             :      *
    3040             :      * The phases now are:
    3041             :      *
    3042             :      * 1. create new indexes in the catalog
    3043             :      * 2. build new indexes
    3044             :      * 3. let new indexes catch up with tuples inserted in the meantime
    3045             :      * 4. swap index names
    3046             :      * 5. mark old indexes as dead
    3047             :      * 6. drop old indexes
    3048             :      *
    3049             :      * We process each phase for all indexes before moving to the next phase,
    3050             :      * for efficiency.
    3051             :      */
    3052             : 
    3053             :     /*
    3054             :      * Phase 1 of REINDEX CONCURRENTLY
    3055             :      *
    3056             :      * Create a new index with the same properties as the old one, but it is
    3057             :      * only registered in catalogs and will be built later.  Then get session
    3058             :      * locks on all involved tables.  See analogous code in DefineIndex() for
    3059             :      * more detailed comments.
    3060             :      */
    3061             : 
    3062         232 :     foreach(lc, indexIds)
    3063             :     {
    3064             :         char       *concurrentName;
    3065         140 :         Oid         indexId = lfirst_oid(lc);
    3066             :         Oid         newIndexId;
    3067             :         Relation    indexRel;
    3068             :         Relation    heapRel;
    3069             :         Relation    newIndexRel;
    3070             :         LockRelId  *lockrelid;
    3071             : 
    3072         140 :         indexRel = index_open(indexId, ShareUpdateExclusiveLock);
    3073         140 :         heapRel = table_open(indexRel->rd_index->indrelid,
    3074             :                              ShareUpdateExclusiveLock);
    3075             : 
    3076             :         /* This function shouldn't be called for temporary relations. */
    3077         140 :         if (indexRel->rd_rel->relpersistence == RELPERSISTENCE_TEMP)
    3078           0 :             elog(ERROR, "cannot reindex a temporary table concurrently");
    3079             : 
    3080         140 :         pgstat_progress_start_command(PROGRESS_COMMAND_CREATE_INDEX,
    3081             :                                       RelationGetRelid(heapRel));
    3082         140 :         pgstat_progress_update_param(PROGRESS_CREATEIDX_COMMAND,
    3083             :                                      PROGRESS_CREATEIDX_COMMAND_REINDEX_CONCURRENTLY);
    3084         140 :         pgstat_progress_update_param(PROGRESS_CREATEIDX_INDEX_OID,
    3085             :                                      indexId);
    3086         140 :         pgstat_progress_update_param(PROGRESS_CREATEIDX_ACCESS_METHOD_OID,
    3087         140 :                                      indexRel->rd_rel->relam);
    3088             : 
    3089             :         /* Choose a temporary relation name for the new index */
    3090         140 :         concurrentName = ChooseRelationName(get_rel_name(indexId),
    3091             :                                             NULL,
    3092             :                                             "ccnew",
    3093         140 :                                             get_rel_namespace(indexRel->rd_index->indrelid),
    3094             :                                             false);
    3095             : 
    3096             :         /* Create new index definition based on given index */
    3097         140 :         newIndexId = index_concurrently_create_copy(heapRel,
    3098             :                                                     indexId,
    3099             :                                                     concurrentName);
    3100             : 
    3101             :         /*
    3102             :          * Now open the relation of the new index, a session-level lock is
    3103             :          * also needed on it.
    3104             :          */
    3105         136 :         newIndexRel = index_open(newIndexId, ShareUpdateExclusiveLock);
    3106             : 
    3107             :         /*
    3108             :          * Save the list of OIDs and locks in private context
    3109             :          */
    3110         136 :         oldcontext = MemoryContextSwitchTo(private_context);
    3111             : 
    3112         136 :         newIndexIds = lappend_oid(newIndexIds, newIndexId);
    3113             : 
    3114             :         /*
    3115             :          * Save lockrelid to protect each relation from drop then close
    3116             :          * relations. The lockrelid on parent relation is not taken here to
    3117             :          * avoid multiple locks taken on the same relation, instead we rely on
    3118             :          * parentRelationIds built earlier.
    3119             :          */
    3120         136 :         lockrelid = palloc(sizeof(*lockrelid));
    3121         136 :         *lockrelid = indexRel->rd_lockInfo.lockRelId;
    3122         136 :         relationLocks = lappend(relationLocks, lockrelid);
    3123         136 :         lockrelid = palloc(sizeof(*lockrelid));
    3124         136 :         *lockrelid = newIndexRel->rd_lockInfo.lockRelId;
    3125         136 :         relationLocks = lappend(relationLocks, lockrelid);
    3126             : 
    3127         136 :         MemoryContextSwitchTo(oldcontext);
    3128             : 
    3129         136 :         index_close(indexRel, NoLock);
    3130         136 :         index_close(newIndexRel, NoLock);
    3131         136 :         table_close(heapRel, NoLock);
    3132             :     }
    3133             : 
    3134             :     /*
    3135             :      * Save the heap lock for following visibility checks with other backends
    3136             :      * might conflict with this session.
    3137             :      */
    3138         220 :     foreach(lc, heapRelationIds)
    3139             :     {
    3140         128 :         Relation    heapRelation = table_open(lfirst_oid(lc), ShareUpdateExclusiveLock);
    3141             :         LockRelId  *lockrelid;
    3142             :         LOCKTAG    *heaplocktag;
    3143             : 
    3144             :         /* Save the list of locks in private context */
    3145         128 :         oldcontext = MemoryContextSwitchTo(private_context);
    3146             : 
    3147             :         /* Add lockrelid of heap relation to the list of locked relations */
    3148         128 :         lockrelid = palloc(sizeof(*lockrelid));
    3149         128 :         *lockrelid = heapRelation->rd_lockInfo.lockRelId;
    3150         128 :         relationLocks = lappend(relationLocks, lockrelid);
    3151             : 
    3152         128 :         heaplocktag = (LOCKTAG *) palloc(sizeof(LOCKTAG));
    3153             : 
    3154             :         /* Save the LOCKTAG for this parent relation for the wait phase */
    3155         128 :         SET_LOCKTAG_RELATION(*heaplocktag, lockrelid->dbId, lockrelid->relId);
    3156         128 :         lockTags = lappend(lockTags, heaplocktag);
    3157             : 
    3158         128 :         MemoryContextSwitchTo(oldcontext);
    3159             : 
    3160             :         /* Close heap relation */
    3161         128 :         table_close(heapRelation, NoLock);
    3162             :     }
    3163             : 
    3164             :     /* Get a session-level lock on each table. */
    3165         492 :     foreach(lc, relationLocks)
    3166             :     {
    3167         400 :         LockRelId  *lockrelid = (LockRelId *) lfirst(lc);
    3168             : 
    3169         400 :         LockRelationIdForSession(lockrelid, ShareUpdateExclusiveLock);
    3170             :     }
    3171             : 
    3172          92 :     PopActiveSnapshot();
    3173          92 :     CommitTransactionCommand();
    3174          92 :     StartTransactionCommand();
    3175             : 
    3176             :     /*
    3177             :      * Phase 2 of REINDEX CONCURRENTLY
    3178             :      *
    3179             :      * Build the new indexes in a separate transaction for each index to avoid
    3180             :      * having open transactions for an unnecessary long time.  But before
    3181             :      * doing that, wait until no running transactions could have the table of
    3182             :      * the index open with the old list of indexes.  See "phase 2" in
    3183             :      * DefineIndex() for more details.
    3184             :      */
    3185             : 
    3186          92 :     pgstat_progress_update_param(PROGRESS_CREATEIDX_PHASE,
    3187             :                                  PROGRESS_CREATEIDX_PHASE_WAIT_1);
    3188          92 :     WaitForLockersMultiple(lockTags, ShareLock, true);
    3189          92 :     CommitTransactionCommand();
    3190             : 
    3191         224 :     forboth(lc, indexIds, lc2, newIndexIds)
    3192             :     {
    3193             :         Relation    indexRel;
    3194         136 :         Oid         oldIndexId = lfirst_oid(lc);
    3195         136 :         Oid         newIndexId = lfirst_oid(lc2);
    3196             :         Oid         heapId;
    3197             : 
    3198             :         /* Start new transaction for this index's concurrent build */
    3199         136 :         StartTransactionCommand();
    3200             : 
    3201             :         /*
    3202             :          * Check for user-requested abort.  This is inside a transaction so as
    3203             :          * xact.c does not issue a useless WARNING, and ensures that
    3204             :          * session-level locks are cleaned up on abort.
    3205             :          */
    3206         136 :         CHECK_FOR_INTERRUPTS();
    3207             : 
    3208             :         /* Set ActiveSnapshot since functions in the indexes may need it */
    3209         136 :         PushActiveSnapshot(GetTransactionSnapshot());
    3210             : 
    3211             :         /*
    3212             :          * Index relation has been closed by previous commit, so reopen it to
    3213             :          * get its information.
    3214             :          */
    3215         136 :         indexRel = index_open(oldIndexId, ShareUpdateExclusiveLock);
    3216         136 :         heapId = indexRel->rd_index->indrelid;
    3217         136 :         index_close(indexRel, NoLock);
    3218             : 
    3219             :         /* Perform concurrent build of new index */
    3220         136 :         index_concurrently_build(heapId, newIndexId);
    3221             : 
    3222         132 :         PopActiveSnapshot();
    3223         132 :         CommitTransactionCommand();
    3224             :     }
    3225          88 :     StartTransactionCommand();
    3226             : 
    3227             :     /*
    3228             :      * Phase 3 of REINDEX CONCURRENTLY
    3229             :      *
    3230             :      * During this phase the old indexes catch up with any new tuples that
    3231             :      * were created during the previous phase.  See "phase 3" in DefineIndex()
    3232             :      * for more details.
    3233             :      */
    3234             : 
    3235          88 :     pgstat_progress_update_param(PROGRESS_CREATEIDX_PHASE,
    3236             :                                  PROGRESS_CREATEIDX_PHASE_WAIT_2);
    3237          88 :     WaitForLockersMultiple(lockTags, ShareLock, true);
    3238          88 :     CommitTransactionCommand();
    3239             : 
    3240         220 :     foreach(lc, newIndexIds)
    3241             :     {
    3242         132 :         Oid         newIndexId = lfirst_oid(lc);
    3243             :         Oid         heapId;
    3244             :         TransactionId limitXmin;
    3245             :         Snapshot    snapshot;
    3246             : 
    3247         132 :         StartTransactionCommand();
    3248             : 
    3249             :         /*
    3250             :          * Check for user-requested abort.  This is inside a transaction so as
    3251             :          * xact.c does not issue a useless WARNING, and ensures that
    3252             :          * session-level locks are cleaned up on abort.
    3253             :          */
    3254         132 :         CHECK_FOR_INTERRUPTS();
    3255             : 
    3256         132 :         heapId = IndexGetRelation(newIndexId, false);
    3257             : 
    3258             :         /*
    3259             :          * Take the "reference snapshot" that will be used by validate_index()
    3260             :          * to filter candidate tuples.
    3261             :          */
    3262         132 :         snapshot = RegisterSnapshot(GetTransactionSnapshot());
    3263         132 :         PushActiveSnapshot(snapshot);
    3264             : 
    3265         132 :         validate_index(heapId, newIndexId, snapshot);
    3266             : 
    3267             :         /*
    3268             :          * We can now do away with our active snapshot, we still need to save
    3269             :          * the xmin limit to wait for older snapshots.
    3270             :          */
    3271         132 :         limitXmin = snapshot->xmin;
    3272             : 
    3273         132 :         PopActiveSnapshot();
    3274         132 :         UnregisterSnapshot(snapshot);
    3275             : 
    3276             :         /*
    3277             :          * To ensure no deadlocks, we must commit and start yet another
    3278             :          * transaction, and do our wait before any snapshot has been taken in
    3279             :          * it.
    3280             :          */
    3281         132 :         CommitTransactionCommand();
    3282         132 :         StartTransactionCommand();
    3283             : 
    3284             :         /*
    3285             :          * The index is now valid in the sense that it contains all currently
    3286             :          * interesting tuples.  But since it might not contain tuples deleted
    3287             :          * just before the reference snap was taken, we have to wait out any
    3288             :          * transactions that might have older snapshots.
    3289             :          */
    3290         132 :         pgstat_progress_update_param(PROGRESS_CREATEIDX_PHASE,
    3291             :                                      PROGRESS_CREATEIDX_PHASE_WAIT_3);
    3292         132 :         WaitForOlderSnapshots(limitXmin, true);
    3293             : 
    3294         132 :         CommitTransactionCommand();
    3295             :     }
    3296             : 
    3297             :     /*
    3298             :      * Phase 4 of REINDEX CONCURRENTLY
    3299             :      *
    3300             :      * Now that the new indexes have been validated, swap each new index with
    3301             :      * its corresponding old index.
    3302             :      *
    3303             :      * We mark the new indexes as valid and the old indexes as not valid at
    3304             :      * the same time to make sure we only get constraint violations from the
    3305             :      * indexes with the correct names.
    3306             :      */
    3307             : 
    3308          88 :     StartTransactionCommand();
    3309             : 
    3310         220 :     forboth(lc, indexIds, lc2, newIndexIds)
    3311             :     {
    3312             :         char       *oldName;
    3313         132 :         Oid         oldIndexId = lfirst_oid(lc);
    3314         132 :         Oid         newIndexId = lfirst_oid(lc2);
    3315             :         Oid         heapId;
    3316             : 
    3317             :         /*
    3318             :          * Check for user-requested abort.  This is inside a transaction so as
    3319             :          * xact.c does not issue a useless WARNING, and ensures that
    3320             :          * session-level locks are cleaned up on abort.
    3321             :          */
    3322         132 :         CHECK_FOR_INTERRUPTS();
    3323             : 
    3324         132 :         heapId = IndexGetRelation(oldIndexId, false);
    3325             : 
    3326             :         /* Choose a relation name for old index */
    3327         132 :         oldName = ChooseRelationName(get_rel_name(oldIndexId),
    3328             :                                      NULL,
    3329             :                                      "ccold",
    3330             :                                      get_rel_namespace(heapId),
    3331             :                                      false);
    3332             : 
    3333             :         /*
    3334             :          * Swap old index with the new one.  This also marks the new one as
    3335             :          * valid and the old one as not valid.
    3336             :          */
    3337         132 :         index_concurrently_swap(newIndexId, oldIndexId, oldName);
    3338             : 
    3339             :         /*
    3340             :          * Invalidate the relcache for the table, so that after this commit
    3341             :          * all sessions will refresh any cached plans that might reference the
    3342             :          * index.
    3343             :          */
    3344         132 :         CacheInvalidateRelcacheByRelid(heapId);
    3345             : 
    3346             :         /*
    3347             :          * CCI here so that subsequent iterations see the oldName in the
    3348             :          * catalog and can choose a nonconflicting name for their oldName.
    3349             :          * Otherwise, this could lead to conflicts if a table has two indexes
    3350             :          * whose names are equal for the first NAMEDATALEN-minus-a-few
    3351             :          * characters.
    3352             :          */
    3353         132 :         CommandCounterIncrement();
    3354             :     }
    3355             : 
    3356             :     /* Commit this transaction and make index swaps visible */
    3357          88 :     CommitTransactionCommand();
    3358          88 :     StartTransactionCommand();
    3359             : 
    3360             :     /*
    3361             :      * Phase 5 of REINDEX CONCURRENTLY
    3362             :      *
    3363             :      * Mark the old indexes as dead.  First we must wait until no running
    3364             :      * transaction could be using the index for a query.  See also
    3365             :      * index_drop() for more details.
    3366             :      */
    3367             : 
    3368          88 :     pgstat_progress_update_param(PROGRESS_CREATEIDX_PHASE,
    3369             :                                  PROGRESS_CREATEIDX_PHASE_WAIT_4);
    3370          88 :     WaitForLockersMultiple(lockTags, AccessExclusiveLock, true);
    3371             : 
    3372         220 :     foreach(lc, indexIds)
    3373             :     {
    3374         132 :         Oid         oldIndexId = lfirst_oid(lc);
    3375             :         Oid         heapId;
    3376             : 
    3377             :         /*
    3378             :          * Check for user-requested abort.  This is inside a transaction so as
    3379             :          * xact.c does not issue a useless WARNING, and ensures that
    3380             :          * session-level locks are cleaned up on abort.
    3381             :          */
    3382         132 :         CHECK_FOR_INTERRUPTS();
    3383             : 
    3384         132 :         heapId = IndexGetRelation(oldIndexId, false);
    3385         132 :         index_concurrently_set_dead(heapId, oldIndexId);
    3386             :     }
    3387             : 
    3388             :     /* Commit this transaction to make the updates visible. */
    3389          88 :     CommitTransactionCommand();
    3390          88 :     StartTransactionCommand();
    3391             : 
    3392             :     /*
    3393             :      * Phase 6 of REINDEX CONCURRENTLY
    3394             :      *
    3395             :      * Drop the old indexes.
    3396             :      */
    3397             : 
    3398          88 :     pgstat_progress_update_param(PROGRESS_CREATEIDX_PHASE,
    3399             :                                  PROGRESS_CREATEIDX_PHASE_WAIT_4);
    3400          88 :     WaitForLockersMultiple(lockTags, AccessExclusiveLock, true);
    3401             : 
    3402          88 :     PushActiveSnapshot(GetTransactionSnapshot());
    3403             : 
    3404             :     {
    3405          88 :         ObjectAddresses *objects = new_object_addresses();
    3406             : 
    3407         220 :         foreach(lc, indexIds)
    3408             :         {
    3409         132 :             Oid         oldIndexId = lfirst_oid(lc);
    3410             :             ObjectAddress object;
    3411             : 
    3412         132 :             object.classId = RelationRelationId;
    3413         132 :             object.objectId = oldIndexId;
    3414         132 :             object.objectSubId = 0;
    3415             : 
    3416         132 :             add_exact_object_address(&object, objects);
    3417             :         }
    3418             : 
    3419             :         /*
    3420             :          * Use PERFORM_DELETION_CONCURRENT_LOCK so that index_drop() uses the
    3421             :          * right lock level.
    3422             :          */
    3423          88 :         performMultipleDeletions(objects, DROP_RESTRICT,
    3424             :                                  PERFORM_DELETION_CONCURRENT_LOCK | PERFORM_DELETION_INTERNAL);
    3425             :     }
    3426             : 
    3427          88 :     PopActiveSnapshot();
    3428          88 :     CommitTransactionCommand();
    3429             : 
    3430             :     /*
    3431             :      * Finally, release the session-level lock on the table.
    3432             :      */
    3433         476 :     foreach(lc, relationLocks)
    3434             :     {
    3435         388 :         LockRelId  *lockrelid = (LockRelId *) lfirst(lc);
    3436             : 
    3437         388 :         UnlockRelationIdForSession(lockrelid, ShareUpdateExclusiveLock);
    3438             :     }
    3439             : 
    3440             :     /* Start a new transaction to finish process properly */
    3441          88 :     StartTransactionCommand();
    3442             : 
    3443             :     /* Log what we did */
    3444          88 :     if (options & REINDEXOPT_VERBOSE)
    3445             :     {
    3446           0 :         if (relkind == RELKIND_INDEX)
    3447           0 :             ereport(INFO,
    3448             :                     (errmsg("index \"%s.%s\" was reindexed",
    3449             :                             relationNamespace, relationName),
    3450             :                      errdetail("%s.",
    3451             :                                pg_rusage_show(&ru0))));
    3452             :         else
    3453             :         {
    3454           0 :             foreach(lc, newIndexIds)
    3455             :             {
    3456           0 :                 Oid         indOid = lfirst_oid(lc);
    3457             : 
    3458           0 :                 ereport(INFO,
    3459             :                         (errmsg("index \"%s.%s\" was reindexed",
    3460             :                                 get_namespace_name(get_rel_namespace(indOid)),
    3461             :                                 get_rel_name(indOid))));
    3462             :                 /* Don't show rusage here, since it's not per index. */
    3463             :             }
    3464             : 
    3465           0 :             ereport(INFO,
    3466             :                     (errmsg("table \"%s.%s\" was reindexed",
    3467             :                             relationNamespace, relationName),
    3468             :                      errdetail("%s.",
    3469             :                                pg_rusage_show(&ru0))));
    3470             :         }
    3471             :     }
    3472             : 
    3473          88 :     MemoryContextDelete(private_context);
    3474             : 
    3475          88 :     pgstat_progress_end_command();
    3476             : 
    3477          88 :     return true;
    3478             : }
    3479             : 
    3480             : /*
    3481             :  *  ReindexPartitionedIndex
    3482             :  *      Reindex each child of the given partitioned index.
    3483             :  *
    3484             :  * Not yet implemented.
    3485             :  */
    3486             : static void
    3487           8 : ReindexPartitionedIndex(Relation parentIdx)
    3488             : {
    3489           8 :     ereport(ERROR,
    3490             :             (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
    3491             :              errmsg("REINDEX is not yet implemented for partitioned indexes")));
    3492             : }
    3493             : 
    3494             : /*
    3495             :  * Insert or delete an appropriate pg_inherits tuple to make the given index
    3496             :  * be a partition of the indicated parent index.
    3497             :  *
    3498             :  * This also corrects the pg_depend information for the affected index.
    3499             :  */
    3500             : void
    3501         312 : IndexSetParentIndex(Relation partitionIdx, Oid parentOid)
    3502             : {
    3503             :     Relation    pg_inherits;
    3504             :     ScanKeyData key[2];
    3505             :     SysScanDesc scan;
    3506         312 :     Oid         partRelid = RelationGetRelid(partitionIdx);
    3507             :     HeapTuple   tuple;
    3508             :     bool        fix_dependencies;
    3509             : 
    3510             :     /* Make sure this is an index */
    3511             :     Assert(partitionIdx->rd_rel->relkind == RELKIND_INDEX ||
    3512             :            partitionIdx->rd_rel->relkind == RELKIND_PARTITIONED_INDEX);
    3513             : 
    3514             :     /*
    3515             :      * Scan pg_inherits for rows linking our index to some parent.
    3516             :      */
    3517         312 :     pg_inherits = relation_open(InheritsRelationId, RowExclusiveLock);
    3518         312 :     ScanKeyInit(&key[0],
    3519             :                 Anum_pg_inherits_inhrelid,
    3520             :                 BTEqualStrategyNumber, F_OIDEQ,
    3521             :                 ObjectIdGetDatum(partRelid));
    3522         312 :     ScanKeyInit(&key[1],
    3523             :                 Anum_pg_inherits_inhseqno,
    3524             :                 BTEqualStrategyNumber, F_INT4EQ,
    3525             :                 Int32GetDatum(1));
    3526         312 :     scan = systable_beginscan(pg_inherits, InheritsRelidSeqnoIndexId, true,
    3527             :                               NULL, 2, key);
    3528         312 :     tuple = systable_getnext(scan);
    3529             : 
    3530         312 :     if (!HeapTupleIsValid(tuple))
    3531             :     {
    3532         256 :         if (parentOid == InvalidOid)
    3533             :         {
    3534             :             /*
    3535             :              * No pg_inherits row, and no parent wanted: nothing to do in this
    3536             :              * case.
    3537             :              */
    3538           0 :             fix_dependencies = false;
    3539             :         }
    3540             :         else
    3541             :         {
    3542             :             Datum       values[Natts_pg_inherits];
    3543             :             bool        isnull[Natts_pg_inherits];
    3544             : 
    3545             :             /*
    3546             :              * No pg_inherits row exists, and we want a parent for this index,
    3547             :              * so insert it.
    3548             :              */
    3549         256 :             values[Anum_pg_inherits_inhrelid - 1] = ObjectIdGetDatum(partRelid);
    3550         256 :             values[Anum_pg_inherits_inhparent - 1] =
    3551         256 :                 ObjectIdGetDatum(parentOid);
    3552         256 :             values[Anum_pg_inherits_inhseqno - 1] = Int32GetDatum(1);
    3553         256 :             memset(isnull, false, sizeof(isnull));
    3554             : 
    3555         256 :             tuple = heap_form_tuple(RelationGetDescr(pg_inherits),
    3556             :                                     values, isnull);
    3557         256 :             CatalogTupleInsert(pg_inherits, tuple);
    3558             : 
    3559         256 :             fix_dependencies = true;
    3560             :         }
    3561             :     }
    3562             :     else
    3563             :     {
    3564          56 :         Form_pg_inherits inhForm = (Form_pg_inherits) GETSTRUCT(tuple);
    3565             : 
    3566          56 :         if (parentOid == InvalidOid)
    3567             :         {
    3568             :             /*
    3569             :              * There exists a pg_inherits row, which we want to clear; do so.
    3570             :              */
    3571          56 :             CatalogTupleDelete(pg_inherits, &tuple->t_self);
    3572          56 :             fix_dependencies = true;
    3573             :         }
    3574             :         else
    3575             :         {
    3576             :             /*
    3577             :              * A pg_inherits row exists.  If it's the same we want, then we're
    3578             :              * good; if it differs, that amounts to a corrupt catalog and
    3579             :              * should not happen.
    3580             :              */
    3581           0 :             if (inhForm->inhparent != parentOid)
    3582             :             {
    3583             :                 /* unexpected: we should not get called in this case */
    3584           0 :                 elog(ERROR, "bogus pg_inherit row: inhrelid %u inhparent %u",
    3585             :                      inhForm->inhrelid, inhForm->inhparent);
    3586             :             }
    3587             : 
    3588             :             /* already in the right state */
    3589           0 :             fix_dependencies = false;
    3590             :         }
    3591             :     }
    3592             : 
    3593             :     /* done with pg_inherits */
    3594         312 :     systable_endscan(scan);
    3595         312 :     relation_close(pg_inherits, RowExclusiveLock);
    3596             : 
    3597             :     /* set relhassubclass if an index partition has been added to the parent */
    3598         312 :     if (OidIsValid(parentOid))
    3599         256 :         SetRelationHasSubclass(parentOid, true);
    3600             : 
    3601             :     /* set relispartition correctly on the partition */
    3602         312 :     update_relispartition(partRelid, OidIsValid(parentOid));
    3603             : 
    3604         312 :     if (fix_dependencies)
    3605             :     {
    3606             :         /*
    3607             :          * Insert/delete pg_depend rows.  If setting a parent, add PARTITION
    3608             :          * dependencies on the parent index and the table; if removing a
    3609             :          * parent, delete PARTITION dependencies.
    3610             :          */
    3611         312 :         if (OidIsValid(parentOid))
    3612             :         {
    3613             :             ObjectAddress partIdx;
    3614             :             ObjectAddress parentIdx;
    3615             :             ObjectAddress partitionTbl;
    3616             : 
    3617         256 :             ObjectAddressSet(partIdx, RelationRelationId, partRelid);
    3618         256 :             ObjectAddressSet(parentIdx, RelationRelationId, parentOid);
    3619         256 :             ObjectAddressSet(partitionTbl, RelationRelationId,
    3620             :                              partitionIdx->rd_index->indrelid);
    3621         256 :             recordDependencyOn(&partIdx, &parentIdx,
    3622             :                                DEPENDENCY_PARTITION_PRI);
    3623         256 :             recordDependencyOn(&partIdx, &partitionTbl,
    3624             :                                DEPENDENCY_PARTITION_SEC);
    3625             :         }
    3626             :         else
    3627             :         {
    3628          56 :             deleteDependencyRecordsForClass(RelationRelationId, partRelid,
    3629             :                                             RelationRelationId,
    3630             :                                             DEPENDENCY_PARTITION_PRI);
    3631          56 :             deleteDependencyRecordsForClass(RelationRelationId, partRelid,
    3632             :                                             RelationRelationId,
    3633             :                                             DEPENDENCY_PARTITION_SEC);
    3634             :         }
    3635             : 
    3636             :         /* make our updates visible */
    3637         312 :         CommandCounterIncrement();
    3638             :     }
    3639         312 : }
    3640             : 
    3641             : /*
    3642             :  * Subroutine of IndexSetParentIndex to update the relispartition flag of the
    3643             :  * given index to the given value.
    3644             :  */
    3645             : static void
    3646         312 : update_relispartition(Oid relationId, bool newval)
    3647             : {
    3648             :     HeapTuple   tup;
    3649             :     Relation    classRel;
    3650             : 
    3651         312 :     classRel = table_open(RelationRelationId, RowExclusiveLock);
    3652         312 :     tup = SearchSysCacheCopy1(RELOID, ObjectIdGetDatum(relationId));
    3653         312 :     if (!HeapTupleIsValid(tup))
    3654           0 :         elog(ERROR, "cache lookup failed for relation %u", relationId);
    3655             :     Assert(((Form_pg_class) GETSTRUCT(tup))->relispartition != newval);
    3656         312 :     ((Form_pg_class) GETSTRUCT(tup))->relispartition = newval;
    3657         312 :     CatalogTupleUpdate(classRel, &tup->t_self, tup);
    3658         312 :     heap_freetuple(tup);
    3659         312 :     table_close(classRel, RowExclusiveLock);
    3660         312 : }

Generated by: LCOV version 1.13