LCOV - code coverage report
Current view: top level - src/backend/commands - indexcmds.c (source / functions) Hit Total Coverage
Test: PostgreSQL 16beta1 Lines: 1164 1278 91.1 %
Date: 2023-05-30 18:12:27 Functions: 26 26 100.0 %
Legend: Lines: hit not hit

          Line data    Source code
       1             : /*-------------------------------------------------------------------------
       2             :  *
       3             :  * indexcmds.c
       4             :  *    POSTGRES define and remove index code.
       5             :  *
       6             :  * Portions Copyright (c) 1996-2023, PostgreSQL Global Development Group
       7             :  * Portions Copyright (c) 1994, Regents of the University of California
       8             :  *
       9             :  *
      10             :  * IDENTIFICATION
      11             :  *    src/backend/commands/indexcmds.c
      12             :  *
      13             :  *-------------------------------------------------------------------------
      14             :  */
      15             : 
      16             : #include "postgres.h"
      17             : 
      18             : #include "access/amapi.h"
      19             : #include "access/heapam.h"
      20             : #include "access/htup_details.h"
      21             : #include "access/reloptions.h"
      22             : #include "access/sysattr.h"
      23             : #include "access/tableam.h"
      24             : #include "access/xact.h"
      25             : #include "catalog/catalog.h"
      26             : #include "catalog/index.h"
      27             : #include "catalog/indexing.h"
      28             : #include "catalog/pg_am.h"
      29             : #include "catalog/pg_authid.h"
      30             : #include "catalog/pg_constraint.h"
      31             : #include "catalog/pg_database.h"
      32             : #include "catalog/pg_inherits.h"
      33             : #include "catalog/pg_namespace.h"
      34             : #include "catalog/pg_opclass.h"
      35             : #include "catalog/pg_opfamily.h"
      36             : #include "catalog/pg_tablespace.h"
      37             : #include "catalog/pg_type.h"
      38             : #include "commands/comment.h"
      39             : #include "commands/dbcommands.h"
      40             : #include "commands/defrem.h"
      41             : #include "commands/event_trigger.h"
      42             : #include "commands/progress.h"
      43             : #include "commands/tablecmds.h"
      44             : #include "commands/tablespace.h"
      45             : #include "mb/pg_wchar.h"
      46             : #include "miscadmin.h"
      47             : #include "nodes/makefuncs.h"
      48             : #include "nodes/nodeFuncs.h"
      49             : #include "optimizer/optimizer.h"
      50             : #include "parser/parse_coerce.h"
      51             : #include "parser/parse_func.h"
      52             : #include "parser/parse_oper.h"
      53             : #include "partitioning/partdesc.h"
      54             : #include "pgstat.h"
      55             : #include "rewrite/rewriteManip.h"
      56             : #include "storage/lmgr.h"
      57             : #include "storage/proc.h"
      58             : #include "storage/procarray.h"
      59             : #include "storage/sinvaladt.h"
      60             : #include "utils/acl.h"
      61             : #include "utils/builtins.h"
      62             : #include "utils/fmgroids.h"
      63             : #include "utils/guc.h"
      64             : #include "utils/inval.h"
      65             : #include "utils/lsyscache.h"
      66             : #include "utils/memutils.h"
      67             : #include "utils/partcache.h"
      68             : #include "utils/pg_rusage.h"
      69             : #include "utils/regproc.h"
      70             : #include "utils/snapmgr.h"
      71             : #include "utils/syscache.h"
      72             : 
      73             : 
      74             : /* non-export function prototypes */
      75             : static bool CompareOpclassOptions(Datum *opts1, Datum *opts2, int natts);
      76             : static void CheckPredicate(Expr *predicate);
      77             : static void ComputeIndexAttrs(IndexInfo *indexInfo,
      78             :                               Oid *typeOidP,
      79             :                               Oid *collationOidP,
      80             :                               Oid *classOidP,
      81             :                               int16 *colOptionP,
      82             :                               List *attList,
      83             :                               List *exclusionOpNames,
      84             :                               Oid relId,
      85             :                               const char *accessMethodName, Oid accessMethodId,
      86             :                               bool amcanorder,
      87             :                               bool isconstraint,
      88             :                               Oid ddl_userid,
      89             :                               int ddl_sec_context,
      90             :                               int *ddl_save_nestlevel);
      91             : static char *ChooseIndexName(const char *tabname, Oid namespaceId,
      92             :                              List *colnames, List *exclusionOpNames,
      93             :                              bool primary, bool isconstraint);
      94             : static char *ChooseIndexNameAddition(List *colnames);
      95             : static List *ChooseIndexColumnNames(List *indexElems);
      96             : static void ReindexIndex(RangeVar *indexRelation, ReindexParams *params,
      97             :                          bool isTopLevel);
      98             : static void RangeVarCallbackForReindexIndex(const RangeVar *relation,
      99             :                                             Oid relId, Oid oldRelId, void *arg);
     100             : static Oid  ReindexTable(RangeVar *relation, ReindexParams *params,
     101             :                          bool isTopLevel);
     102             : static void ReindexMultipleTables(const char *objectName,
     103             :                                   ReindexObjectType objectKind, ReindexParams *params);
     104             : static void reindex_error_callback(void *arg);
     105             : static void ReindexPartitions(Oid relid, ReindexParams *params,
     106             :                               bool isTopLevel);
     107             : static void ReindexMultipleInternal(List *relids,
     108             :                                     ReindexParams *params);
     109             : static bool ReindexRelationConcurrently(Oid relationOid,
     110             :                                         ReindexParams *params);
     111             : static void update_relispartition(Oid relationId, bool newval);
     112             : static inline void set_indexsafe_procflags(void);
     113             : 
     114             : /*
     115             :  * callback argument type for RangeVarCallbackForReindexIndex()
     116             :  */
     117             : struct ReindexIndexCallbackState
     118             : {
     119             :     ReindexParams params;       /* options from statement */
     120             :     Oid         locked_table_oid;   /* tracks previously locked table */
     121             : };
     122             : 
     123             : /*
     124             :  * callback arguments for reindex_error_callback()
     125             :  */
     126             : typedef struct ReindexErrorInfo
     127             : {
     128             :     char       *relname;
     129             :     char       *relnamespace;
     130             :     char        relkind;
     131             : } ReindexErrorInfo;
     132             : 
     133             : /*
     134             :  * CheckIndexCompatible
     135             :  *      Determine whether an existing index definition is compatible with a
     136             :  *      prospective index definition, such that the existing index storage
     137             :  *      could become the storage of the new index, avoiding a rebuild.
     138             :  *
     139             :  * 'oldId': the OID of the existing index
     140             :  * 'accessMethodName': name of the AM to use.
     141             :  * 'attributeList': a list of IndexElem specifying columns and expressions
     142             :  *      to index on.
     143             :  * 'exclusionOpNames': list of names of exclusion-constraint operators,
     144             :  *      or NIL if not an exclusion constraint.
     145             :  *
     146             :  * This is tailored to the needs of ALTER TABLE ALTER TYPE, which recreates
     147             :  * any indexes that depended on a changing column from their pg_get_indexdef
     148             :  * or pg_get_constraintdef definitions.  We omit some of the sanity checks of
     149             :  * DefineIndex.  We assume that the old and new indexes have the same number
     150             :  * of columns and that if one has an expression column or predicate, both do.
     151             :  * Errors arising from the attribute list still apply.
     152             :  *
     153             :  * Most column type changes that can skip a table rewrite do not invalidate
     154             :  * indexes.  We acknowledge this when all operator classes, collations and
     155             :  * exclusion operators match.  Though we could further permit intra-opfamily
     156             :  * changes for btree and hash indexes, that adds subtle complexity with no
     157             :  * concrete benefit for core types. Note, that INCLUDE columns aren't
     158             :  * checked by this function, for them it's enough that table rewrite is
     159             :  * skipped.
     160             :  *
     161             :  * When a comparison or exclusion operator has a polymorphic input type, the
     162             :  * actual input types must also match.  This defends against the possibility
     163             :  * that operators could vary behavior in response to get_fn_expr_argtype().
     164             :  * At present, this hazard is theoretical: check_exclusion_constraint() and
     165             :  * all core index access methods decline to set fn_expr for such calls.
     166             :  *
     167             :  * We do not yet implement a test to verify compatibility of expression
     168             :  * columns or predicates, so assume any such index is incompatible.
     169             :  */
     170             : bool
     171         102 : CheckIndexCompatible(Oid oldId,
     172             :                      const char *accessMethodName,
     173             :                      List *attributeList,
     174             :                      List *exclusionOpNames)
     175             : {
     176             :     bool        isconstraint;
     177             :     Oid        *typeObjectId;
     178             :     Oid        *collationObjectId;
     179             :     Oid        *classObjectId;
     180             :     Oid         accessMethodId;
     181             :     Oid         relationId;
     182             :     HeapTuple   tuple;
     183             :     Form_pg_index indexForm;
     184             :     Form_pg_am  accessMethodForm;
     185             :     IndexAmRoutine *amRoutine;
     186             :     bool        amcanorder;
     187             :     bool        amsummarizing;
     188             :     int16      *coloptions;
     189             :     IndexInfo  *indexInfo;
     190             :     int         numberOfAttributes;
     191             :     int         old_natts;
     192         102 :     bool        ret = true;
     193             :     oidvector  *old_indclass;
     194             :     oidvector  *old_indcollation;
     195             :     Relation    irel;
     196             :     int         i;
     197             :     Datum       d;
     198             : 
     199             :     /* Caller should already have the relation locked in some way. */
     200         102 :     relationId = IndexGetRelation(oldId, false);
     201             : 
     202             :     /*
     203             :      * We can pretend isconstraint = false unconditionally.  It only serves to
     204             :      * decide the text of an error message that should never happen for us.
     205             :      */
     206         102 :     isconstraint = false;
     207             : 
     208         102 :     numberOfAttributes = list_length(attributeList);
     209             :     Assert(numberOfAttributes > 0);
     210             :     Assert(numberOfAttributes <= INDEX_MAX_KEYS);
     211             : 
     212             :     /* look up the access method */
     213         102 :     tuple = SearchSysCache1(AMNAME, PointerGetDatum(accessMethodName));
     214         102 :     if (!HeapTupleIsValid(tuple))
     215           0 :         ereport(ERROR,
     216             :                 (errcode(ERRCODE_UNDEFINED_OBJECT),
     217             :                  errmsg("access method \"%s\" does not exist",
     218             :                         accessMethodName)));
     219         102 :     accessMethodForm = (Form_pg_am) GETSTRUCT(tuple);
     220         102 :     accessMethodId = accessMethodForm->oid;
     221         102 :     amRoutine = GetIndexAmRoutine(accessMethodForm->amhandler);
     222         102 :     ReleaseSysCache(tuple);
     223             : 
     224         102 :     amcanorder = amRoutine->amcanorder;
     225         102 :     amsummarizing = amRoutine->amsummarizing;
     226             : 
     227             :     /*
     228             :      * Compute the operator classes, collations, and exclusion operators for
     229             :      * the new index, so we can test whether it's compatible with the existing
     230             :      * one.  Note that ComputeIndexAttrs might fail here, but that's OK:
     231             :      * DefineIndex would have failed later.  Our attributeList contains only
     232             :      * key attributes, thus we're filling ii_NumIndexAttrs and
     233             :      * ii_NumIndexKeyAttrs with same value.
     234             :      */
     235         102 :     indexInfo = makeIndexInfo(numberOfAttributes, numberOfAttributes,
     236             :                               accessMethodId, NIL, NIL, false, false,
     237             :                               false, false, amsummarizing);
     238         102 :     typeObjectId = palloc_array(Oid, numberOfAttributes);
     239         102 :     collationObjectId = palloc_array(Oid, numberOfAttributes);
     240         102 :     classObjectId = palloc_array(Oid, numberOfAttributes);
     241         102 :     coloptions = palloc_array(int16, numberOfAttributes);
     242         102 :     ComputeIndexAttrs(indexInfo,
     243             :                       typeObjectId, collationObjectId, classObjectId,
     244             :                       coloptions, attributeList,
     245             :                       exclusionOpNames, relationId,
     246             :                       accessMethodName, accessMethodId,
     247             :                       amcanorder, isconstraint, InvalidOid, 0, NULL);
     248             : 
     249             : 
     250             :     /* Get the soon-obsolete pg_index tuple. */
     251         102 :     tuple = SearchSysCache1(INDEXRELID, ObjectIdGetDatum(oldId));
     252         102 :     if (!HeapTupleIsValid(tuple))
     253           0 :         elog(ERROR, "cache lookup failed for index %u", oldId);
     254         102 :     indexForm = (Form_pg_index) GETSTRUCT(tuple);
     255             : 
     256             :     /*
     257             :      * We don't assess expressions or predicates; assume incompatibility.
     258             :      * Also, if the index is invalid for any reason, treat it as incompatible.
     259             :      */
     260         204 :     if (!(heap_attisnull(tuple, Anum_pg_index_indpred, NULL) &&
     261         102 :           heap_attisnull(tuple, Anum_pg_index_indexprs, NULL) &&
     262         102 :           indexForm->indisvalid))
     263             :     {
     264           0 :         ReleaseSysCache(tuple);
     265           0 :         return false;
     266             :     }
     267             : 
     268             :     /* Any change in operator class or collation breaks compatibility. */
     269         102 :     old_natts = indexForm->indnkeyatts;
     270             :     Assert(old_natts == numberOfAttributes);
     271             : 
     272         102 :     d = SysCacheGetAttrNotNull(INDEXRELID, tuple, Anum_pg_index_indcollation);
     273         102 :     old_indcollation = (oidvector *) DatumGetPointer(d);
     274             : 
     275         102 :     d = SysCacheGetAttrNotNull(INDEXRELID, tuple, Anum_pg_index_indclass);
     276         102 :     old_indclass = (oidvector *) DatumGetPointer(d);
     277             : 
     278         204 :     ret = (memcmp(old_indclass->values, classObjectId,
     279         204 :                   old_natts * sizeof(Oid)) == 0 &&
     280         102 :            memcmp(old_indcollation->values, collationObjectId,
     281             :                   old_natts * sizeof(Oid)) == 0);
     282             : 
     283         102 :     ReleaseSysCache(tuple);
     284             : 
     285         102 :     if (!ret)
     286           0 :         return false;
     287             : 
     288             :     /* For polymorphic opcintype, column type changes break compatibility. */
     289         102 :     irel = index_open(oldId, AccessShareLock);  /* caller probably has a lock */
     290         210 :     for (i = 0; i < old_natts; i++)
     291             :     {
     292         108 :         if (IsPolymorphicType(get_opclass_input_type(classObjectId[i])) &&
     293           0 :             TupleDescAttr(irel->rd_att, i)->atttypid != typeObjectId[i])
     294             :         {
     295           0 :             ret = false;
     296           0 :             break;
     297             :         }
     298             :     }
     299             : 
     300             :     /* Any change in opclass options break compatibility. */
     301         102 :     if (ret)
     302             :     {
     303         102 :         Datum      *opclassOptions = RelationGetIndexRawAttOptions(irel);
     304             : 
     305         102 :         ret = CompareOpclassOptions(opclassOptions,
     306             :                                     indexInfo->ii_OpclassOptions, old_natts);
     307             : 
     308         102 :         if (opclassOptions)
     309           0 :             pfree(opclassOptions);
     310             :     }
     311             : 
     312             :     /* Any change in exclusion operator selections breaks compatibility. */
     313         102 :     if (ret && indexInfo->ii_ExclusionOps != NULL)
     314             :     {
     315             :         Oid        *old_operators,
     316             :                    *old_procs;
     317             :         uint16     *old_strats;
     318             : 
     319           0 :         RelationGetExclusionInfo(irel, &old_operators, &old_procs, &old_strats);
     320           0 :         ret = memcmp(old_operators, indexInfo->ii_ExclusionOps,
     321             :                      old_natts * sizeof(Oid)) == 0;
     322             : 
     323             :         /* Require an exact input type match for polymorphic operators. */
     324           0 :         if (ret)
     325             :         {
     326           0 :             for (i = 0; i < old_natts && ret; i++)
     327             :             {
     328             :                 Oid         left,
     329             :                             right;
     330             : 
     331           0 :                 op_input_types(indexInfo->ii_ExclusionOps[i], &left, &right);
     332           0 :                 if ((IsPolymorphicType(left) || IsPolymorphicType(right)) &&
     333           0 :                     TupleDescAttr(irel->rd_att, i)->atttypid != typeObjectId[i])
     334             :                 {
     335           0 :                     ret = false;
     336           0 :                     break;
     337             :                 }
     338             :             }
     339             :         }
     340             :     }
     341             : 
     342         102 :     index_close(irel, NoLock);
     343         102 :     return ret;
     344             : }
     345             : 
     346             : /*
     347             :  * CompareOpclassOptions
     348             :  *
     349             :  * Compare per-column opclass options which are represented by arrays of text[]
     350             :  * datums.  Both elements of arrays and array themselves can be NULL.
     351             :  */
     352             : static bool
     353         102 : CompareOpclassOptions(Datum *opts1, Datum *opts2, int natts)
     354             : {
     355             :     int         i;
     356             : 
     357         102 :     if (!opts1 && !opts2)
     358         102 :         return true;
     359             : 
     360           0 :     for (i = 0; i < natts; i++)
     361             :     {
     362           0 :         Datum       opt1 = opts1 ? opts1[i] : (Datum) 0;
     363           0 :         Datum       opt2 = opts2 ? opts2[i] : (Datum) 0;
     364             : 
     365           0 :         if (opt1 == (Datum) 0)
     366             :         {
     367           0 :             if (opt2 == (Datum) 0)
     368           0 :                 continue;
     369             :             else
     370           0 :                 return false;
     371             :         }
     372           0 :         else if (opt2 == (Datum) 0)
     373           0 :             return false;
     374             : 
     375             :         /* Compare non-NULL text[] datums. */
     376           0 :         if (!DatumGetBool(DirectFunctionCall2(array_eq, opt1, opt2)))
     377           0 :             return false;
     378             :     }
     379             : 
     380           0 :     return true;
     381             : }
     382             : 
     383             : /*
     384             :  * WaitForOlderSnapshots
     385             :  *
     386             :  * Wait for transactions that might have an older snapshot than the given xmin
     387             :  * limit, because it might not contain tuples deleted just before it has
     388             :  * been taken. Obtain a list of VXIDs of such transactions, and wait for them
     389             :  * individually. This is used when building an index concurrently.
     390             :  *
     391             :  * We can exclude any running transactions that have xmin > the xmin given;
     392             :  * their oldest snapshot must be newer than our xmin limit.
     393             :  * We can also exclude any transactions that have xmin = zero, since they
     394             :  * evidently have no live snapshot at all (and any one they might be in
     395             :  * process of taking is certainly newer than ours).  Transactions in other
     396             :  * DBs can be ignored too, since they'll never even be able to see the
     397             :  * index being worked on.
     398             :  *
     399             :  * We can also exclude autovacuum processes and processes running manual
     400             :  * lazy VACUUMs, because they won't be fazed by missing index entries
     401             :  * either.  (Manual ANALYZEs, however, can't be excluded because they
     402             :  * might be within transactions that are going to do arbitrary operations
     403             :  * later.)  Processes running CREATE INDEX CONCURRENTLY or REINDEX CONCURRENTLY
     404             :  * on indexes that are neither expressional nor partial are also safe to
     405             :  * ignore, since we know that those processes won't examine any data
     406             :  * outside the table they're indexing.
     407             :  *
     408             :  * Also, GetCurrentVirtualXIDs never reports our own vxid, so we need not
     409             :  * check for that.
     410             :  *
     411             :  * If a process goes idle-in-transaction with xmin zero, we do not need to
     412             :  * wait for it anymore, per the above argument.  We do not have the
     413             :  * infrastructure right now to stop waiting if that happens, but we can at
     414             :  * least avoid the folly of waiting when it is idle at the time we would
     415             :  * begin to wait.  We do this by repeatedly rechecking the output of
     416             :  * GetCurrentVirtualXIDs.  If, during any iteration, a particular vxid
     417             :  * doesn't show up in the output, we know we can forget about it.
     418             :  */
     419             : void
     420         568 : WaitForOlderSnapshots(TransactionId limitXmin, bool progress)
     421             : {
     422             :     int         n_old_snapshots;
     423             :     int         i;
     424             :     VirtualTransactionId *old_snapshots;
     425             : 
     426         568 :     old_snapshots = GetCurrentVirtualXIDs(limitXmin, true, false,
     427             :                                           PROC_IS_AUTOVACUUM | PROC_IN_VACUUM
     428             :                                           | PROC_IN_SAFE_IC,
     429             :                                           &n_old_snapshots);
     430         568 :     if (progress)
     431         554 :         pgstat_progress_update_param(PROGRESS_WAITFOR_TOTAL, n_old_snapshots);
     432             : 
     433         840 :     for (i = 0; i < n_old_snapshots; i++)
     434             :     {
     435         272 :         if (!VirtualTransactionIdIsValid(old_snapshots[i]))
     436          48 :             continue;           /* found uninteresting in previous cycle */
     437             : 
     438         224 :         if (i > 0)
     439             :         {
     440             :             /* see if anything's changed ... */
     441             :             VirtualTransactionId *newer_snapshots;
     442             :             int         n_newer_snapshots;
     443             :             int         j;
     444             :             int         k;
     445             : 
     446          76 :             newer_snapshots = GetCurrentVirtualXIDs(limitXmin,
     447             :                                                     true, false,
     448             :                                                     PROC_IS_AUTOVACUUM | PROC_IN_VACUUM
     449             :                                                     | PROC_IN_SAFE_IC,
     450             :                                                     &n_newer_snapshots);
     451         252 :             for (j = i; j < n_old_snapshots; j++)
     452             :             {
     453         176 :                 if (!VirtualTransactionIdIsValid(old_snapshots[j]))
     454           4 :                     continue;   /* found uninteresting in previous cycle */
     455         458 :                 for (k = 0; k < n_newer_snapshots; k++)
     456             :                 {
     457         370 :                     if (VirtualTransactionIdEquals(old_snapshots[j],
     458             :                                                    newer_snapshots[k]))
     459          84 :                         break;
     460             :                 }
     461         172 :                 if (k >= n_newer_snapshots) /* not there anymore */
     462          88 :                     SetInvalidVirtualTransactionId(old_snapshots[j]);
     463             :             }
     464          76 :             pfree(newer_snapshots);
     465             :         }
     466             : 
     467         224 :         if (VirtualTransactionIdIsValid(old_snapshots[i]))
     468             :         {
     469             :             /* If requested, publish who we're going to wait for. */
     470         184 :             if (progress)
     471             :             {
     472         184 :                 PGPROC     *holder = BackendIdGetProc(old_snapshots[i].backendId);
     473             : 
     474         184 :                 if (holder)
     475         184 :                     pgstat_progress_update_param(PROGRESS_WAITFOR_CURRENT_PID,
     476         184 :                                                  holder->pid);
     477             :             }
     478         184 :             VirtualXactLock(old_snapshots[i], true);
     479             :         }
     480             : 
     481         224 :         if (progress)
     482         224 :             pgstat_progress_update_param(PROGRESS_WAITFOR_DONE, i + 1);
     483             :     }
     484         568 : }
     485             : 
     486             : 
     487             : /*
     488             :  * DefineIndex
     489             :  *      Creates a new index.
     490             :  *
     491             :  * This function manages the current userid according to the needs of pg_dump.
     492             :  * Recreating old-database catalog entries in new-database is fine, regardless
     493             :  * of which users would have permission to recreate those entries now.  That's
     494             :  * just preservation of state.  Running opaque expressions, like calling a
     495             :  * function named in a catalog entry or evaluating a pg_node_tree in a catalog
     496             :  * entry, as anyone other than the object owner, is not fine.  To adhere to
     497             :  * those principles and to remain fail-safe, use the table owner userid for
     498             :  * most ACL checks.  Use the original userid for ACL checks reached without
     499             :  * traversing opaque expressions.  (pg_dump can predict such ACL checks from
     500             :  * catalogs.)  Overall, this is a mess.  Future DDL development should
     501             :  * consider offering one DDL command for catalog setup and a separate DDL
     502             :  * command for steps that run opaque expressions.
     503             :  *
     504             :  * 'relationId': the OID of the heap relation on which the index is to be
     505             :  *      created
     506             :  * 'stmt': IndexStmt describing the properties of the new index.
     507             :  * 'indexRelationId': normally InvalidOid, but during bootstrap can be
     508             :  *      nonzero to specify a preselected OID for the index.
     509             :  * 'parentIndexId': the OID of the parent index; InvalidOid if not the child
     510             :  *      of a partitioned index.
     511             :  * 'parentConstraintId': the OID of the parent constraint; InvalidOid if not
     512             :  *      the child of a constraint (only used when recursing)
     513             :  * 'total_parts': total number of direct and indirect partitions of relation;
     514             :  *      pass -1 if not known or rel is not partitioned.
     515             :  * 'is_alter_table': this is due to an ALTER rather than a CREATE operation.
     516             :  * 'check_rights': check for CREATE rights in namespace and tablespace.  (This
     517             :  *      should be true except when ALTER is deleting/recreating an index.)
     518             :  * 'check_not_in_use': check for table not already in use in current session.
     519             :  *      This should be true unless caller is holding the table open, in which
     520             :  *      case the caller had better have checked it earlier.
     521             :  * 'skip_build': make the catalog entries but don't create the index files
     522             :  * 'quiet': suppress the NOTICE chatter ordinarily provided for constraints.
     523             :  *
     524             :  * Returns the object address of the created index.
     525             :  */
     526             : ObjectAddress
     527       90250 : DefineIndex(Oid relationId,
     528             :             IndexStmt *stmt,
     529             :             Oid indexRelationId,
     530             :             Oid parentIndexId,
     531             :             Oid parentConstraintId,
     532             :             int total_parts,
     533             :             bool is_alter_table,
     534             :             bool check_rights,
     535             :             bool check_not_in_use,
     536             :             bool skip_build,
     537             :             bool quiet)
     538             : {
     539             :     bool        concurrent;
     540             :     char       *indexRelationName;
     541             :     char       *accessMethodName;
     542             :     Oid        *typeObjectId;
     543             :     Oid        *collationObjectId;
     544             :     Oid        *classObjectId;
     545             :     Oid         accessMethodId;
     546             :     Oid         namespaceId;
     547             :     Oid         tablespaceId;
     548       90250 :     Oid         createdConstraintId = InvalidOid;
     549             :     List       *indexColNames;
     550             :     List       *allIndexParams;
     551             :     Relation    rel;
     552             :     HeapTuple   tuple;
     553             :     Form_pg_am  accessMethodForm;
     554             :     IndexAmRoutine *amRoutine;
     555             :     bool        amcanorder;
     556             :     bool        amissummarizing;
     557             :     amoptions_function amoptions;
     558             :     bool        partitioned;
     559             :     bool        safe_index;
     560             :     Datum       reloptions;
     561             :     int16      *coloptions;
     562             :     IndexInfo  *indexInfo;
     563             :     bits16      flags;
     564             :     bits16      constr_flags;
     565             :     int         numberOfAttributes;
     566             :     int         numberOfKeyAttributes;
     567             :     TransactionId limitXmin;
     568             :     ObjectAddress address;
     569             :     LockRelId   heaprelid;
     570             :     LOCKTAG     heaplocktag;
     571             :     LOCKMODE    lockmode;
     572             :     Snapshot    snapshot;
     573             :     Oid         root_save_userid;
     574             :     int         root_save_sec_context;
     575             :     int         root_save_nestlevel;
     576             : 
     577       90250 :     root_save_nestlevel = NewGUCNestLevel();
     578             : 
     579             :     /*
     580             :      * Some callers need us to run with an empty default_tablespace; this is a
     581             :      * necessary hack to be able to reproduce catalog state accurately when
     582             :      * recreating indexes after table-rewriting ALTER TABLE.
     583             :      */
     584       90250 :     if (stmt->reset_default_tblspc)
     585         476 :         (void) set_config_option("default_tablespace", "",
     586             :                                  PGC_USERSET, PGC_S_SESSION,
     587             :                                  GUC_ACTION_SAVE, true, 0, false);
     588             : 
     589             :     /*
     590             :      * Force non-concurrent build on temporary relations, even if CONCURRENTLY
     591             :      * was requested.  Other backends can't access a temporary relation, so
     592             :      * there's no harm in grabbing a stronger lock, and a non-concurrent DROP
     593             :      * is more efficient.  Do this before any use of the concurrent option is
     594             :      * done.
     595             :      */
     596       90250 :     if (stmt->concurrent && get_rel_persistence(relationId) != RELPERSISTENCE_TEMP)
     597         152 :         concurrent = true;
     598             :     else
     599       90098 :         concurrent = false;
     600             : 
     601             :     /*
     602             :      * Start progress report.  If we're building a partition, this was already
     603             :      * done.
     604             :      */
     605       90250 :     if (!OidIsValid(parentIndexId))
     606             :     {
     607       88116 :         pgstat_progress_start_command(PROGRESS_COMMAND_CREATE_INDEX,
     608             :                                       relationId);
     609       88116 :         pgstat_progress_update_param(PROGRESS_CREATEIDX_COMMAND,
     610             :                                      concurrent ?
     611             :                                      PROGRESS_CREATEIDX_COMMAND_CREATE_CONCURRENTLY :
     612             :                                      PROGRESS_CREATEIDX_COMMAND_CREATE);
     613             :     }
     614             : 
     615             :     /*
     616             :      * No index OID to report yet
     617             :      */
     618       90250 :     pgstat_progress_update_param(PROGRESS_CREATEIDX_INDEX_OID,
     619             :                                  InvalidOid);
     620             : 
     621             :     /*
     622             :      * count key attributes in index
     623             :      */
     624       90250 :     numberOfKeyAttributes = list_length(stmt->indexParams);
     625             : 
     626             :     /*
     627             :      * Calculate the new list of index columns including both key columns and
     628             :      * INCLUDE columns.  Later we can determine which of these are key
     629             :      * columns, and which are just part of the INCLUDE list by checking the
     630             :      * list position.  A list item in a position less than ii_NumIndexKeyAttrs
     631             :      * is part of the key columns, and anything equal to and over is part of
     632             :      * the INCLUDE columns.
     633             :      */
     634       90250 :     allIndexParams = list_concat_copy(stmt->indexParams,
     635       90250 :                                       stmt->indexIncludingParams);
     636       90250 :     numberOfAttributes = list_length(allIndexParams);
     637             : 
     638       90250 :     if (numberOfKeyAttributes <= 0)
     639           0 :         ereport(ERROR,
     640             :                 (errcode(ERRCODE_INVALID_OBJECT_DEFINITION),
     641             :                  errmsg("must specify at least one column")));
     642       90250 :     if (numberOfAttributes > INDEX_MAX_KEYS)
     643           0 :         ereport(ERROR,
     644             :                 (errcode(ERRCODE_TOO_MANY_COLUMNS),
     645             :                  errmsg("cannot use more than %d columns in an index",
     646             :                         INDEX_MAX_KEYS)));
     647             : 
     648             :     /*
     649             :      * Only SELECT ... FOR UPDATE/SHARE are allowed while doing a standard
     650             :      * index build; but for concurrent builds we allow INSERT/UPDATE/DELETE
     651             :      * (but not VACUUM).
     652             :      *
     653             :      * NB: Caller is responsible for making sure that relationId refers to the
     654             :      * relation on which the index should be built; except in bootstrap mode,
     655             :      * this will typically require the caller to have already locked the
     656             :      * relation.  To avoid lock upgrade hazards, that lock should be at least
     657             :      * as strong as the one we take here.
     658             :      *
     659             :      * NB: If the lock strength here ever changes, code that is run by
     660             :      * parallel workers under the control of certain particular ambuild
     661             :      * functions will need to be updated, too.
     662             :      */
     663       90250 :     lockmode = concurrent ? ShareUpdateExclusiveLock : ShareLock;
     664       90250 :     rel = table_open(relationId, lockmode);
     665             : 
     666             :     /*
     667             :      * Switch to the table owner's userid, so that any index functions are run
     668             :      * as that user.  Also lock down security-restricted operations.  We
     669             :      * already arranged to make GUC variable changes local to this command.
     670             :      */
     671       90250 :     GetUserIdAndSecContext(&root_save_userid, &root_save_sec_context);
     672       90250 :     SetUserIdAndSecContext(rel->rd_rel->relowner,
     673             :                            root_save_sec_context | SECURITY_RESTRICTED_OPERATION);
     674             : 
     675       90250 :     namespaceId = RelationGetNamespace(rel);
     676             : 
     677             :     /* Ensure that it makes sense to index this kind of relation */
     678       90250 :     switch (rel->rd_rel->relkind)
     679             :     {
     680       90244 :         case RELKIND_RELATION:
     681             :         case RELKIND_MATVIEW:
     682             :         case RELKIND_PARTITIONED_TABLE:
     683             :             /* OK */
     684       90244 :             break;
     685           6 :         default:
     686           6 :             ereport(ERROR,
     687             :                     (errcode(ERRCODE_WRONG_OBJECT_TYPE),
     688             :                      errmsg("cannot create index on relation \"%s\"",
     689             :                             RelationGetRelationName(rel)),
     690             :                      errdetail_relkind_not_supported(rel->rd_rel->relkind)));
     691             :             break;
     692             :     }
     693             : 
     694             :     /*
     695             :      * Establish behavior for partitioned tables, and verify sanity of
     696             :      * parameters.
     697             :      *
     698             :      * We do not build an actual index in this case; we only create a few
     699             :      * catalog entries.  The actual indexes are built by recursing for each
     700             :      * partition.
     701             :      */
     702       90244 :     partitioned = rel->rd_rel->relkind == RELKIND_PARTITIONED_TABLE;
     703       90244 :     if (partitioned)
     704             :     {
     705             :         /*
     706             :          * Note: we check 'stmt->concurrent' rather than 'concurrent', so that
     707             :          * the error is thrown also for temporary tables.  Seems better to be
     708             :          * consistent, even though we could do it on temporary table because
     709             :          * we're not actually doing it concurrently.
     710             :          */
     711        1714 :         if (stmt->concurrent)
     712           6 :             ereport(ERROR,
     713             :                     (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
     714             :                      errmsg("cannot create index on partitioned table \"%s\" concurrently",
     715             :                             RelationGetRelationName(rel))));
     716        1708 :         if (stmt->excludeOpNames)
     717           0 :             ereport(ERROR,
     718             :                     (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
     719             :                      errmsg("cannot create exclusion constraints on partitioned table \"%s\"",
     720             :                             RelationGetRelationName(rel))));
     721             :     }
     722             : 
     723             :     /*
     724             :      * Don't try to CREATE INDEX on temp tables of other backends.
     725             :      */
     726       90238 :     if (RELATION_IS_OTHER_TEMP(rel))
     727           0 :         ereport(ERROR,
     728             :                 (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
     729             :                  errmsg("cannot create indexes on temporary tables of other sessions")));
     730             : 
     731             :     /*
     732             :      * Unless our caller vouches for having checked this already, insist that
     733             :      * the table not be in use by our own session, either.  Otherwise we might
     734             :      * fail to make entries in the new index (for instance, if an INSERT or
     735             :      * UPDATE is in progress and has already made its list of target indexes).
     736             :      */
     737       90238 :     if (check_not_in_use)
     738       12254 :         CheckTableNotInUse(rel, "CREATE INDEX");
     739             : 
     740             :     /*
     741             :      * Verify we (still) have CREATE rights in the rel's namespace.
     742             :      * (Presumably we did when the rel was created, but maybe not anymore.)
     743             :      * Skip check if caller doesn't want it.  Also skip check if
     744             :      * bootstrapping, since permissions machinery may not be working yet.
     745             :      */
     746       90232 :     if (check_rights && !IsBootstrapProcessingMode())
     747             :     {
     748             :         AclResult   aclresult;
     749             : 
     750       13152 :         aclresult = object_aclcheck(NamespaceRelationId, namespaceId, root_save_userid,
     751             :                                     ACL_CREATE);
     752       13152 :         if (aclresult != ACLCHECK_OK)
     753           0 :             aclcheck_error(aclresult, OBJECT_SCHEMA,
     754           0 :                            get_namespace_name(namespaceId));
     755             :     }
     756             : 
     757             :     /*
     758             :      * Select tablespace to use.  If not specified, use default tablespace
     759             :      * (which may in turn default to database's default).
     760             :      */
     761       90232 :     if (stmt->tableSpace)
     762             :     {
     763         200 :         tablespaceId = get_tablespace_oid(stmt->tableSpace, false);
     764         200 :         if (partitioned && tablespaceId == MyDatabaseTableSpace)
     765           6 :             ereport(ERROR,
     766             :                     (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
     767             :                      errmsg("cannot specify default tablespace for partitioned relations")));
     768             :     }
     769             :     else
     770             :     {
     771       90032 :         tablespaceId = GetDefaultTablespace(rel->rd_rel->relpersistence,
     772             :                                             partitioned);
     773             :         /* note InvalidOid is OK in this case */
     774             :     }
     775             : 
     776             :     /* Check tablespace permissions */
     777       90220 :     if (check_rights &&
     778         104 :         OidIsValid(tablespaceId) && tablespaceId != MyDatabaseTableSpace)
     779             :     {
     780             :         AclResult   aclresult;
     781             : 
     782         104 :         aclresult = object_aclcheck(TableSpaceRelationId, tablespaceId, root_save_userid,
     783             :                                     ACL_CREATE);
     784         104 :         if (aclresult != ACLCHECK_OK)
     785           0 :             aclcheck_error(aclresult, OBJECT_TABLESPACE,
     786           0 :                            get_tablespace_name(tablespaceId));
     787             :     }
     788             : 
     789             :     /*
     790             :      * Force shared indexes into the pg_global tablespace.  This is a bit of a
     791             :      * hack but seems simpler than marking them in the BKI commands.  On the
     792             :      * other hand, if it's not shared, don't allow it to be placed there.
     793             :      */
     794       90220 :     if (rel->rd_rel->relisshared)
     795       12726 :         tablespaceId = GLOBALTABLESPACE_OID;
     796       77494 :     else if (tablespaceId == GLOBALTABLESPACE_OID)
     797           0 :         ereport(ERROR,
     798             :                 (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
     799             :                  errmsg("only shared relations can be placed in pg_global tablespace")));
     800             : 
     801             :     /*
     802             :      * Choose the index column names.
     803             :      */
     804       90220 :     indexColNames = ChooseIndexColumnNames(allIndexParams);
     805             : 
     806             :     /*
     807             :      * Select name for index if caller didn't specify
     808             :      */
     809       90220 :     indexRelationName = stmt->idxname;
     810       90220 :     if (indexRelationName == NULL)
     811        9722 :         indexRelationName = ChooseIndexName(RelationGetRelationName(rel),
     812             :                                             namespaceId,
     813             :                                             indexColNames,
     814             :                                             stmt->excludeOpNames,
     815        9722 :                                             stmt->primary,
     816        9722 :                                             stmt->isconstraint);
     817             : 
     818             :     /*
     819             :      * look up the access method, verify it can handle the requested features
     820             :      */
     821       90220 :     accessMethodName = stmt->accessMethod;
     822       90220 :     tuple = SearchSysCache1(AMNAME, PointerGetDatum(accessMethodName));
     823       90220 :     if (!HeapTupleIsValid(tuple))
     824             :     {
     825             :         /*
     826             :          * Hack to provide more-or-less-transparent updating of old RTREE
     827             :          * indexes to GiST: if RTREE is requested and not found, use GIST.
     828             :          */
     829           6 :         if (strcmp(accessMethodName, "rtree") == 0)
     830             :         {
     831           6 :             ereport(NOTICE,
     832             :                     (errmsg("substituting access method \"gist\" for obsolete method \"rtree\"")));
     833           6 :             accessMethodName = "gist";
     834           6 :             tuple = SearchSysCache1(AMNAME, PointerGetDatum(accessMethodName));
     835             :         }
     836             : 
     837           6 :         if (!HeapTupleIsValid(tuple))
     838           0 :             ereport(ERROR,
     839             :                     (errcode(ERRCODE_UNDEFINED_OBJECT),
     840             :                      errmsg("access method \"%s\" does not exist",
     841             :                             accessMethodName)));
     842             :     }
     843       90220 :     accessMethodForm = (Form_pg_am) GETSTRUCT(tuple);
     844       90220 :     accessMethodId = accessMethodForm->oid;
     845       90220 :     amRoutine = GetIndexAmRoutine(accessMethodForm->amhandler);
     846             : 
     847       90220 :     pgstat_progress_update_param(PROGRESS_CREATEIDX_ACCESS_METHOD_OID,
     848             :                                  accessMethodId);
     849             : 
     850       90220 :     if (stmt->unique && !amRoutine->amcanunique)
     851           0 :         ereport(ERROR,
     852             :                 (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
     853             :                  errmsg("access method \"%s\" does not support unique indexes",
     854             :                         accessMethodName)));
     855       90220 :     if (stmt->indexIncludingParams != NIL && !amRoutine->amcaninclude)
     856          18 :         ereport(ERROR,
     857             :                 (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
     858             :                  errmsg("access method \"%s\" does not support included columns",
     859             :                         accessMethodName)));
     860       90202 :     if (numberOfKeyAttributes > 1 && !amRoutine->amcanmulticol)
     861           0 :         ereport(ERROR,
     862             :                 (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
     863             :                  errmsg("access method \"%s\" does not support multicolumn indexes",
     864             :                         accessMethodName)));
     865       90202 :     if (stmt->excludeOpNames && amRoutine->amgettuple == NULL)
     866           0 :         ereport(ERROR,
     867             :                 (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
     868             :                  errmsg("access method \"%s\" does not support exclusion constraints",
     869             :                         accessMethodName)));
     870             : 
     871       90202 :     amcanorder = amRoutine->amcanorder;
     872       90202 :     amoptions = amRoutine->amoptions;
     873       90202 :     amissummarizing = amRoutine->amsummarizing;
     874             : 
     875       90202 :     pfree(amRoutine);
     876       90202 :     ReleaseSysCache(tuple);
     877             : 
     878             :     /*
     879             :      * Validate predicate, if given
     880             :      */
     881       90202 :     if (stmt->whereClause)
     882         380 :         CheckPredicate((Expr *) stmt->whereClause);
     883             : 
     884             :     /*
     885             :      * Parse AM-specific options, convert to text array form, validate.
     886             :      */
     887       90202 :     reloptions = transformRelOptions((Datum) 0, stmt->options,
     888             :                                      NULL, NULL, false, false);
     889             : 
     890       90196 :     (void) index_reloptions(amoptions, reloptions, true);
     891             : 
     892             :     /*
     893             :      * Prepare arguments for index_create, primarily an IndexInfo structure.
     894             :      * Note that predicates must be in implicit-AND format.  In a concurrent
     895             :      * build, mark it not-ready-for-inserts.
     896             :      */
     897       90132 :     indexInfo = makeIndexInfo(numberOfAttributes,
     898             :                               numberOfKeyAttributes,
     899             :                               accessMethodId,
     900             :                               NIL,  /* expressions, NIL for now */
     901       90132 :                               make_ands_implicit((Expr *) stmt->whereClause),
     902       90132 :                               stmt->unique,
     903       90132 :                               stmt->nulls_not_distinct,
     904       90132 :                               !concurrent,
     905             :                               concurrent,
     906       90132 :                               amissummarizing);
     907             : 
     908       90132 :     typeObjectId = palloc_array(Oid, numberOfAttributes);
     909       90132 :     collationObjectId = palloc_array(Oid, numberOfAttributes);
     910       90132 :     classObjectId = palloc_array(Oid, numberOfAttributes);
     911       90132 :     coloptions = palloc_array(int16, numberOfAttributes);
     912       90132 :     ComputeIndexAttrs(indexInfo,
     913             :                       typeObjectId, collationObjectId, classObjectId,
     914             :                       coloptions, allIndexParams,
     915             :                       stmt->excludeOpNames, relationId,
     916             :                       accessMethodName, accessMethodId,
     917       90132 :                       amcanorder, stmt->isconstraint, root_save_userid,
     918             :                       root_save_sec_context, &root_save_nestlevel);
     919             : 
     920             :     /*
     921             :      * Extra checks when creating a PRIMARY KEY index.
     922             :      */
     923       90082 :     if (stmt->primary)
     924        7524 :         index_check_primary_key(rel, indexInfo, is_alter_table, stmt);
     925             : 
     926             :     /*
     927             :      * If this table is partitioned and we're creating a unique index or a
     928             :      * primary key, make sure that the partition key is a subset of the
     929             :      * index's columns.  Otherwise it would be possible to violate uniqueness
     930             :      * by putting values that ought to be unique in different partitions.
     931             :      *
     932             :      * We could lift this limitation if we had global indexes, but those have
     933             :      * their own problems, so this is a useful feature combination.
     934             :      */
     935       90046 :     if (partitioned && (stmt->unique || stmt->primary))
     936             :     {
     937         968 :         PartitionKey key = RelationGetPartitionKey(rel);
     938             :         const char *constraint_type;
     939             :         int         i;
     940             : 
     941         968 :         if (stmt->primary)
     942         784 :             constraint_type = "PRIMARY KEY";
     943         184 :         else if (stmt->unique)
     944         184 :             constraint_type = "UNIQUE";
     945           0 :         else if (stmt->excludeOpNames != NIL)
     946           0 :             constraint_type = "EXCLUDE";
     947             :         else
     948             :         {
     949           0 :             elog(ERROR, "unknown constraint type");
     950             :             constraint_type = NULL; /* keep compiler quiet */
     951             :         }
     952             : 
     953             :         /*
     954             :          * Verify that all the columns in the partition key appear in the
     955             :          * unique key definition, with the same notion of equality.
     956             :          */
     957        1936 :         for (i = 0; i < key->partnatts; i++)
     958             :         {
     959        1034 :             bool        found = false;
     960             :             int         eq_strategy;
     961             :             Oid         ptkey_eqop;
     962             :             int         j;
     963             : 
     964             :             /*
     965             :              * Identify the equality operator associated with this partkey
     966             :              * column.  For list and range partitioning, partkeys use btree
     967             :              * operator classes; hash partitioning uses hash operator classes.
     968             :              * (Keep this in sync with ComputePartitionAttrs!)
     969             :              */
     970        1034 :             if (key->strategy == PARTITION_STRATEGY_HASH)
     971          46 :                 eq_strategy = HTEqualStrategyNumber;
     972             :             else
     973         988 :                 eq_strategy = BTEqualStrategyNumber;
     974             : 
     975        1034 :             ptkey_eqop = get_opfamily_member(key->partopfamily[i],
     976        1034 :                                              key->partopcintype[i],
     977        1034 :                                              key->partopcintype[i],
     978             :                                              eq_strategy);
     979        1034 :             if (!OidIsValid(ptkey_eqop))
     980           0 :                 elog(ERROR, "missing operator %d(%u,%u) in partition opfamily %u",
     981             :                      eq_strategy, key->partopcintype[i], key->partopcintype[i],
     982             :                      key->partopfamily[i]);
     983             : 
     984             :             /*
     985             :              * We'll need to be able to identify the equality operators
     986             :              * associated with index columns, too.  We know what to do with
     987             :              * btree opclasses; if there are ever any other index types that
     988             :              * support unique indexes, this logic will need extension.
     989             :              */
     990        1034 :             if (accessMethodId == BTREE_AM_OID)
     991        1034 :                 eq_strategy = BTEqualStrategyNumber;
     992             :             else
     993           0 :                 ereport(ERROR,
     994             :                         (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
     995             :                          errmsg("cannot match partition key to an index using access method \"%s\"",
     996             :                                 accessMethodName)));
     997             : 
     998             :             /*
     999             :              * It may be possible to support UNIQUE constraints when partition
    1000             :              * keys are expressions, but is it worth it?  Give up for now.
    1001             :              */
    1002        1034 :             if (key->partattrs[i] == 0)
    1003          12 :                 ereport(ERROR,
    1004             :                         (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
    1005             :                          errmsg("unsupported %s constraint with partition key definition",
    1006             :                                 constraint_type),
    1007             :                          errdetail("%s constraints cannot be used when partition keys include expressions.",
    1008             :                                    constraint_type)));
    1009             : 
    1010             :             /* Search the index column(s) for a match */
    1011        1132 :             for (j = 0; j < indexInfo->ii_NumIndexKeyAttrs; j++)
    1012             :             {
    1013        1078 :                 if (key->partattrs[i] == indexInfo->ii_IndexAttrNumbers[j])
    1014             :                 {
    1015             :                     /* Matched the column, now what about the equality op? */
    1016             :                     Oid         idx_opfamily;
    1017             :                     Oid         idx_opcintype;
    1018             : 
    1019         968 :                     if (get_opclass_opfamily_and_input_type(classObjectId[j],
    1020             :                                                             &idx_opfamily,
    1021             :                                                             &idx_opcintype))
    1022             :                     {
    1023             :                         Oid         idx_eqop;
    1024             : 
    1025         968 :                         idx_eqop = get_opfamily_member(idx_opfamily,
    1026             :                                                        idx_opcintype,
    1027             :                                                        idx_opcintype,
    1028             :                                                        eq_strategy);
    1029         968 :                         if (ptkey_eqop == idx_eqop)
    1030             :                         {
    1031         968 :                             found = true;
    1032         968 :                             break;
    1033             :                         }
    1034             :                     }
    1035             :                 }
    1036             :             }
    1037             : 
    1038        1022 :             if (!found)
    1039             :             {
    1040             :                 Form_pg_attribute att;
    1041             : 
    1042          54 :                 att = TupleDescAttr(RelationGetDescr(rel),
    1043             :                                     key->partattrs[i] - 1);
    1044          54 :                 ereport(ERROR,
    1045             :                         (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
    1046             :                          errmsg("unique constraint on partitioned table must include all partitioning columns"),
    1047             :                          errdetail("%s constraint on table \"%s\" lacks column \"%s\" which is part of the partition key.",
    1048             :                                    constraint_type, RelationGetRelationName(rel),
    1049             :                                    NameStr(att->attname))));
    1050             :             }
    1051             :         }
    1052             :     }
    1053             : 
    1054             : 
    1055             :     /*
    1056             :      * We disallow indexes on system columns.  They would not necessarily get
    1057             :      * updated correctly, and they don't seem useful anyway.
    1058             :      */
    1059      232446 :     for (int i = 0; i < indexInfo->ii_NumIndexAttrs; i++)
    1060             :     {
    1061      142466 :         AttrNumber  attno = indexInfo->ii_IndexAttrNumbers[i];
    1062             : 
    1063      142466 :         if (attno < 0)
    1064           0 :             ereport(ERROR,
    1065             :                     (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
    1066             :                      errmsg("index creation on system columns is not supported")));
    1067             :     }
    1068             : 
    1069             :     /*
    1070             :      * Also check for system columns used in expressions or predicates.
    1071             :      */
    1072       89980 :     if (indexInfo->ii_Expressions || indexInfo->ii_Predicate)
    1073             :     {
    1074         966 :         Bitmapset  *indexattrs = NULL;
    1075             : 
    1076         966 :         pull_varattnos((Node *) indexInfo->ii_Expressions, 1, &indexattrs);
    1077         966 :         pull_varattnos((Node *) indexInfo->ii_Predicate, 1, &indexattrs);
    1078             : 
    1079        6750 :         for (int i = FirstLowInvalidHeapAttributeNumber + 1; i < 0; i++)
    1080             :         {
    1081        5796 :             if (bms_is_member(i - FirstLowInvalidHeapAttributeNumber,
    1082             :                               indexattrs))
    1083          12 :                 ereport(ERROR,
    1084             :                         (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
    1085             :                          errmsg("index creation on system columns is not supported")));
    1086             :         }
    1087             :     }
    1088             : 
    1089             :     /* Is index safe for others to ignore?  See set_indexsafe_procflags() */
    1090      179256 :     safe_index = indexInfo->ii_Expressions == NIL &&
    1091       89288 :         indexInfo->ii_Predicate == NIL;
    1092             : 
    1093             :     /*
    1094             :      * Report index creation if appropriate (delay this till after most of the
    1095             :      * error checks)
    1096             :      */
    1097       89968 :     if (stmt->isconstraint && !quiet)
    1098             :     {
    1099             :         const char *constraint_type;
    1100             : 
    1101        8180 :         if (stmt->primary)
    1102        7350 :             constraint_type = "PRIMARY KEY";
    1103         830 :         else if (stmt->unique)
    1104         726 :             constraint_type = "UNIQUE";
    1105         104 :         else if (stmt->excludeOpNames != NIL)
    1106         104 :             constraint_type = "EXCLUDE";
    1107             :         else
    1108             :         {
    1109           0 :             elog(ERROR, "unknown constraint type");
    1110             :             constraint_type = NULL; /* keep compiler quiet */
    1111             :         }
    1112             : 
    1113        8180 :         ereport(DEBUG1,
    1114             :                 (errmsg_internal("%s %s will create implicit index \"%s\" for table \"%s\"",
    1115             :                                  is_alter_table ? "ALTER TABLE / ADD" : "CREATE TABLE /",
    1116             :                                  constraint_type,
    1117             :                                  indexRelationName, RelationGetRelationName(rel))));
    1118             :     }
    1119             : 
    1120             :     /*
    1121             :      * A valid stmt->oldNumber implies that we already have a built form of
    1122             :      * the index.  The caller should also decline any index build.
    1123             :      */
    1124             :     Assert(!RelFileNumberIsValid(stmt->oldNumber) || (skip_build && !concurrent));
    1125             : 
    1126             :     /*
    1127             :      * Make the catalog entries for the index, including constraints. This
    1128             :      * step also actually builds the index, except if caller requested not to
    1129             :      * or in concurrent mode, in which case it'll be done later, or doing a
    1130             :      * partitioned index (because those don't have storage).
    1131             :      */
    1132       89968 :     flags = constr_flags = 0;
    1133       89968 :     if (stmt->isconstraint)
    1134        8396 :         flags |= INDEX_CREATE_ADD_CONSTRAINT;
    1135       89968 :     if (skip_build || concurrent || partitioned)
    1136       77312 :         flags |= INDEX_CREATE_SKIP_BUILD;
    1137       89968 :     if (stmt->if_not_exists)
    1138          18 :         flags |= INDEX_CREATE_IF_NOT_EXISTS;
    1139       89968 :     if (concurrent)
    1140         146 :         flags |= INDEX_CREATE_CONCURRENT;
    1141       89968 :     if (partitioned)
    1142        1630 :         flags |= INDEX_CREATE_PARTITIONED;
    1143       89968 :     if (stmt->primary)
    1144        7458 :         flags |= INDEX_CREATE_IS_PRIMARY;
    1145             : 
    1146             :     /*
    1147             :      * If the table is partitioned, and recursion was declined but partitions
    1148             :      * exist, mark the index as invalid.
    1149             :      */
    1150       89968 :     if (partitioned && stmt->relation && !stmt->relation->inh)
    1151             :     {
    1152         210 :         PartitionDesc pd = RelationGetPartitionDesc(rel, true);
    1153             : 
    1154         210 :         if (pd->nparts != 0)
    1155         196 :             flags |= INDEX_CREATE_INVALID;
    1156             :     }
    1157             : 
    1158       89968 :     if (stmt->deferrable)
    1159          80 :         constr_flags |= INDEX_CONSTR_CREATE_DEFERRABLE;
    1160       89968 :     if (stmt->initdeferred)
    1161          14 :         constr_flags |= INDEX_CONSTR_CREATE_INIT_DEFERRED;
    1162             : 
    1163             :     indexRelationId =
    1164       89968 :         index_create(rel, indexRelationName, indexRelationId, parentIndexId,
    1165             :                      parentConstraintId,
    1166             :                      stmt->oldNumber, indexInfo, indexColNames,
    1167             :                      accessMethodId, tablespaceId,
    1168             :                      collationObjectId, classObjectId,
    1169             :                      coloptions, reloptions,
    1170             :                      flags, constr_flags,
    1171       89968 :                      allowSystemTableMods, !check_rights,
    1172       89968 :                      &createdConstraintId);
    1173             : 
    1174       89784 :     ObjectAddressSet(address, RelationRelationId, indexRelationId);
    1175             : 
    1176       89784 :     if (!OidIsValid(indexRelationId))
    1177             :     {
    1178             :         /*
    1179             :          * Roll back any GUC changes executed by index functions.  Also revert
    1180             :          * to original default_tablespace if we changed it above.
    1181             :          */
    1182          18 :         AtEOXact_GUC(false, root_save_nestlevel);
    1183             : 
    1184             :         /* Restore userid and security context */
    1185          18 :         SetUserIdAndSecContext(root_save_userid, root_save_sec_context);
    1186             : 
    1187          18 :         table_close(rel, NoLock);
    1188             : 
    1189             :         /* If this is the top-level index, we're done */
    1190          18 :         if (!OidIsValid(parentIndexId))
    1191          18 :             pgstat_progress_end_command();
    1192             : 
    1193          18 :         return address;
    1194             :     }
    1195             : 
    1196             :     /*
    1197             :      * Roll back any GUC changes executed by index functions, and keep
    1198             :      * subsequent changes local to this command.  This is essential if some
    1199             :      * index function changed a behavior-affecting GUC, e.g. search_path.
    1200             :      */
    1201       89766 :     AtEOXact_GUC(false, root_save_nestlevel);
    1202       89766 :     root_save_nestlevel = NewGUCNestLevel();
    1203             : 
    1204             :     /* Add any requested comment */
    1205       89766 :     if (stmt->idxcomment != NULL)
    1206          90 :         CreateComments(indexRelationId, RelationRelationId, 0,
    1207          90 :                        stmt->idxcomment);
    1208             : 
    1209       89766 :     if (partitioned)
    1210             :     {
    1211             :         PartitionDesc partdesc;
    1212             : 
    1213             :         /*
    1214             :          * Unless caller specified to skip this step (via ONLY), process each
    1215             :          * partition to make sure they all contain a corresponding index.
    1216             :          *
    1217             :          * If we're called internally (no stmt->relation), recurse always.
    1218             :          */
    1219        1630 :         partdesc = RelationGetPartitionDesc(rel, true);
    1220        1630 :         if ((!stmt->relation || stmt->relation->inh) && partdesc->nparts > 0)
    1221             :         {
    1222         478 :             int         nparts = partdesc->nparts;
    1223         478 :             Oid        *part_oids = palloc_array(Oid, nparts);
    1224         478 :             bool        invalidate_parent = false;
    1225             :             Relation    parentIndex;
    1226             :             TupleDesc   parentDesc;
    1227             : 
    1228             :             /*
    1229             :              * Report the total number of partitions at the start of the
    1230             :              * command; don't update it when being called recursively.
    1231             :              */
    1232         478 :             if (!OidIsValid(parentIndexId))
    1233             :             {
    1234             :                 /*
    1235             :                  * When called by ProcessUtilitySlow, the number of partitions
    1236             :                  * is passed in as an optimization; but other callers pass -1
    1237             :                  * since they don't have the value handy.  This should count
    1238             :                  * partitions the same way, ie one less than the number of
    1239             :                  * relations find_all_inheritors reports.
    1240             :                  *
    1241             :                  * We assume we needn't ask find_all_inheritors to take locks,
    1242             :                  * because that should have happened already for all callers.
    1243             :                  * Even if it did not, this is safe as long as we don't try to
    1244             :                  * touch the partitions here; the worst consequence would be a
    1245             :                  * bogus progress-reporting total.
    1246             :                  */
    1247         390 :                 if (total_parts < 0)
    1248             :                 {
    1249         122 :                     List       *children = find_all_inheritors(relationId,
    1250             :                                                                NoLock, NULL);
    1251             : 
    1252         122 :                     total_parts = list_length(children) - 1;
    1253         122 :                     list_free(children);
    1254             :                 }
    1255             : 
    1256         390 :                 pgstat_progress_update_param(PROGRESS_CREATEIDX_PARTITIONS_TOTAL,
    1257             :                                              total_parts);
    1258             :             }
    1259             : 
    1260             :             /* Make a local copy of partdesc->oids[], just for safety */
    1261         478 :             memcpy(part_oids, partdesc->oids, sizeof(Oid) * nparts);
    1262             : 
    1263             :             /*
    1264             :              * We'll need an IndexInfo describing the parent index.  The one
    1265             :              * built above is almost good enough, but not quite, because (for
    1266             :              * example) its predicate expression if any hasn't been through
    1267             :              * expression preprocessing.  The most reliable way to get an
    1268             :              * IndexInfo that will match those for child indexes is to build
    1269             :              * it the same way, using BuildIndexInfo().
    1270             :              */
    1271         478 :             parentIndex = index_open(indexRelationId, lockmode);
    1272         478 :             indexInfo = BuildIndexInfo(parentIndex);
    1273             : 
    1274         478 :             parentDesc = RelationGetDescr(rel);
    1275             : 
    1276             :             /*
    1277             :              * For each partition, scan all existing indexes; if one matches
    1278             :              * our index definition and is not already attached to some other
    1279             :              * parent index, attach it to the one we just created.
    1280             :              *
    1281             :              * If none matches, build a new index by calling ourselves
    1282             :              * recursively with the same options (except for the index name).
    1283             :              */
    1284        1254 :             for (int i = 0; i < nparts; i++)
    1285             :             {
    1286         800 :                 Oid         childRelid = part_oids[i];
    1287             :                 Relation    childrel;
    1288             :                 Oid         child_save_userid;
    1289             :                 int         child_save_sec_context;
    1290             :                 int         child_save_nestlevel;
    1291             :                 List       *childidxs;
    1292             :                 ListCell   *cell;
    1293             :                 AttrMap    *attmap;
    1294         800 :                 bool        found = false;
    1295             : 
    1296         800 :                 childrel = table_open(childRelid, lockmode);
    1297             : 
    1298         800 :                 GetUserIdAndSecContext(&child_save_userid,
    1299             :                                        &child_save_sec_context);
    1300         800 :                 SetUserIdAndSecContext(childrel->rd_rel->relowner,
    1301             :                                        child_save_sec_context | SECURITY_RESTRICTED_OPERATION);
    1302         800 :                 child_save_nestlevel = NewGUCNestLevel();
    1303             : 
    1304             :                 /*
    1305             :                  * Don't try to create indexes on foreign tables, though. Skip
    1306             :                  * those if a regular index, or fail if trying to create a
    1307             :                  * constraint index.
    1308             :                  */
    1309         800 :                 if (childrel->rd_rel->relkind == RELKIND_FOREIGN_TABLE)
    1310             :                 {
    1311          18 :                     if (stmt->unique || stmt->primary)
    1312          12 :                         ereport(ERROR,
    1313             :                                 (errcode(ERRCODE_WRONG_OBJECT_TYPE),
    1314             :                                  errmsg("cannot create unique index on partitioned table \"%s\"",
    1315             :                                         RelationGetRelationName(rel)),
    1316             :                                  errdetail("Table \"%s\" contains partitions that are foreign tables.",
    1317             :                                            RelationGetRelationName(rel))));
    1318             : 
    1319           6 :                     AtEOXact_GUC(false, child_save_nestlevel);
    1320           6 :                     SetUserIdAndSecContext(child_save_userid,
    1321             :                                            child_save_sec_context);
    1322           6 :                     table_close(childrel, lockmode);
    1323           6 :                     continue;
    1324             :                 }
    1325             : 
    1326         782 :                 childidxs = RelationGetIndexList(childrel);
    1327             :                 attmap =
    1328         782 :                     build_attrmap_by_name(RelationGetDescr(childrel),
    1329             :                                           parentDesc,
    1330             :                                           false);
    1331             : 
    1332        1106 :                 foreach(cell, childidxs)
    1333             :                 {
    1334         378 :                     Oid         cldidxid = lfirst_oid(cell);
    1335             :                     Relation    cldidx;
    1336             :                     IndexInfo  *cldIdxInfo;
    1337             : 
    1338             :                     /* this index is already partition of another one */
    1339         378 :                     if (has_superclass(cldidxid))
    1340         300 :                         continue;
    1341             : 
    1342          78 :                     cldidx = index_open(cldidxid, lockmode);
    1343          78 :                     cldIdxInfo = BuildIndexInfo(cldidx);
    1344          78 :                     if (CompareIndexInfo(cldIdxInfo, indexInfo,
    1345             :                                          cldidx->rd_indcollation,
    1346             :                                          parentIndex->rd_indcollation,
    1347             :                                          cldidx->rd_opfamily,
    1348             :                                          parentIndex->rd_opfamily,
    1349             :                                          attmap))
    1350             :                     {
    1351          54 :                         Oid         cldConstrOid = InvalidOid;
    1352             : 
    1353             :                         /*
    1354             :                          * Found a match.
    1355             :                          *
    1356             :                          * If this index is being created in the parent
    1357             :                          * because of a constraint, then the child needs to
    1358             :                          * have a constraint also, so look for one.  If there
    1359             :                          * is no such constraint, this index is no good, so
    1360             :                          * keep looking.
    1361             :                          */
    1362          54 :                         if (createdConstraintId != InvalidOid)
    1363             :                         {
    1364             :                             cldConstrOid =
    1365          12 :                                 get_relation_idx_constraint_oid(childRelid,
    1366             :                                                                 cldidxid);
    1367          12 :                             if (cldConstrOid == InvalidOid)
    1368             :                             {
    1369           0 :                                 index_close(cldidx, lockmode);
    1370           0 :                                 continue;
    1371             :                             }
    1372             :                         }
    1373             : 
    1374             :                         /* Attach index to parent and we're done. */
    1375          54 :                         IndexSetParentIndex(cldidx, indexRelationId);
    1376          54 :                         if (createdConstraintId != InvalidOid)
    1377          12 :                             ConstraintSetParentConstraint(cldConstrOid,
    1378             :                                                           createdConstraintId,
    1379             :                                                           childRelid);
    1380             : 
    1381          54 :                         if (!cldidx->rd_index->indisvalid)
    1382          12 :                             invalidate_parent = true;
    1383             : 
    1384          54 :                         found = true;
    1385             : 
    1386             :                         /*
    1387             :                          * Report this partition as processed.  Note that if
    1388             :                          * the partition has children itself, we'd ideally
    1389             :                          * count the children and update the progress report
    1390             :                          * for all of them; but that seems unduly expensive.
    1391             :                          * Instead, the progress report will act like all such
    1392             :                          * indirect children were processed in zero time at
    1393             :                          * the end of the command.
    1394             :                          */
    1395          54 :                         pgstat_progress_incr_param(PROGRESS_CREATEIDX_PARTITIONS_DONE, 1);
    1396             : 
    1397             :                         /* keep lock till commit */
    1398          54 :                         index_close(cldidx, NoLock);
    1399          54 :                         break;
    1400             :                     }
    1401             : 
    1402          24 :                     index_close(cldidx, lockmode);
    1403             :                 }
    1404             : 
    1405         782 :                 list_free(childidxs);
    1406         782 :                 AtEOXact_GUC(false, child_save_nestlevel);
    1407         782 :                 SetUserIdAndSecContext(child_save_userid,
    1408             :                                        child_save_sec_context);
    1409         782 :                 table_close(childrel, NoLock);
    1410             : 
    1411             :                 /*
    1412             :                  * If no matching index was found, create our own.
    1413             :                  */
    1414         782 :                 if (!found)
    1415             :                 {
    1416         728 :                     IndexStmt  *childStmt = copyObject(stmt);
    1417             :                     bool        found_whole_row;
    1418             :                     ListCell   *lc;
    1419             : 
    1420             :                     /*
    1421             :                      * We can't use the same index name for the child index,
    1422             :                      * so clear idxname to let the recursive invocation choose
    1423             :                      * a new name.  Likewise, the existing target relation
    1424             :                      * field is wrong, and if indexOid or oldNumber are set,
    1425             :                      * they mustn't be applied to the child either.
    1426             :                      */
    1427         728 :                     childStmt->idxname = NULL;
    1428         728 :                     childStmt->relation = NULL;
    1429         728 :                     childStmt->indexOid = InvalidOid;
    1430         728 :                     childStmt->oldNumber = InvalidRelFileNumber;
    1431         728 :                     childStmt->oldCreateSubid = InvalidSubTransactionId;
    1432         728 :                     childStmt->oldFirstRelfilelocatorSubid = InvalidSubTransactionId;
    1433             : 
    1434             :                     /*
    1435             :                      * Adjust any Vars (both in expressions and in the index's
    1436             :                      * WHERE clause) to match the partition's column numbering
    1437             :                      * in case it's different from the parent's.
    1438             :                      */
    1439        1650 :                     foreach(lc, childStmt->indexParams)
    1440             :                     {
    1441         922 :                         IndexElem  *ielem = lfirst(lc);
    1442             : 
    1443             :                         /*
    1444             :                          * If the index parameter is an expression, we must
    1445             :                          * translate it to contain child Vars.
    1446             :                          */
    1447         922 :                         if (ielem->expr)
    1448             :                         {
    1449          66 :                             ielem->expr =
    1450          66 :                                 map_variable_attnos((Node *) ielem->expr,
    1451             :                                                     1, 0, attmap,
    1452             :                                                     InvalidOid,
    1453             :                                                     &found_whole_row);
    1454          66 :                             if (found_whole_row)
    1455           0 :                                 elog(ERROR, "cannot convert whole-row table reference");
    1456             :                         }
    1457             :                     }
    1458         728 :                     childStmt->whereClause =
    1459         728 :                         map_variable_attnos(stmt->whereClause, 1, 0,
    1460             :                                             attmap,
    1461             :                                             InvalidOid, &found_whole_row);
    1462         728 :                     if (found_whole_row)
    1463           0 :                         elog(ERROR, "cannot convert whole-row table reference");
    1464             : 
    1465             :                     /*
    1466             :                      * Recurse as the starting user ID.  Callee will use that
    1467             :                      * for permission checks, then switch again.
    1468             :                      */
    1469             :                     Assert(GetUserId() == child_save_userid);
    1470         728 :                     SetUserIdAndSecContext(root_save_userid,
    1471             :                                            root_save_sec_context);
    1472         728 :                     DefineIndex(childRelid, childStmt,
    1473             :                                 InvalidOid, /* no predefined OID */
    1474             :                                 indexRelationId,    /* this is our child */
    1475             :                                 createdConstraintId,
    1476             :                                 -1,
    1477             :                                 is_alter_table, check_rights, check_not_in_use,
    1478             :                                 skip_build, quiet);
    1479         716 :                     SetUserIdAndSecContext(child_save_userid,
    1480             :                                            child_save_sec_context);
    1481             :                 }
    1482             : 
    1483         770 :                 free_attrmap(attmap);
    1484             :             }
    1485             : 
    1486         454 :             index_close(parentIndex, lockmode);
    1487             : 
    1488             :             /*
    1489             :              * The pg_index row we inserted for this index was marked
    1490             :              * indisvalid=true.  But if we attached an existing index that is
    1491             :              * invalid, this is incorrect, so update our row to invalid too.
    1492             :              */
    1493         454 :             if (invalidate_parent)
    1494             :             {
    1495          12 :                 Relation    pg_index = table_open(IndexRelationId, RowExclusiveLock);
    1496             :                 HeapTuple   tup,
    1497             :                             newtup;
    1498             : 
    1499          12 :                 tup = SearchSysCache1(INDEXRELID,
    1500             :                                       ObjectIdGetDatum(indexRelationId));
    1501          12 :                 if (!HeapTupleIsValid(tup))
    1502           0 :                     elog(ERROR, "cache lookup failed for index %u",
    1503             :                          indexRelationId);
    1504          12 :                 newtup = heap_copytuple(tup);
    1505          12 :                 ((Form_pg_index) GETSTRUCT(newtup))->indisvalid = false;
    1506          12 :                 CatalogTupleUpdate(pg_index, &tup->t_self, newtup);
    1507          12 :                 ReleaseSysCache(tup);
    1508          12 :                 table_close(pg_index, RowExclusiveLock);
    1509          12 :                 heap_freetuple(newtup);
    1510             :             }
    1511             :         }
    1512             : 
    1513             :         /*
    1514             :          * Indexes on partitioned tables are not themselves built, so we're
    1515             :          * done here.
    1516             :          */
    1517        1606 :         AtEOXact_GUC(false, root_save_nestlevel);
    1518        1606 :         SetUserIdAndSecContext(root_save_userid, root_save_sec_context);
    1519        1606 :         table_close(rel, NoLock);
    1520        1606 :         if (!OidIsValid(parentIndexId))
    1521        1366 :             pgstat_progress_end_command();
    1522             :         else
    1523             :         {
    1524             :             /* Update progress for an intermediate partitioned index itself */
    1525         240 :             pgstat_progress_incr_param(PROGRESS_CREATEIDX_PARTITIONS_DONE, 1);
    1526             :         }
    1527             : 
    1528        1606 :         return address;
    1529             :     }
    1530             : 
    1531       88136 :     AtEOXact_GUC(false, root_save_nestlevel);
    1532       88136 :     SetUserIdAndSecContext(root_save_userid, root_save_sec_context);
    1533             : 
    1534       88136 :     if (!concurrent)
    1535             :     {
    1536             :         /* Close the heap and we're done, in the non-concurrent case */
    1537       88002 :         table_close(rel, NoLock);
    1538             : 
    1539             :         /*
    1540             :          * If this is the top-level index, the command is done overall;
    1541             :          * otherwise, increment progress to report one child index is done.
    1542             :          */
    1543       88002 :         if (!OidIsValid(parentIndexId))
    1544       86144 :             pgstat_progress_end_command();
    1545             :         else
    1546        1858 :             pgstat_progress_incr_param(PROGRESS_CREATEIDX_PARTITIONS_DONE, 1);
    1547             : 
    1548       88002 :         return address;
    1549             :     }
    1550             : 
    1551             :     /* save lockrelid and locktag for below, then close rel */
    1552         134 :     heaprelid = rel->rd_lockInfo.lockRelId;
    1553         134 :     SET_LOCKTAG_RELATION(heaplocktag, heaprelid.dbId, heaprelid.relId);
    1554         134 :     table_close(rel, NoLock);
    1555             : 
    1556             :     /*
    1557             :      * For a concurrent build, it's important to make the catalog entries
    1558             :      * visible to other transactions before we start to build the index. That
    1559             :      * will prevent them from making incompatible HOT updates.  The new index
    1560             :      * will be marked not indisready and not indisvalid, so that no one else
    1561             :      * tries to either insert into it or use it for queries.
    1562             :      *
    1563             :      * We must commit our current transaction so that the index becomes
    1564             :      * visible; then start another.  Note that all the data structures we just
    1565             :      * built are lost in the commit.  The only data we keep past here are the
    1566             :      * relation IDs.
    1567             :      *
    1568             :      * Before committing, get a session-level lock on the table, to ensure
    1569             :      * that neither it nor the index can be dropped before we finish. This
    1570             :      * cannot block, even if someone else is waiting for access, because we
    1571             :      * already have the same lock within our transaction.
    1572             :      *
    1573             :      * Note: we don't currently bother with a session lock on the index,
    1574             :      * because there are no operations that could change its state while we
    1575             :      * hold lock on the parent table.  This might need to change later.
    1576             :      */
    1577         134 :     LockRelationIdForSession(&heaprelid, ShareUpdateExclusiveLock);
    1578             : 
    1579         134 :     PopActiveSnapshot();
    1580         134 :     CommitTransactionCommand();
    1581         134 :     StartTransactionCommand();
    1582             : 
    1583             :     /* Tell concurrent index builds to ignore us, if index qualifies */
    1584         134 :     if (safe_index)
    1585         100 :         set_indexsafe_procflags();
    1586             : 
    1587             :     /*
    1588             :      * The index is now visible, so we can report the OID.  While on it,
    1589             :      * include the report for the beginning of phase 2.
    1590             :      */
    1591             :     {
    1592         134 :         const int   progress_cols[] = {
    1593             :             PROGRESS_CREATEIDX_INDEX_OID,
    1594             :             PROGRESS_CREATEIDX_PHASE
    1595             :         };
    1596         134 :         const int64 progress_vals[] = {
    1597             :             indexRelationId,
    1598             :             PROGRESS_CREATEIDX_PHASE_WAIT_1
    1599             :         };
    1600             : 
    1601         134 :         pgstat_progress_update_multi_param(2, progress_cols, progress_vals);
    1602             :     }
    1603             : 
    1604             :     /*
    1605             :      * Phase 2 of concurrent index build (see comments for validate_index()
    1606             :      * for an overview of how this works)
    1607             :      *
    1608             :      * Now we must wait until no running transaction could have the table open
    1609             :      * with the old list of indexes.  Use ShareLock to consider running
    1610             :      * transactions that hold locks that permit writing to the table.  Note we
    1611             :      * do not need to worry about xacts that open the table for writing after
    1612             :      * this point; they will see the new index when they open it.
    1613             :      *
    1614             :      * Note: the reason we use actual lock acquisition here, rather than just
    1615             :      * checking the ProcArray and sleeping, is that deadlock is possible if
    1616             :      * one of the transactions in question is blocked trying to acquire an
    1617             :      * exclusive lock on our table.  The lock code will detect deadlock and
    1618             :      * error out properly.
    1619             :      */
    1620         134 :     WaitForLockers(heaplocktag, ShareLock, true);
    1621             : 
    1622             :     /*
    1623             :      * At this moment we are sure that there are no transactions with the
    1624             :      * table open for write that don't have this new index in their list of
    1625             :      * indexes.  We have waited out all the existing transactions and any new
    1626             :      * transaction will have the new index in its list, but the index is still
    1627             :      * marked as "not-ready-for-inserts".  The index is consulted while
    1628             :      * deciding HOT-safety though.  This arrangement ensures that no new HOT
    1629             :      * chains can be created where the new tuple and the old tuple in the
    1630             :      * chain have different index keys.
    1631             :      *
    1632             :      * We now take a new snapshot, and build the index using all tuples that
    1633             :      * are visible in this snapshot.  We can be sure that any HOT updates to
    1634             :      * these tuples will be compatible with the index, since any updates made
    1635             :      * by transactions that didn't know about the index are now committed or
    1636             :      * rolled back.  Thus, each visible tuple is either the end of its
    1637             :      * HOT-chain or the extension of the chain is HOT-safe for this index.
    1638             :      */
    1639             : 
    1640             :     /* Set ActiveSnapshot since functions in the indexes may need it */
    1641         134 :     PushActiveSnapshot(GetTransactionSnapshot());
    1642             : 
    1643             :     /* Perform concurrent build of index */
    1644         134 :     index_concurrently_build(relationId, indexRelationId);
    1645             : 
    1646             :     /* we can do away with our snapshot */
    1647         122 :     PopActiveSnapshot();
    1648             : 
    1649             :     /*
    1650             :      * Commit this transaction to make the indisready update visible.
    1651             :      */
    1652         122 :     CommitTransactionCommand();
    1653         122 :     StartTransactionCommand();
    1654             : 
    1655             :     /* Tell concurrent index builds to ignore us, if index qualifies */
    1656         122 :     if (safe_index)
    1657          88 :         set_indexsafe_procflags();
    1658             : 
    1659             :     /*
    1660             :      * Phase 3 of concurrent index build
    1661             :      *
    1662             :      * We once again wait until no transaction can have the table open with
    1663             :      * the index marked as read-only for updates.
    1664             :      */
    1665         122 :     pgstat_progress_update_param(PROGRESS_CREATEIDX_PHASE,
    1666             :                                  PROGRESS_CREATEIDX_PHASE_WAIT_2);
    1667         122 :     WaitForLockers(heaplocktag, ShareLock, true);
    1668             : 
    1669             :     /*
    1670             :      * Now take the "reference snapshot" that will be used by validate_index()
    1671             :      * to filter candidate tuples.  Beware!  There might still be snapshots in
    1672             :      * use that treat some transaction as in-progress that our reference
    1673             :      * snapshot treats as committed.  If such a recently-committed transaction
    1674             :      * deleted tuples in the table, we will not include them in the index; yet
    1675             :      * those transactions which see the deleting one as still-in-progress will
    1676             :      * expect such tuples to be there once we mark the index as valid.
    1677             :      *
    1678             :      * We solve this by waiting for all endangered transactions to exit before
    1679             :      * we mark the index as valid.
    1680             :      *
    1681             :      * We also set ActiveSnapshot to this snap, since functions in indexes may
    1682             :      * need a snapshot.
    1683             :      */
    1684         122 :     snapshot = RegisterSnapshot(GetTransactionSnapshot());
    1685         122 :     PushActiveSnapshot(snapshot);
    1686             : 
    1687             :     /*
    1688             :      * Scan the index and the heap, insert any missing index entries.
    1689             :      */
    1690         122 :     validate_index(relationId, indexRelationId, snapshot);
    1691             : 
    1692             :     /*
    1693             :      * Drop the reference snapshot.  We must do this before waiting out other
    1694             :      * snapshot holders, else we will deadlock against other processes also
    1695             :      * doing CREATE INDEX CONCURRENTLY, which would see our snapshot as one
    1696             :      * they must wait for.  But first, save the snapshot's xmin to use as
    1697             :      * limitXmin for GetCurrentVirtualXIDs().
    1698             :      */
    1699         122 :     limitXmin = snapshot->xmin;
    1700             : 
    1701         122 :     PopActiveSnapshot();
    1702         122 :     UnregisterSnapshot(snapshot);
    1703             : 
    1704             :     /*
    1705             :      * The snapshot subsystem could still contain registered snapshots that
    1706             :      * are holding back our process's advertised xmin; in particular, if
    1707             :      * default_transaction_isolation = serializable, there is a transaction
    1708             :      * snapshot that is still active.  The CatalogSnapshot is likewise a
    1709             :      * hazard.  To ensure no deadlocks, we must commit and start yet another
    1710             :      * transaction, and do our wait before any snapshot has been taken in it.
    1711             :      */
    1712         122 :     CommitTransactionCommand();
    1713         122 :     StartTransactionCommand();
    1714             : 
    1715             :     /* Tell concurrent index builds to ignore us, if index qualifies */
    1716         122 :     if (safe_index)
    1717          88 :         set_indexsafe_procflags();
    1718             : 
    1719             :     /* We should now definitely not be advertising any xmin. */
    1720             :     Assert(MyProc->xmin == InvalidTransactionId);
    1721             : 
    1722             :     /*
    1723             :      * The index is now valid in the sense that it contains all currently
    1724             :      * interesting tuples.  But since it might not contain tuples deleted just
    1725             :      * before the reference snap was taken, we have to wait out any
    1726             :      * transactions that might have older snapshots.
    1727             :      */
    1728         122 :     pgstat_progress_update_param(PROGRESS_CREATEIDX_PHASE,
    1729             :                                  PROGRESS_CREATEIDX_PHASE_WAIT_3);
    1730         122 :     WaitForOlderSnapshots(limitXmin, true);
    1731             : 
    1732             :     /*
    1733             :      * Index can now be marked valid -- update its pg_index entry
    1734             :      */
    1735         122 :     index_set_state_flags(indexRelationId, INDEX_CREATE_SET_VALID);
    1736             : 
    1737             :     /*
    1738             :      * The pg_index update will cause backends (including this one) to update
    1739             :      * relcache entries for the index itself, but we should also send a
    1740             :      * relcache inval on the parent table to force replanning of cached plans.
    1741             :      * Otherwise existing sessions might fail to use the new index where it
    1742             :      * would be useful.  (Note that our earlier commits did not create reasons
    1743             :      * to replan; so relcache flush on the index itself was sufficient.)
    1744             :      */
    1745         122 :     CacheInvalidateRelcacheByRelid(heaprelid.relId);
    1746             : 
    1747             :     /*
    1748             :      * Last thing to do is release the session-level lock on the parent table.
    1749             :      */
    1750         122 :     UnlockRelationIdForSession(&heaprelid, ShareUpdateExclusiveLock);
    1751             : 
    1752         122 :     pgstat_progress_end_command();
    1753             : 
    1754         122 :     return address;
    1755             : }
    1756             : 
    1757             : 
    1758             : /*
    1759             :  * CheckMutability
    1760             :  *      Test whether given expression is mutable
    1761             :  */
    1762             : static bool
    1763        1094 : CheckMutability(Expr *expr)
    1764             : {
    1765             :     /*
    1766             :      * First run the expression through the planner.  This has a couple of
    1767             :      * important consequences.  First, function default arguments will get
    1768             :      * inserted, which may affect volatility (consider "default now()").
    1769             :      * Second, inline-able functions will get inlined, which may allow us to
    1770             :      * conclude that the function is really less volatile than it's marked. As
    1771             :      * an example, polymorphic functions must be marked with the most volatile
    1772             :      * behavior that they have for any input type, but once we inline the
    1773             :      * function we may be able to conclude that it's not so volatile for the
    1774             :      * particular input type we're dealing with.
    1775             :      *
    1776             :      * We assume here that expression_planner() won't scribble on its input.
    1777             :      */
    1778        1094 :     expr = expression_planner(expr);
    1779             : 
    1780             :     /* Now we can search for non-immutable functions */
    1781        1094 :     return contain_mutable_functions((Node *) expr);
    1782             : }
    1783             : 
    1784             : 
    1785             : /*
    1786             :  * CheckPredicate
    1787             :  *      Checks that the given partial-index predicate is valid.
    1788             :  *
    1789             :  * This used to also constrain the form of the predicate to forms that
    1790             :  * indxpath.c could do something with.  However, that seems overly
    1791             :  * restrictive.  One useful application of partial indexes is to apply
    1792             :  * a UNIQUE constraint across a subset of a table, and in that scenario
    1793             :  * any evaluable predicate will work.  So accept any predicate here
    1794             :  * (except ones requiring a plan), and let indxpath.c fend for itself.
    1795             :  */
    1796             : static void
    1797         380 : CheckPredicate(Expr *predicate)
    1798             : {
    1799             :     /*
    1800             :      * transformExpr() should have already rejected subqueries, aggregates,
    1801             :      * and window functions, based on the EXPR_KIND_ for a predicate.
    1802             :      */
    1803             : 
    1804             :     /*
    1805             :      * A predicate using mutable functions is probably wrong, for the same
    1806             :      * reasons that we don't allow an index expression to use one.
    1807             :      */
    1808         380 :     if (CheckMutability(predicate))
    1809           0 :         ereport(ERROR,
    1810             :                 (errcode(ERRCODE_INVALID_OBJECT_DEFINITION),
    1811             :                  errmsg("functions in index predicate must be marked IMMUTABLE")));
    1812         380 : }
    1813             : 
    1814             : /*
    1815             :  * Compute per-index-column information, including indexed column numbers
    1816             :  * or index expressions, opclasses and their options. Note, all output vectors
    1817             :  * should be allocated for all columns, including "including" ones.
    1818             :  *
    1819             :  * If the caller switched to the table owner, ddl_userid is the role for ACL
    1820             :  * checks reached without traversing opaque expressions.  Otherwise, it's
    1821             :  * InvalidOid, and other ddl_* arguments are undefined.
    1822             :  */
    1823             : static void
    1824       90234 : ComputeIndexAttrs(IndexInfo *indexInfo,
    1825             :                   Oid *typeOidP,
    1826             :                   Oid *collationOidP,
    1827             :                   Oid *classOidP,
    1828             :                   int16 *colOptionP,
    1829             :                   List *attList,    /* list of IndexElem's */
    1830             :                   List *exclusionOpNames,
    1831             :                   Oid relId,
    1832             :                   const char *accessMethodName,
    1833             :                   Oid accessMethodId,
    1834             :                   bool amcanorder,
    1835             :                   bool isconstraint,
    1836             :                   Oid ddl_userid,
    1837             :                   int ddl_sec_context,
    1838             :                   int *ddl_save_nestlevel)
    1839             : {
    1840             :     ListCell   *nextExclOp;
    1841             :     ListCell   *lc;
    1842             :     int         attn;
    1843       90234 :     int         nkeycols = indexInfo->ii_NumIndexKeyAttrs;
    1844             :     Oid         save_userid;
    1845             :     int         save_sec_context;
    1846             : 
    1847             :     /* Allocate space for exclusion operator info, if needed */
    1848       90234 :     if (exclusionOpNames)
    1849             :     {
    1850             :         Assert(list_length(exclusionOpNames) == nkeycols);
    1851         130 :         indexInfo->ii_ExclusionOps = palloc_array(Oid, nkeycols);
    1852         130 :         indexInfo->ii_ExclusionProcs = palloc_array(Oid, nkeycols);
    1853         130 :         indexInfo->ii_ExclusionStrats = palloc_array(uint16, nkeycols);
    1854         130 :         nextExclOp = list_head(exclusionOpNames);
    1855             :     }
    1856             :     else
    1857       90104 :         nextExclOp = NULL;
    1858             : 
    1859       90234 :     if (OidIsValid(ddl_userid))
    1860       90132 :         GetUserIdAndSecContext(&save_userid, &save_sec_context);
    1861             : 
    1862             :     /*
    1863             :      * process attributeList
    1864             :      */
    1865       90234 :     attn = 0;
    1866      232916 :     foreach(lc, attList)
    1867             :     {
    1868      142732 :         IndexElem  *attribute = (IndexElem *) lfirst(lc);
    1869             :         Oid         atttype;
    1870             :         Oid         attcollation;
    1871             : 
    1872             :         /*
    1873             :          * Process the column-or-expression to be indexed.
    1874             :          */
    1875      142732 :         if (attribute->name != NULL)
    1876             :         {
    1877             :             /* Simple index attribute */
    1878             :             HeapTuple   atttuple;
    1879             :             Form_pg_attribute attform;
    1880             : 
    1881             :             Assert(attribute->expr == NULL);
    1882      142006 :             atttuple = SearchSysCacheAttName(relId, attribute->name);
    1883      142006 :             if (!HeapTupleIsValid(atttuple))
    1884             :             {
    1885             :                 /* difference in error message spellings is historical */
    1886          30 :                 if (isconstraint)
    1887          18 :                     ereport(ERROR,
    1888             :                             (errcode(ERRCODE_UNDEFINED_COLUMN),
    1889             :                              errmsg("column \"%s\" named in key does not exist",
    1890             :                                     attribute->name)));
    1891             :                 else
    1892          12 :                     ereport(ERROR,
    1893             :                             (errcode(ERRCODE_UNDEFINED_COLUMN),
    1894             :                              errmsg("column \"%s\" does not exist",
    1895             :                                     attribute->name)));
    1896             :             }
    1897      141976 :             attform = (Form_pg_attribute) GETSTRUCT(atttuple);
    1898      141976 :             indexInfo->ii_IndexAttrNumbers[attn] = attform->attnum;
    1899      141976 :             atttype = attform->atttypid;
    1900      141976 :             attcollation = attform->attcollation;
    1901      141976 :             ReleaseSysCache(atttuple);
    1902             :         }
    1903             :         else
    1904             :         {
    1905             :             /* Index expression */
    1906         726 :             Node       *expr = attribute->expr;
    1907             : 
    1908             :             Assert(expr != NULL);
    1909             : 
    1910         726 :             if (attn >= nkeycols)
    1911           0 :                 ereport(ERROR,
    1912             :                         (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
    1913             :                          errmsg("expressions are not supported in included columns")));
    1914         726 :             atttype = exprType(expr);
    1915         726 :             attcollation = exprCollation(expr);
    1916             : 
    1917             :             /*
    1918             :              * Strip any top-level COLLATE clause.  This ensures that we treat
    1919             :              * "x COLLATE y" and "(x COLLATE y)" alike.
    1920             :              */
    1921         756 :             while (IsA(expr, CollateExpr))
    1922          30 :                 expr = (Node *) ((CollateExpr *) expr)->arg;
    1923             : 
    1924         726 :             if (IsA(expr, Var) &&
    1925          12 :                 ((Var *) expr)->varattno != InvalidAttrNumber)
    1926             :             {
    1927             :                 /*
    1928             :                  * User wrote "(column)" or "(column COLLATE something)".
    1929             :                  * Treat it like simple attribute anyway.
    1930             :                  */
    1931          12 :                 indexInfo->ii_IndexAttrNumbers[attn] = ((Var *) expr)->varattno;
    1932             :             }
    1933             :             else
    1934             :             {
    1935         714 :                 indexInfo->ii_IndexAttrNumbers[attn] = 0;    /* marks expression */
    1936         714 :                 indexInfo->ii_Expressions = lappend(indexInfo->ii_Expressions,
    1937             :                                                     expr);
    1938             : 
    1939             :                 /*
    1940             :                  * transformExpr() should have already rejected subqueries,
    1941             :                  * aggregates, and window functions, based on the EXPR_KIND_
    1942             :                  * for an index expression.
    1943             :                  */
    1944             : 
    1945             :                 /*
    1946             :                  * An expression using mutable functions is probably wrong,
    1947             :                  * since if you aren't going to get the same result for the
    1948             :                  * same data every time, it's not clear what the index entries
    1949             :                  * mean at all.
    1950             :                  */
    1951         714 :                 if (CheckMutability((Expr *) expr))
    1952           0 :                     ereport(ERROR,
    1953             :                             (errcode(ERRCODE_INVALID_OBJECT_DEFINITION),
    1954             :                              errmsg("functions in index expression must be marked IMMUTABLE")));
    1955             :             }
    1956             :         }
    1957             : 
    1958      142702 :         typeOidP[attn] = atttype;
    1959             : 
    1960             :         /*
    1961             :          * Included columns have no collation, no opclass and no ordering
    1962             :          * options.
    1963             :          */
    1964      142702 :         if (attn >= nkeycols)
    1965             :         {
    1966         638 :             if (attribute->collation)
    1967           0 :                 ereport(ERROR,
    1968             :                         (errcode(ERRCODE_INVALID_OBJECT_DEFINITION),
    1969             :                          errmsg("including column does not support a collation")));
    1970         638 :             if (attribute->opclass)
    1971           0 :                 ereport(ERROR,
    1972             :                         (errcode(ERRCODE_INVALID_OBJECT_DEFINITION),
    1973             :                          errmsg("including column does not support an operator class")));
    1974         638 :             if (attribute->ordering != SORTBY_DEFAULT)
    1975           0 :                 ereport(ERROR,
    1976             :                         (errcode(ERRCODE_INVALID_OBJECT_DEFINITION),
    1977             :                          errmsg("including column does not support ASC/DESC options")));
    1978         638 :             if (attribute->nulls_ordering != SORTBY_NULLS_DEFAULT)
    1979           0 :                 ereport(ERROR,
    1980             :                         (errcode(ERRCODE_INVALID_OBJECT_DEFINITION),
    1981             :                          errmsg("including column does not support NULLS FIRST/LAST options")));
    1982             : 
    1983         638 :             classOidP[attn] = InvalidOid;
    1984         638 :             colOptionP[attn] = 0;
    1985         638 :             collationOidP[attn] = InvalidOid;
    1986         638 :             attn++;
    1987             : 
    1988         638 :             continue;
    1989             :         }
    1990             : 
    1991             :         /*
    1992             :          * Apply collation override if any.  Use of ddl_userid is necessary
    1993             :          * due to ACL checks therein, and it's safe because collations don't
    1994             :          * contain opaque expressions (or non-opaque expressions).
    1995             :          */
    1996      142064 :         if (attribute->collation)
    1997             :         {
    1998         112 :             if (OidIsValid(ddl_userid))
    1999             :             {
    2000         112 :                 AtEOXact_GUC(false, *ddl_save_nestlevel);
    2001         112 :                 SetUserIdAndSecContext(ddl_userid, ddl_sec_context);
    2002             :             }
    2003         112 :             attcollation = get_collation_oid(attribute->collation, false);
    2004         110 :             if (OidIsValid(ddl_userid))
    2005             :             {
    2006         110 :                 SetUserIdAndSecContext(save_userid, save_sec_context);
    2007         110 :                 *ddl_save_nestlevel = NewGUCNestLevel();
    2008             :             }
    2009             :         }
    2010             : 
    2011             :         /*
    2012             :          * Check we have a collation iff it's a collatable type.  The only
    2013             :          * expected failures here are (1) COLLATE applied to a noncollatable
    2014             :          * type, or (2) index expression had an unresolved collation.  But we
    2015             :          * might as well code this to be a complete consistency check.
    2016             :          */
    2017      142062 :         if (type_is_collatable(atttype))
    2018             :         {
    2019       24484 :             if (!OidIsValid(attcollation))
    2020           0 :                 ereport(ERROR,
    2021             :                         (errcode(ERRCODE_INDETERMINATE_COLLATION),
    2022             :                          errmsg("could not determine which collation to use for index expression"),
    2023             :                          errhint("Use the COLLATE clause to set the collation explicitly.")));
    2024             :         }
    2025             :         else
    2026             :         {
    2027      117578 :             if (OidIsValid(attcollation))
    2028          12 :                 ereport(ERROR,
    2029             :                         (errcode(ERRCODE_DATATYPE_MISMATCH),
    2030             :                          errmsg("collations are not supported by type %s",
    2031             :                                 format_type_be(atttype))));
    2032             :         }
    2033             : 
    2034      142050 :         collationOidP[attn] = attcollation;
    2035             : 
    2036             :         /*
    2037             :          * Identify the opclass to use.  Use of ddl_userid is necessary due to
    2038             :          * ACL checks therein.  This is safe despite opclasses containing
    2039             :          * opaque expressions (specifically, functions), because only
    2040             :          * superusers can define opclasses.
    2041             :          */
    2042      142050 :         if (OidIsValid(ddl_userid))
    2043             :         {
    2044      141942 :             AtEOXact_GUC(false, *ddl_save_nestlevel);
    2045      141942 :             SetUserIdAndSecContext(ddl_userid, ddl_sec_context);
    2046             :         }
    2047      142050 :         classOidP[attn] = ResolveOpClass(attribute->opclass,
    2048             :                                          atttype,
    2049             :                                          accessMethodName,
    2050             :                                          accessMethodId);
    2051      142044 :         if (OidIsValid(ddl_userid))
    2052             :         {
    2053      141936 :             SetUserIdAndSecContext(save_userid, save_sec_context);
    2054      141936 :             *ddl_save_nestlevel = NewGUCNestLevel();
    2055             :         }
    2056             : 
    2057             :         /*
    2058             :          * Identify the exclusion operator, if any.
    2059             :          */
    2060      142044 :         if (nextExclOp)
    2061             :         {
    2062         158 :             List       *opname = (List *) lfirst(nextExclOp);
    2063             :             Oid         opid;
    2064             :             Oid         opfamily;
    2065             :             int         strat;
    2066             : 
    2067             :             /*
    2068             :              * Find the operator --- it must accept the column datatype
    2069             :              * without runtime coercion (but binary compatibility is OK).
    2070             :              * Operators contain opaque expressions (specifically, functions).
    2071             :              * compatible_oper_opid() boils down to oper() and
    2072             :              * IsBinaryCoercible().  PostgreSQL would have security problems
    2073             :              * elsewhere if oper() started calling opaque expressions.
    2074             :              */
    2075         158 :             if (OidIsValid(ddl_userid))
    2076             :             {
    2077         158 :                 AtEOXact_GUC(false, *ddl_save_nestlevel);
    2078         158 :                 SetUserIdAndSecContext(ddl_userid, ddl_sec_context);
    2079             :             }
    2080         158 :             opid = compatible_oper_opid(opname, atttype, atttype, false);
    2081         158 :             if (OidIsValid(ddl_userid))
    2082             :             {
    2083         158 :                 SetUserIdAndSecContext(save_userid, save_sec_context);
    2084         158 :                 *ddl_save_nestlevel = NewGUCNestLevel();
    2085             :             }
    2086             : 
    2087             :             /*
    2088             :              * Only allow commutative operators to be used in exclusion
    2089             :              * constraints. If X conflicts with Y, but Y does not conflict
    2090             :              * with X, bad things will happen.
    2091             :              */
    2092         158 :             if (get_commutator(opid) != opid)
    2093           0 :                 ereport(ERROR,
    2094             :                         (errcode(ERRCODE_WRONG_OBJECT_TYPE),
    2095             :                          errmsg("operator %s is not commutative",
    2096             :                                 format_operator(opid)),
    2097             :                          errdetail("Only commutative operators can be used in exclusion constraints.")));
    2098             : 
    2099             :             /*
    2100             :              * Operator must be a member of the right opfamily, too
    2101             :              */
    2102         158 :             opfamily = get_opclass_family(classOidP[attn]);
    2103         158 :             strat = get_op_opfamily_strategy(opid, opfamily);
    2104         158 :             if (strat == 0)
    2105             :             {
    2106             :                 HeapTuple   opftuple;
    2107             :                 Form_pg_opfamily opfform;
    2108             : 
    2109             :                 /*
    2110             :                  * attribute->opclass might not explicitly name the opfamily,
    2111             :                  * so fetch the name of the selected opfamily for use in the
    2112             :                  * error message.
    2113             :                  */
    2114           0 :                 opftuple = SearchSysCache1(OPFAMILYOID,
    2115             :                                            ObjectIdGetDatum(opfamily));
    2116           0 :                 if (!HeapTupleIsValid(opftuple))
    2117           0 :                     elog(ERROR, "cache lookup failed for opfamily %u",
    2118             :                          opfamily);
    2119           0 :                 opfform = (Form_pg_opfamily) GETSTRUCT(opftuple);
    2120             : 
    2121           0 :                 ereport(ERROR,
    2122             :                         (errcode(ERRCODE_WRONG_OBJECT_TYPE),
    2123             :                          errmsg("operator %s is not a member of operator family \"%s\"",
    2124             :                                 format_operator(opid),
    2125             :                                 NameStr(opfform->opfname)),
    2126             :                          errdetail("The exclusion operator must be related to the index operator class for the constraint.")));
    2127             :             }
    2128             : 
    2129         158 :             indexInfo->ii_ExclusionOps[attn] = opid;
    2130         158 :             indexInfo->ii_ExclusionProcs[attn] = get_opcode(opid);
    2131         158 :             indexInfo->ii_ExclusionStrats[attn] = strat;
    2132         158 :             nextExclOp = lnext(exclusionOpNames, nextExclOp);
    2133             :         }
    2134             : 
    2135             :         /*
    2136             :          * Set up the per-column options (indoption field).  For now, this is
    2137             :          * zero for any un-ordered index, while ordered indexes have DESC and
    2138             :          * NULLS FIRST/LAST options.
    2139             :          */
    2140      142044 :         colOptionP[attn] = 0;
    2141      142044 :         if (amcanorder)
    2142             :         {
    2143             :             /* default ordering is ASC */
    2144      139622 :             if (attribute->ordering == SORTBY_DESC)
    2145          42 :                 colOptionP[attn] |= INDOPTION_DESC;
    2146             :             /* default null ordering is LAST for ASC, FIRST for DESC */
    2147      139622 :             if (attribute->nulls_ordering == SORTBY_NULLS_DEFAULT)
    2148             :             {
    2149      139592 :                 if (attribute->ordering == SORTBY_DESC)
    2150          30 :                     colOptionP[attn] |= INDOPTION_NULLS_FIRST;
    2151             :             }
    2152          30 :             else if (attribute->nulls_ordering == SORTBY_NULLS_FIRST)
    2153          12 :                 colOptionP[attn] |= INDOPTION_NULLS_FIRST;
    2154             :         }
    2155             :         else
    2156             :         {
    2157             :             /* index AM does not support ordering */
    2158        2422 :             if (attribute->ordering != SORTBY_DEFAULT)
    2159           0 :                 ereport(ERROR,
    2160             :                         (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
    2161             :                          errmsg("access method \"%s\" does not support ASC/DESC options",
    2162             :                                 accessMethodName)));
    2163        2422 :             if (attribute->nulls_ordering != SORTBY_NULLS_DEFAULT)
    2164           0 :                 ereport(ERROR,
    2165             :                         (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
    2166             :                          errmsg("access method \"%s\" does not support NULLS FIRST/LAST options",
    2167             :                                 accessMethodName)));
    2168             :         }
    2169             : 
    2170             :         /* Set up the per-column opclass options (attoptions field). */
    2171      142044 :         if (attribute->opclassopts)
    2172             :         {
    2173             :             Assert(attn < nkeycols);
    2174             : 
    2175         136 :             if (!indexInfo->ii_OpclassOptions)
    2176         130 :                 indexInfo->ii_OpclassOptions =
    2177         130 :                     palloc0_array(Datum, indexInfo->ii_NumIndexAttrs);
    2178             : 
    2179         136 :             indexInfo->ii_OpclassOptions[attn] =
    2180         136 :                 transformRelOptions((Datum) 0, attribute->opclassopts,
    2181             :                                     NULL, NULL, false, false);
    2182             :         }
    2183             : 
    2184      142044 :         attn++;
    2185             :     }
    2186       90184 : }
    2187             : 
    2188             : /*
    2189             :  * Resolve possibly-defaulted operator class specification
    2190             :  *
    2191             :  * Note: This is used to resolve operator class specifications in index and
    2192             :  * partition key definitions.
    2193             :  */
    2194             : Oid
    2195      142182 : ResolveOpClass(List *opclass, Oid attrType,
    2196             :                const char *accessMethodName, Oid accessMethodId)
    2197             : {
    2198             :     char       *schemaname;
    2199             :     char       *opcname;
    2200             :     HeapTuple   tuple;
    2201             :     Form_pg_opclass opform;
    2202             :     Oid         opClassId,
    2203             :                 opInputType;
    2204             : 
    2205      142182 :     if (opclass == NIL)
    2206             :     {
    2207             :         /* no operator class specified, so find the default */
    2208       16778 :         opClassId = GetDefaultOpClass(attrType, accessMethodId);
    2209       16778 :         if (!OidIsValid(opClassId))
    2210           6 :             ereport(ERROR,
    2211             :                     (errcode(ERRCODE_UNDEFINED_OBJECT),
    2212             :                      errmsg("data type %s has no default operator class for access method \"%s\"",
    2213             :                             format_type_be(attrType), accessMethodName),
    2214             :                      errhint("You must specify an operator class for the index or define a default operator class for the data type.")));
    2215       16772 :         return opClassId;
    2216             :     }
    2217             : 
    2218             :     /*
    2219             :      * Specific opclass name given, so look up the opclass.
    2220             :      */
    2221             : 
    2222             :     /* deconstruct the name list */
    2223      125404 :     DeconstructQualifiedName(opclass, &schemaname, &opcname);
    2224             : 
    2225      125404 :     if (schemaname)
    2226             :     {
    2227             :         /* Look in specific schema only */
    2228             :         Oid         namespaceId;
    2229             : 
    2230          10 :         namespaceId = LookupExplicitNamespace(schemaname, false);
    2231          10 :         tuple = SearchSysCache3(CLAAMNAMENSP,
    2232             :                                 ObjectIdGetDatum(accessMethodId),
    2233             :                                 PointerGetDatum(opcname),
    2234             :                                 ObjectIdGetDatum(namespaceId));
    2235             :     }
    2236             :     else
    2237             :     {
    2238             :         /* Unqualified opclass name, so search the search path */
    2239      125394 :         opClassId = OpclassnameGetOpcid(accessMethodId, opcname);
    2240      125394 :         if (!OidIsValid(opClassId))
    2241          12 :             ereport(ERROR,
    2242             :                     (errcode(ERRCODE_UNDEFINED_OBJECT),
    2243             :                      errmsg("operator class \"%s\" does not exist for access method \"%s\"",
    2244             :                             opcname, accessMethodName)));
    2245      125382 :         tuple = SearchSysCache1(CLAOID, ObjectIdGetDatum(opClassId));
    2246             :     }
    2247             : 
    2248      125392 :     if (!HeapTupleIsValid(tuple))
    2249           0 :         ereport(ERROR,
    2250             :                 (errcode(ERRCODE_UNDEFINED_OBJECT),
    2251             :                  errmsg("operator class \"%s\" does not exist for access method \"%s\"",
    2252             :                         NameListToString(opclass), accessMethodName)));
    2253             : 
    2254             :     /*
    2255             :      * Verify that the index operator class accepts this datatype.  Note we
    2256             :      * will accept binary compatibility.
    2257             :      */
    2258      125392 :     opform = (Form_pg_opclass) GETSTRUCT(tuple);
    2259      125392 :     opClassId = opform->oid;
    2260      125392 :     opInputType = opform->opcintype;
    2261             : 
    2262      125392 :     if (!IsBinaryCoercible(attrType, opInputType))
    2263           0 :         ereport(ERROR,
    2264             :                 (errcode(ERRCODE_DATATYPE_MISMATCH),
    2265             :                  errmsg("operator class \"%s\" does not accept data type %s",
    2266             :                         NameListToString(opclass), format_type_be(attrType))));
    2267             : 
    2268      125392 :     ReleaseSysCache(tuple);
    2269             : 
    2270      125392 :     return opClassId;
    2271             : }
    2272             : 
    2273             : /*
    2274             :  * GetDefaultOpClass
    2275             :  *
    2276             :  * Given the OIDs of a datatype and an access method, find the default
    2277             :  * operator class, if any.  Returns InvalidOid if there is none.
    2278             :  */
    2279             : Oid
    2280      203700 : GetDefaultOpClass(Oid type_id, Oid am_id)
    2281             : {
    2282      203700 :     Oid         result = InvalidOid;
    2283      203700 :     int         nexact = 0;
    2284      203700 :     int         ncompatible = 0;
    2285      203700 :     int         ncompatiblepreferred = 0;
    2286             :     Relation    rel;
    2287             :     ScanKeyData skey[1];
    2288             :     SysScanDesc scan;
    2289             :     HeapTuple   tup;
    2290             :     TYPCATEGORY tcategory;
    2291             : 
    2292             :     /* If it's a domain, look at the base type instead */
    2293      203700 :     type_id = getBaseType(type_id);
    2294             : 
    2295      203700 :     tcategory = TypeCategory(type_id);
    2296             : 
    2297             :     /*
    2298             :      * We scan through all the opclasses available for the access method,
    2299             :      * looking for one that is marked default and matches the target type
    2300             :      * (either exactly or binary-compatibly, but prefer an exact match).
    2301             :      *
    2302             :      * We could find more than one binary-compatible match.  If just one is
    2303             :      * for a preferred type, use that one; otherwise we fail, forcing the user
    2304             :      * to specify which one he wants.  (The preferred-type special case is a
    2305             :      * kluge for varchar: it's binary-compatible to both text and bpchar, so
    2306             :      * we need a tiebreaker.)  If we find more than one exact match, then
    2307             :      * someone put bogus entries in pg_opclass.
    2308             :      */
    2309      203700 :     rel = table_open(OperatorClassRelationId, AccessShareLock);
    2310             : 
    2311      203700 :     ScanKeyInit(&skey[0],
    2312             :                 Anum_pg_opclass_opcmethod,
    2313             :                 BTEqualStrategyNumber, F_OIDEQ,
    2314             :                 ObjectIdGetDatum(am_id));
    2315             : 
    2316      203700 :     scan = systable_beginscan(rel, OpclassAmNameNspIndexId, true,
    2317             :                               NULL, 1, skey);
    2318             : 
    2319     9106498 :     while (HeapTupleIsValid(tup = systable_getnext(scan)))
    2320             :     {
    2321     8902798 :         Form_pg_opclass opclass = (Form_pg_opclass) GETSTRUCT(tup);
    2322             : 
    2323             :         /* ignore altogether if not a default opclass */
    2324     8902798 :         if (!opclass->opcdefault)
    2325     1263116 :             continue;
    2326     7639682 :         if (opclass->opcintype == type_id)
    2327             :         {
    2328      185254 :             nexact++;
    2329      185254 :             result = opclass->oid;
    2330             :         }
    2331    11503814 :         else if (nexact == 0 &&
    2332     4049386 :                  IsBinaryCoercible(type_id, opclass->opcintype))
    2333             :         {
    2334       32286 :             if (IsPreferredType(tcategory, opclass->opcintype))
    2335             :             {
    2336        3962 :                 ncompatiblepreferred++;
    2337        3962 :                 result = opclass->oid;
    2338             :             }
    2339       28324 :             else if (ncompatiblepreferred == 0)
    2340             :             {
    2341       28324 :                 ncompatible++;
    2342       28324 :                 result = opclass->oid;
    2343             :             }
    2344             :         }
    2345             :     }
    2346             : 
    2347      203700 :     systable_endscan(scan);
    2348             : 
    2349      203700 :     table_close(rel, AccessShareLock);
    2350             : 
    2351             :     /* raise error if pg_opclass contains inconsistent data */
    2352      203700 :     if (nexact > 1)
    2353           0 :         ereport(ERROR,
    2354             :                 (errcode(ERRCODE_DUPLICATE_OBJECT),
    2355             :                  errmsg("there are multiple default operator classes for data type %s",
    2356             :                         format_type_be(type_id))));
    2357             : 
    2358      203700 :     if (nexact == 1 ||
    2359       14486 :         ncompatiblepreferred == 1 ||
    2360       14486 :         (ncompatiblepreferred == 0 && ncompatible == 1))
    2361      201712 :         return result;
    2362             : 
    2363        1988 :     return InvalidOid;
    2364             : }
    2365             : 
    2366             : /*
    2367             :  *  makeObjectName()
    2368             :  *
    2369             :  *  Create a name for an implicitly created index, sequence, constraint,
    2370             :  *  extended statistics, etc.
    2371             :  *
    2372             :  *  The parameters are typically: the original table name, the original field
    2373             :  *  name, and a "type" string (such as "seq" or "pkey").    The field name
    2374             :  *  and/or type can be NULL if not relevant.
    2375             :  *
    2376             :  *  The result is a palloc'd string.
    2377             :  *
    2378             :  *  The basic result we want is "name1_name2_label", omitting "_name2" or
    2379             :  *  "_label" when those parameters are NULL.  However, we must generate
    2380             :  *  a name with less than NAMEDATALEN characters!  So, we truncate one or
    2381             :  *  both names if necessary to make a short-enough string.  The label part
    2382             :  *  is never truncated (so it had better be reasonably short).
    2383             :  *
    2384             :  *  The caller is responsible for checking uniqueness of the generated
    2385             :  *  name and retrying as needed; retrying will be done by altering the
    2386             :  *  "label" string (which is why we never truncate that part).
    2387             :  */
    2388             : char *
    2389      182134 : makeObjectName(const char *name1, const char *name2, const char *label)
    2390             : {
    2391             :     char       *name;
    2392      182134 :     int         overhead = 0;   /* chars needed for label and underscores */
    2393             :     int         availchars;     /* chars available for name(s) */
    2394             :     int         name1chars;     /* chars allocated to name1 */
    2395             :     int         name2chars;     /* chars allocated to name2 */
    2396             :     int         ndx;
    2397             : 
    2398      182134 :     name1chars = strlen(name1);
    2399      182134 :     if (name2)
    2400             :     {
    2401      173962 :         name2chars = strlen(name2);
    2402      173962 :         overhead++;             /* allow for separating underscore */
    2403             :     }
    2404             :     else
    2405        8172 :         name2chars = 0;
    2406      182134 :     if (label)
    2407       16230 :         overhead += strlen(label) + 1;
    2408             : 
    2409      182134 :     availchars = NAMEDATALEN - 1 - overhead;
    2410             :     Assert(availchars > 0);      /* else caller chose a bad label */
    2411             : 
    2412             :     /*
    2413             :      * If we must truncate, preferentially truncate the longer name. This
    2414             :      * logic could be expressed without a loop, but it's simple and obvious as
    2415             :      * a loop.
    2416             :      */
    2417      182200 :     while (name1chars + name2chars > availchars)
    2418             :     {
    2419          66 :         if (name1chars > name2chars)
    2420           0 :             name1chars--;
    2421             :         else
    2422          66 :             name2chars--;
    2423             :     }
    2424             : 
    2425      182134 :     name1chars = pg_mbcliplen(name1, name1chars, name1chars);
    2426      182134 :     if (name2)
    2427      173962 :         name2chars = pg_mbcliplen(name2, name2chars, name2chars);
    2428             : 
    2429             :     /* Now construct the string using the chosen lengths */
    2430      182134 :     name = palloc(name1chars + name2chars + overhead + 1);
    2431      182134 :     memcpy(name, name1, name1chars);
    2432      182134 :     ndx = name1chars;
    2433      182134 :     if (name2)
    2434             :     {
    2435      173962 :         name[ndx++] = '_';
    2436      173962 :         memcpy(name + ndx, name2, name2chars);
    2437      173962 :         ndx += name2chars;
    2438             :     }
    2439      182134 :     if (label)
    2440             :     {
    2441       16230 :         name[ndx++] = '_';
    2442       16230 :         strcpy(name + ndx, label);
    2443             :     }
    2444             :     else
    2445      165904 :         name[ndx] = '\0';
    2446             : 
    2447      182134 :     return name;
    2448             : }
    2449             : 
    2450             : /*
    2451             :  * Select a nonconflicting name for a new relation.  This is ordinarily
    2452             :  * used to choose index names (which is why it's here) but it can also
    2453             :  * be used for sequences, or any autogenerated relation kind.
    2454             :  *
    2455             :  * name1, name2, and label are used the same way as for makeObjectName(),
    2456             :  * except that the label can't be NULL; digits will be appended to the label
    2457             :  * if needed to create a name that is unique within the specified namespace.
    2458             :  *
    2459             :  * If isconstraint is true, we also avoid choosing a name matching any
    2460             :  * existing constraint in the same namespace.  (This is stricter than what
    2461             :  * Postgres itself requires, but the SQL standard says that constraint names
    2462             :  * should be unique within schemas, so we follow that for autogenerated
    2463             :  * constraint names.)
    2464             :  *
    2465             :  * Note: it is theoretically possible to get a collision anyway, if someone
    2466             :  * else chooses the same name concurrently.  This is fairly unlikely to be
    2467             :  * a problem in practice, especially if one is holding an exclusive lock on
    2468             :  * the relation identified by name1.  However, if choosing multiple names
    2469             :  * within a single command, you'd better create the new object and do
    2470             :  * CommandCounterIncrement before choosing the next one!
    2471             :  *
    2472             :  * Returns a palloc'd string.
    2473             :  */
    2474             : char *
    2475       11592 : ChooseRelationName(const char *name1, const char *name2,
    2476             :                    const char *label, Oid namespaceid,
    2477             :                    bool isconstraint)
    2478             : {
    2479       11592 :     int         pass = 0;
    2480       11592 :     char       *relname = NULL;
    2481             :     char        modlabel[NAMEDATALEN];
    2482             : 
    2483             :     /* try the unmodified label first */
    2484       11592 :     strlcpy(modlabel, label, sizeof(modlabel));
    2485             : 
    2486             :     for (;;)
    2487             :     {
    2488       11788 :         relname = makeObjectName(name1, name2, modlabel);
    2489             : 
    2490       11788 :         if (!OidIsValid(get_relname_relid(relname, namespaceid)))
    2491             :         {
    2492       11598 :             if (!isconstraint ||
    2493        7696 :                 !ConstraintNameExists(relname, namespaceid))
    2494             :                 break;
    2495             :         }
    2496             : 
    2497             :         /* found a conflict, so try a new name component */
    2498         196 :         pfree(relname);
    2499         196 :         snprintf(modlabel, sizeof(modlabel), "%s%d", label, ++pass);
    2500             :     }
    2501             : 
    2502       11592 :     return relname;
    2503             : }
    2504             : 
    2505             : /*
    2506             :  * Select the name to be used for an index.
    2507             :  *
    2508             :  * The argument list is pretty ad-hoc :-(
    2509             :  */
    2510             : static char *
    2511        9722 : ChooseIndexName(const char *tabname, Oid namespaceId,
    2512             :                 List *colnames, List *exclusionOpNames,
    2513             :                 bool primary, bool isconstraint)
    2514             : {
    2515             :     char       *indexname;
    2516             : 
    2517        9722 :     if (primary)
    2518             :     {
    2519             :         /* the primary key's name does not depend on the specific column(s) */
    2520        6960 :         indexname = ChooseRelationName(tabname,
    2521             :                                        NULL,
    2522             :                                        "pkey",
    2523             :                                        namespaceId,
    2524             :                                        true);
    2525             :     }
    2526        2762 :     else if (exclusionOpNames != NIL)
    2527             :     {
    2528          90 :         indexname = ChooseRelationName(tabname,
    2529          90 :                                        ChooseIndexNameAddition(colnames),
    2530             :                                        "excl",
    2531             :                                        namespaceId,
    2532             :                                        true);
    2533             :     }
    2534        2672 :     else if (isconstraint)
    2535             :     {
    2536         640 :         indexname = ChooseRelationName(tabname,
    2537         640 :                                        ChooseIndexNameAddition(colnames),
    2538             :                                        "key",
    2539             :                                        namespaceId,
    2540             :                                        true);
    2541             :     }
    2542             :     else
    2543             :     {
    2544        2032 :         indexname = ChooseRelationName(tabname,
    2545        2032 :                                        ChooseIndexNameAddition(colnames),
    2546             :                                        "idx",
    2547             :                                        namespaceId,
    2548             :                                        false);
    2549             :     }
    2550             : 
    2551        9722 :     return indexname;
    2552             : }
    2553             : 
    2554             : /*
    2555             :  * Generate "name2" for a new index given the list of column names for it
    2556             :  * (as produced by ChooseIndexColumnNames).  This will be passed to
    2557             :  * ChooseRelationName along with the parent table name and a suitable label.
    2558             :  *
    2559             :  * We know that less than NAMEDATALEN characters will actually be used,
    2560             :  * so we can truncate the result once we've generated that many.
    2561             :  *
    2562             :  * XXX See also ChooseForeignKeyConstraintNameAddition and
    2563             :  * ChooseExtendedStatisticNameAddition.
    2564             :  */
    2565             : static char *
    2566        2762 : ChooseIndexNameAddition(List *colnames)
    2567             : {
    2568             :     char        buf[NAMEDATALEN * 2];
    2569        2762 :     int         buflen = 0;
    2570             :     ListCell   *lc;
    2571             : 
    2572        2762 :     buf[0] = '\0';
    2573        6438 :     foreach(lc, colnames)
    2574             :     {
    2575        3676 :         const char *name = (const char *) lfirst(lc);
    2576             : 
    2577        3676 :         if (buflen > 0)
    2578         914 :             buf[buflen++] = '_';    /* insert _ between names */
    2579             : 
    2580             :         /*
    2581             :          * At this point we have buflen <= NAMEDATALEN.  name should be less
    2582             :          * than NAMEDATALEN already, but use strlcpy for paranoia.
    2583             :          */
    2584        3676 :         strlcpy(buf + buflen, name, NAMEDATALEN);
    2585        3676 :         buflen += strlen(buf + buflen);
    2586        3676 :         if (buflen >= NAMEDATALEN)
    2587           0 :             break;
    2588             :     }
    2589        2762 :     return pstrdup(buf);
    2590             : }
    2591             : 
    2592             : /*
    2593             :  * Select the actual names to be used for the columns of an index, given the
    2594             :  * list of IndexElems for the columns.  This is mostly about ensuring the
    2595             :  * names are unique so we don't get a conflicting-attribute-names error.
    2596             :  *
    2597             :  * Returns a List of plain strings (char *, not String nodes).
    2598             :  */
    2599             : static List *
    2600       90220 : ChooseIndexColumnNames(List *indexElems)
    2601             : {
    2602       90220 :     List       *result = NIL;
    2603             :     ListCell   *lc;
    2604             : 
    2605      232990 :     foreach(lc, indexElems)
    2606             :     {
    2607      142770 :         IndexElem  *ielem = (IndexElem *) lfirst(lc);
    2608             :         const char *origname;
    2609             :         const char *curname;
    2610             :         int         i;
    2611             :         char        buf[NAMEDATALEN];
    2612             : 
    2613             :         /* Get the preliminary name from the IndexElem */
    2614      142770 :         if (ielem->indexcolname)
    2615        2086 :             origname = ielem->indexcolname; /* caller-specified name */
    2616      140684 :         else if (ielem->name)
    2617      140328 :             origname = ielem->name; /* simple column reference */
    2618             :         else
    2619         356 :             origname = "expr";    /* default name for expression */
    2620             : 
    2621             :         /* If it conflicts with any previous column, tweak it */
    2622      142770 :         curname = origname;
    2623      142770 :         for (i = 1;; i++)
    2624          56 :         {
    2625             :             ListCell   *lc2;
    2626             :             char        nbuf[32];
    2627             :             int         nlen;
    2628             : 
    2629      224660 :             foreach(lc2, result)
    2630             :             {
    2631       81890 :                 if (strcmp(curname, (char *) lfirst(lc2)) == 0)
    2632          56 :                     break;
    2633             :             }
    2634      142826 :             if (lc2 == NULL)
    2635      142770 :                 break;          /* found nonconflicting name */
    2636             : 
    2637          56 :             sprintf(nbuf, "%d", i);
    2638             : 
    2639             :             /* Ensure generated names are shorter than NAMEDATALEN */
    2640          56 :             nlen = pg_mbcliplen(origname, strlen(origname),
    2641          56 :                                 NAMEDATALEN - 1 - strlen(nbuf));
    2642          56 :             memcpy(buf, origname, nlen);
    2643          56 :             strcpy(buf + nlen, nbuf);
    2644          56 :             curname = buf;
    2645             :         }
    2646             : 
    2647             :         /* And attach to the result list */
    2648      142770 :         result = lappend(result, pstrdup(curname));
    2649             :     }
    2650       90220 :     return result;
    2651             : }
    2652             : 
    2653             : /*
    2654             :  * ExecReindex
    2655             :  *
    2656             :  * Primary entry point for manual REINDEX commands.  This is mainly a
    2657             :  * preparation wrapper for the real operations that will happen in
    2658             :  * each subroutine of REINDEX.
    2659             :  */
    2660             : void
    2661         878 : ExecReindex(ParseState *pstate, ReindexStmt *stmt, bool isTopLevel)
    2662             : {
    2663         878 :     ReindexParams params = {0};
    2664             :     ListCell   *lc;
    2665         878 :     bool        concurrently = false;
    2666         878 :     bool        verbose = false;
    2667         878 :     char       *tablespacename = NULL;
    2668             : 
    2669             :     /* Parse option list */
    2670        1528 :     foreach(lc, stmt->params)
    2671             :     {
    2672         650 :         DefElem    *opt = (DefElem *) lfirst(lc);
    2673             : 
    2674         650 :         if (strcmp(opt->defname, "verbose") == 0)
    2675          14 :             verbose = defGetBoolean(opt);
    2676         636 :         else if (strcmp(opt->defname, "concurrently") == 0)
    2677         508 :             concurrently = defGetBoolean(opt);
    2678         128 :         else if (strcmp(opt->defname, "tablespace") == 0)
    2679         128 :             tablespacename = defGetString(opt);
    2680             :         else
    2681           0 :             ereport(ERROR,
    2682             :                     (errcode(ERRCODE_SYNTAX_ERROR),
    2683             :                      errmsg("unrecognized REINDEX option \"%s\"",
    2684             :                             opt->defname),
    2685             :                      parser_errposition(pstate, opt->location)));
    2686             :     }
    2687             : 
    2688         878 :     if (concurrently)
    2689         508 :         PreventInTransactionBlock(isTopLevel,
    2690             :                                   "REINDEX CONCURRENTLY");
    2691             : 
    2692         860 :     params.options =
    2693        1720 :         (verbose ? REINDEXOPT_VERBOSE : 0) |
    2694         860 :         (concurrently ? REINDEXOPT_CONCURRENTLY : 0);
    2695             : 
    2696             :     /*
    2697             :      * Assign the tablespace OID to move indexes to, with InvalidOid to do
    2698             :      * nothing.
    2699             :      */
    2700         860 :     if (tablespacename != NULL)
    2701             :     {
    2702         128 :         params.tablespaceOid = get_tablespace_oid(tablespacename, false);
    2703             : 
    2704             :         /* Check permissions except when moving to database's default */
    2705         128 :         if (OidIsValid(params.tablespaceOid) &&
    2706         128 :             params.tablespaceOid != MyDatabaseTableSpace)
    2707             :         {
    2708             :             AclResult   aclresult;
    2709             : 
    2710         128 :             aclresult = object_aclcheck(TableSpaceRelationId, params.tablespaceOid,
    2711             :                                         GetUserId(), ACL_CREATE);
    2712         128 :             if (aclresult != ACLCHECK_OK)
    2713          12 :                 aclcheck_error(aclresult, OBJECT_TABLESPACE,
    2714          12 :                                get_tablespace_name(params.tablespaceOid));
    2715             :         }
    2716             :     }
    2717             :     else
    2718         732 :         params.tablespaceOid = InvalidOid;
    2719             : 
    2720         848 :     switch (stmt->kind)
    2721             :     {
    2722         312 :         case REINDEX_OBJECT_INDEX:
    2723         312 :             ReindexIndex(stmt->relation, &params, isTopLevel);
    2724         206 :             break;
    2725         408 :         case REINDEX_OBJECT_TABLE:
    2726         408 :             ReindexTable(stmt->relation, &params, isTopLevel);
    2727         286 :             break;
    2728         128 :         case REINDEX_OBJECT_SCHEMA:
    2729             :         case REINDEX_OBJECT_SYSTEM:
    2730             :         case REINDEX_OBJECT_DATABASE:
    2731             : 
    2732             :             /*
    2733             :              * This cannot run inside a user transaction block; if we were
    2734             :              * inside a transaction, then its commit- and
    2735             :              * start-transaction-command calls would not have the intended
    2736             :              * effect!
    2737             :              */
    2738         128 :             PreventInTransactionBlock(isTopLevel,
    2739         182 :                                       (stmt->kind == REINDEX_OBJECT_SCHEMA) ? "REINDEX SCHEMA" :
    2740          54 :                                       (stmt->kind == REINDEX_OBJECT_SYSTEM) ? "REINDEX SYSTEM" :
    2741             :                                       "REINDEX DATABASE");
    2742         122 :             ReindexMultipleTables(stmt->name, stmt->kind, &params);
    2743          72 :             break;
    2744           0 :         default:
    2745           0 :             elog(ERROR, "unrecognized object type: %d",
    2746             :                  (int) stmt->kind);
    2747             :             break;
    2748             :     }
    2749         564 : }
    2750             : 
    2751             : /*
    2752             :  * ReindexIndex
    2753             :  *      Recreate a specific index.
    2754             :  */
    2755             : static void
    2756         312 : ReindexIndex(RangeVar *indexRelation, ReindexParams *params, bool isTopLevel)
    2757             : {
    2758             :     struct ReindexIndexCallbackState state;
    2759             :     Oid         indOid;
    2760             :     char        persistence;
    2761             :     char        relkind;
    2762             : 
    2763             :     /*
    2764             :      * Find and lock index, and check permissions on table; use callback to
    2765             :      * obtain lock on table first, to avoid deadlock hazard.  The lock level
    2766             :      * used here must match the index lock obtained in reindex_index().
    2767             :      *
    2768             :      * If it's a temporary index, we will perform a non-concurrent reindex,
    2769             :      * even if CONCURRENTLY was requested.  In that case, reindex_index() will
    2770             :      * upgrade the lock, but that's OK, because other sessions can't hold
    2771             :      * locks on our temporary table.
    2772             :      */
    2773         312 :     state.params = *params;
    2774         312 :     state.locked_table_oid = InvalidOid;
    2775         312 :     indOid = RangeVarGetRelidExtended(indexRelation,
    2776         312 :                                       (params->options & REINDEXOPT_CONCURRENTLY) != 0 ?
    2777             :                                       ShareUpdateExclusiveLock : AccessExclusiveLock,
    2778             :                                       0,
    2779             :                                       RangeVarCallbackForReindexIndex,
    2780             :                                       &state);
    2781             : 
    2782             :     /*
    2783             :      * Obtain the current persistence and kind of the existing index.  We
    2784             :      * already hold a lock on the index.
    2785             :      */
    2786         264 :     persistence = get_rel_persistence(indOid);
    2787         264 :     relkind = get_rel_relkind(indOid);
    2788             : 
    2789         264 :     if (relkind == RELKIND_PARTITIONED_INDEX)
    2790          24 :         ReindexPartitions(indOid, params, isTopLevel);
    2791         240 :     else if ((params->options & REINDEXOPT_CONCURRENTLY) != 0 &&
    2792             :              persistence != RELPERSISTENCE_TEMP)
    2793         142 :         ReindexRelationConcurrently(indOid, params);
    2794             :     else
    2795             :     {
    2796          98 :         ReindexParams newparams = *params;
    2797             : 
    2798          98 :         newparams.options |= REINDEXOPT_REPORT_PROGRESS;
    2799          98 :         reindex_index(indOid, false, persistence, &newparams);
    2800             :     }
    2801         206 : }
    2802             : 
    2803             : /*
    2804             :  * Check permissions on table before acquiring relation lock; also lock
    2805             :  * the heap before the RangeVarGetRelidExtended takes the index lock, to avoid
    2806             :  * deadlocks.
    2807             :  */
    2808             : static void
    2809         318 : RangeVarCallbackForReindexIndex(const RangeVar *relation,
    2810             :                                 Oid relId, Oid oldRelId, void *arg)
    2811             : {
    2812             :     char        relkind;
    2813         318 :     struct ReindexIndexCallbackState *state = arg;
    2814             :     LOCKMODE    table_lockmode;
    2815             :     Oid         table_oid;
    2816             : 
    2817             :     /*
    2818             :      * Lock level here should match table lock in reindex_index() for
    2819             :      * non-concurrent case and table locks used by index_concurrently_*() for
    2820             :      * concurrent case.
    2821             :      */
    2822         636 :     table_lockmode = (state->params.options & REINDEXOPT_CONCURRENTLY) != 0 ?
    2823         318 :         ShareUpdateExclusiveLock : ShareLock;
    2824             : 
    2825             :     /*
    2826             :      * If we previously locked some other index's heap, and the name we're
    2827             :      * looking up no longer refers to that relation, release the now-useless
    2828             :      * lock.
    2829             :      */
    2830         318 :     if (relId != oldRelId && OidIsValid(oldRelId))
    2831             :     {
    2832           6 :         UnlockRelationOid(state->locked_table_oid, table_lockmode);
    2833           6 :         state->locked_table_oid = InvalidOid;
    2834             :     }
    2835             : 
    2836             :     /* If the relation does not exist, there's nothing more to do. */
    2837         318 :     if (!OidIsValid(relId))
    2838          12 :         return;
    2839             : 
    2840             :     /*
    2841             :      * If the relation does exist, check whether it's an index.  But note that
    2842             :      * the relation might have been dropped between the time we did the name
    2843             :      * lookup and now.  In that case, there's nothing to do.
    2844             :      */
    2845         306 :     relkind = get_rel_relkind(relId);
    2846         306 :     if (!relkind)
    2847           0 :         return;
    2848         306 :     if (relkind != RELKIND_INDEX &&
    2849             :         relkind != RELKIND_PARTITIONED_INDEX)
    2850          24 :         ereport(ERROR,
    2851             :                 (errcode(ERRCODE_WRONG_OBJECT_TYPE),
    2852             :                  errmsg("\"%s\" is not an index", relation->relname)));
    2853             : 
    2854             :     /* Check permissions */
    2855         282 :     table_oid = IndexGetRelation(relId, true);
    2856         564 :     if (OidIsValid(table_oid) &&
    2857         282 :         pg_class_aclcheck(table_oid, GetUserId(), ACL_MAINTAIN) != ACLCHECK_OK &&
    2858          12 :         !has_partition_ancestor_privs(table_oid, GetUserId(), ACL_MAINTAIN))
    2859          12 :         aclcheck_error(ACLCHECK_NOT_OWNER, OBJECT_INDEX,
    2860          12 :                        relation->relname);
    2861             : 
    2862             :     /* Lock heap before index to avoid deadlock. */
    2863         270 :     if (relId != oldRelId)
    2864             :     {
    2865             :         /*
    2866             :          * If the OID isn't valid, it means the index was concurrently
    2867             :          * dropped, which is not a problem for us; just return normally.
    2868             :          */
    2869         270 :         if (OidIsValid(table_oid))
    2870             :         {
    2871         270 :             LockRelationOid(table_oid, table_lockmode);
    2872         270 :             state->locked_table_oid = table_oid;
    2873             :         }
    2874             :     }
    2875             : }
    2876             : 
    2877             : /*
    2878             :  * ReindexTable
    2879             :  *      Recreate all indexes of a table (and of its toast table, if any)
    2880             :  */
    2881             : static Oid
    2882         408 : ReindexTable(RangeVar *relation, ReindexParams *params, bool isTopLevel)
    2883             : {
    2884             :     Oid         heapOid;
    2885             :     bool        result;
    2886             : 
    2887             :     /*
    2888             :      * The lock level used here should match reindex_relation().
    2889             :      *
    2890             :      * If it's a temporary table, we will perform a non-concurrent reindex,
    2891             :      * even if CONCURRENTLY was requested.  In that case, reindex_relation()
    2892             :      * will upgrade the lock, but that's OK, because other sessions can't hold
    2893             :      * locks on our temporary table.
    2894             :      */
    2895         408 :     heapOid = RangeVarGetRelidExtended(relation,
    2896         408 :                                        (params->options & REINDEXOPT_CONCURRENTLY) != 0 ?
    2897             :                                        ShareUpdateExclusiveLock : ShareLock,
    2898             :                                        0,
    2899             :                                        RangeVarCallbackMaintainsTable, NULL);
    2900             : 
    2901         362 :     if (get_rel_relkind(heapOid) == RELKIND_PARTITIONED_TABLE)
    2902          34 :         ReindexPartitions(heapOid, params, isTopLevel);
    2903         550 :     else if ((params->options & REINDEXOPT_CONCURRENTLY) != 0 &&
    2904         222 :              get_rel_persistence(heapOid) != RELPERSISTENCE_TEMP)
    2905             :     {
    2906         210 :         result = ReindexRelationConcurrently(heapOid, params);
    2907             : 
    2908         172 :         if (!result)
    2909          12 :             ereport(NOTICE,
    2910             :                     (errmsg("table \"%s\" has no indexes that can be reindexed concurrently",
    2911             :                             relation->relname)));
    2912             :     }
    2913             :     else
    2914             :     {
    2915         118 :         ReindexParams newparams = *params;
    2916             : 
    2917         118 :         newparams.options |= REINDEXOPT_REPORT_PROGRESS;
    2918         118 :         result = reindex_relation(heapOid,
    2919             :                                   REINDEX_REL_PROCESS_TOAST |
    2920             :                                   REINDEX_REL_CHECK_CONSTRAINTS,
    2921             :                                   &newparams);
    2922          86 :         if (!result)
    2923           6 :             ereport(NOTICE,
    2924             :                     (errmsg("table \"%s\" has no indexes to reindex",
    2925             :                             relation->relname)));
    2926             :     }
    2927             : 
    2928         286 :     return heapOid;
    2929             : }
    2930             : 
    2931             : /*
    2932             :  * ReindexMultipleTables
    2933             :  *      Recreate indexes of tables selected by objectName/objectKind.
    2934             :  *
    2935             :  * To reduce the probability of deadlocks, each table is reindexed in a
    2936             :  * separate transaction, so we can release the lock on it right away.
    2937             :  * That means this must not be called within a user transaction block!
    2938             :  */
    2939             : static void
    2940         122 : ReindexMultipleTables(const char *objectName, ReindexObjectType objectKind,
    2941             :                       ReindexParams *params)
    2942             : {
    2943             :     Oid         objectOid;
    2944             :     Relation    relationRelation;
    2945             :     TableScanDesc scan;
    2946             :     ScanKeyData scan_keys[1];
    2947             :     HeapTuple   tuple;
    2948             :     MemoryContext private_context;
    2949             :     MemoryContext old;
    2950         122 :     List       *relids = NIL;
    2951             :     int         num_keys;
    2952         122 :     bool        concurrent_warning = false;
    2953         122 :     bool        tablespace_warning = false;
    2954             : 
    2955             :     Assert(objectKind == REINDEX_OBJECT_SCHEMA ||
    2956             :            objectKind == REINDEX_OBJECT_SYSTEM ||
    2957             :            objectKind == REINDEX_OBJECT_DATABASE);
    2958             : 
    2959             :     /*
    2960             :      * This matches the options enforced by the grammar, where the object name
    2961             :      * is optional for DATABASE and SYSTEM.
    2962             :      */
    2963             :     Assert(objectName || objectKind != REINDEX_OBJECT_SCHEMA);
    2964             : 
    2965         122 :     if (objectKind == REINDEX_OBJECT_SYSTEM &&
    2966          24 :         (params->options & REINDEXOPT_CONCURRENTLY) != 0)
    2967          20 :         ereport(ERROR,
    2968             :                 (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
    2969             :                  errmsg("cannot reindex system catalogs concurrently")));
    2970             : 
    2971             :     /*
    2972             :      * Get OID of object to reindex, being the database currently being used
    2973             :      * by session for a database or for system catalogs, or the schema defined
    2974             :      * by caller. At the same time do permission checks that need different
    2975             :      * processing depending on the object type.
    2976             :      */
    2977         102 :     if (objectKind == REINDEX_OBJECT_SCHEMA)
    2978             :     {
    2979          68 :         objectOid = get_namespace_oid(objectName, false);
    2980             : 
    2981          62 :         if (!object_ownercheck(NamespaceRelationId, objectOid, GetUserId()) &&
    2982          24 :             !has_privs_of_role(GetUserId(), ROLE_PG_MAINTAIN))
    2983          18 :             aclcheck_error(ACLCHECK_NOT_OWNER, OBJECT_SCHEMA,
    2984             :                            objectName);
    2985             :     }
    2986             :     else
    2987             :     {
    2988          34 :         objectOid = MyDatabaseId;
    2989             : 
    2990          34 :         if (objectName && strcmp(objectName, get_database_name(objectOid)) != 0)
    2991           6 :             ereport(ERROR,
    2992             :                     (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
    2993             :                      errmsg("can only reindex the currently open database")));
    2994          28 :         if (!object_ownercheck(DatabaseRelationId, objectOid, GetUserId()) &&
    2995           0 :             !has_privs_of_role(GetUserId(), ROLE_PG_MAINTAIN))
    2996           0 :             aclcheck_error(ACLCHECK_NOT_OWNER, OBJECT_DATABASE,
    2997           0 :                            get_database_name(objectOid));
    2998             :     }
    2999             : 
    3000             :     /*
    3001             :      * Create a memory context that will survive forced transaction commits we
    3002             :      * do below.  Since it is a child of PortalContext, it will go away
    3003             :      * eventually even if we suffer an error; there's no need for special
    3004             :      * abort cleanup logic.
    3005             :      */
    3006          72 :     private_context = AllocSetContextCreate(PortalContext,
    3007             :                                             "ReindexMultipleTables",
    3008             :                                             ALLOCSET_SMALL_SIZES);
    3009             : 
    3010             :     /*
    3011             :      * Define the search keys to find the objects to reindex. For a schema, we
    3012             :      * select target relations using relnamespace, something not necessary for
    3013             :      * a database-wide operation.
    3014             :      */
    3015          72 :     if (objectKind == REINDEX_OBJECT_SCHEMA)
    3016             :     {
    3017          44 :         num_keys = 1;
    3018          44 :         ScanKeyInit(&scan_keys[0],
    3019             :                     Anum_pg_class_relnamespace,
    3020             :                     BTEqualStrategyNumber, F_OIDEQ,
    3021             :                     ObjectIdGetDatum(objectOid));
    3022             :     }
    3023             :     else
    3024          28 :         num_keys = 0;
    3025             : 
    3026             :     /*
    3027             :      * Scan pg_class to build a list of the relations we need to reindex.
    3028             :      *
    3029             :      * We only consider plain relations and materialized views here (toast
    3030             :      * rels will be processed indirectly by reindex_relation).
    3031             :      */
    3032          72 :     relationRelation = table_open(RelationRelationId, AccessShareLock);
    3033          72 :     scan = table_beginscan_catalog(relationRelation, num_keys, scan_keys);
    3034       14094 :     while ((tuple = heap_getnext(scan, ForwardScanDirection)) != NULL)
    3035             :     {
    3036       14022 :         Form_pg_class classtuple = (Form_pg_class) GETSTRUCT(tuple);
    3037       14022 :         Oid         relid = classtuple->oid;
    3038             : 
    3039             :         /*
    3040             :          * Only regular tables and matviews can have indexes, so ignore any
    3041             :          * other kind of relation.
    3042             :          *
    3043             :          * Partitioned tables/indexes are skipped but matching leaf partitions
    3044             :          * are processed.
    3045             :          */
    3046       14022 :         if (classtuple->relkind != RELKIND_RELATION &&
    3047       11480 :             classtuple->relkind != RELKIND_MATVIEW)
    3048       11462 :             continue;
    3049             : 
    3050             :         /* Skip temp tables of other backends; we can't reindex them at all */
    3051        2560 :         if (classtuple->relpersistence == RELPERSISTENCE_TEMP &&
    3052          36 :             !isTempNamespace(classtuple->relnamespace))
    3053           0 :             continue;
    3054             : 
    3055             :         /*
    3056             :          * Check user/system classification.  SYSTEM processes all the
    3057             :          * catalogs, and DATABASE processes everything that's not a catalog.
    3058             :          */
    3059        2560 :         if (objectKind == REINDEX_OBJECT_SYSTEM &&
    3060         276 :             !IsCatalogRelationOid(relid))
    3061          20 :             continue;
    3062        4180 :         else if (objectKind == REINDEX_OBJECT_DATABASE &&
    3063        1640 :                  IsCatalogRelationOid(relid))
    3064        1536 :             continue;
    3065             : 
    3066             :         /*
    3067             :          * The table can be reindexed if the user has been granted MAINTAIN on
    3068             :          * the table or one of its partition ancestors or the user is a
    3069             :          * superuser, the table owner, or the database/schema owner (but in
    3070             :          * the latter case, only if it's not a shared relation).
    3071             :          * pg_class_aclcheck includes the superuser case, and depending on
    3072             :          * objectKind we already know that the user has permission to run
    3073             :          * REINDEX on this database or schema per the permission checks at the
    3074             :          * beginning of this routine.
    3075             :          */
    3076        1136 :         if (classtuple->relisshared &&
    3077         132 :             pg_class_aclcheck(relid, GetUserId(), ACL_MAINTAIN) != ACLCHECK_OK &&
    3078           0 :             !has_partition_ancestor_privs(relid, GetUserId(), ACL_MAINTAIN))
    3079           0 :             continue;
    3080             : 
    3081             :         /*
    3082             :          * Skip system tables, since index_create() would reject indexing them
    3083             :          * concurrently (and it would likely fail if we tried).
    3084             :          */
    3085        1474 :         if ((params->options & REINDEXOPT_CONCURRENTLY) != 0 &&
    3086         470 :             IsCatalogRelationOid(relid))
    3087             :         {
    3088         384 :             if (!concurrent_warning)
    3089           6 :                 ereport(WARNING,
    3090             :                         (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
    3091             :                          errmsg("cannot reindex system catalogs concurrently, skipping all")));
    3092         384 :             concurrent_warning = true;
    3093         384 :             continue;
    3094             :         }
    3095             : 
    3096             :         /*
    3097             :          * If a new tablespace is set, check if this relation has to be
    3098             :          * skipped.
    3099             :          */
    3100         620 :         if (OidIsValid(params->tablespaceOid))
    3101             :         {
    3102           0 :             bool        skip_rel = false;
    3103             : 
    3104             :             /*
    3105             :              * Mapped relations cannot be moved to different tablespaces (in
    3106             :              * particular this eliminates all shared catalogs.).
    3107             :              */
    3108           0 :             if (RELKIND_HAS_STORAGE(classtuple->relkind) &&
    3109           0 :                 !RelFileNumberIsValid(classtuple->relfilenode))
    3110           0 :                 skip_rel = true;
    3111             : 
    3112             :             /*
    3113             :              * A system relation is always skipped, even with
    3114             :              * allow_system_table_mods enabled.
    3115             :              */
    3116           0 :             if (IsSystemClass(relid, classtuple))
    3117           0 :                 skip_rel = true;
    3118             : 
    3119           0 :             if (skip_rel)
    3120             :             {
    3121           0 :                 if (!tablespace_warning)
    3122           0 :                     ereport(WARNING,
    3123             :                             (errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
    3124             :                              errmsg("cannot move system relations, skipping all")));
    3125           0 :                 tablespace_warning = true;
    3126           0 :                 continue;
    3127             :             }
    3128             :         }
    3129             : 
    3130             :         /* Save the list of relation OIDs in private context */
    3131         620 :         old = MemoryContextSwitchTo(private_context);
    3132             : 
    3133             :         /*
    3134             :          * We always want to reindex pg_class first if it's selected to be
    3135             :          * reindexed.  This ensures that if there is any corruption in
    3136             :          * pg_class' indexes, they will be fixed before we process any other
    3137             :          * tables.  This is critical because reindexing itself will try to
    3138             :          * update pg_class.
    3139             :          */
    3140         620 :         if (relid == RelationRelationId)
    3141           6 :             relids = lcons_oid(relid, relids);
    3142             :         else
    3143         614 :             relids = lappend_oid(relids, relid);
    3144             : 
    3145         620 :         MemoryContextSwitchTo(old);
    3146             :     }
    3147          72 :     table_endscan(scan);
    3148          72 :     table_close(relationRelation, AccessShareLock);
    3149             : 
    3150             :     /*
    3151             :      * Process each relation listed in a separate transaction.  Note that this
    3152             :      * commits and then starts a new transaction immediately.
    3153             :      */
    3154          72 :     ReindexMultipleInternal(relids, params);
    3155             : 
    3156          72 :     MemoryContextDelete(private_context);
    3157          72 : }
    3158             : 
    3159             : /*
    3160             :  * Error callback specific to ReindexPartitions().
    3161             :  */
    3162             : static void
    3163          12 : reindex_error_callback(void *arg)
    3164             : {
    3165          12 :     ReindexErrorInfo *errinfo = (ReindexErrorInfo *) arg;
    3166             : 
    3167             :     Assert(RELKIND_HAS_PARTITIONS(errinfo->relkind));
    3168             : 
    3169          12 :     if (errinfo->relkind == RELKIND_PARTITIONED_TABLE)
    3170           6 :         errcontext("while reindexing partitioned table \"%s.%s\"",
    3171             :                    errinfo->relnamespace, errinfo->relname);
    3172           6 :     else if (errinfo->relkind == RELKIND_PARTITIONED_INDEX)
    3173           6 :         errcontext("while reindexing partitioned index \"%s.%s\"",
    3174             :                    errinfo->relnamespace, errinfo->relname);
    3175          12 : }
    3176             : 
    3177             : /*
    3178             :  * ReindexPartitions
    3179             :  *
    3180             :  * Reindex a set of partitions, per the partitioned index or table given
    3181             :  * by the caller.
    3182             :  */
    3183             : static void
    3184          58 : ReindexPartitions(Oid relid, ReindexParams *params, bool isTopLevel)
    3185             : {
    3186          58 :     List       *partitions = NIL;
    3187          58 :     char        relkind = get_rel_relkind(relid);
    3188          58 :     char       *relname = get_rel_name(relid);
    3189          58 :     char       *relnamespace = get_namespace_name(get_rel_namespace(relid));
    3190             :     MemoryContext reindex_context;
    3191             :     List       *inhoids;
    3192             :     ListCell   *lc;
    3193             :     ErrorContextCallback errcallback;
    3194             :     ReindexErrorInfo errinfo;
    3195             : 
    3196             :     Assert(RELKIND_HAS_PARTITIONS(relkind));
    3197             : 
    3198             :     /*
    3199             :      * Check if this runs in a transaction block, with an error callback to
    3200             :      * provide more context under which a problem happens.
    3201             :      */
    3202          58 :     errinfo.relname = pstrdup(relname);
    3203          58 :     errinfo.relnamespace = pstrdup(relnamespace);
    3204          58 :     errinfo.relkind = relkind;
    3205          58 :     errcallback.callback = reindex_error_callback;
    3206          58 :     errcallback.arg = (void *) &errinfo;
    3207          58 :     errcallback.previous = error_context_stack;
    3208          58 :     error_context_stack = &errcallback;
    3209             : 
    3210          58 :     PreventInTransactionBlock(isTopLevel,
    3211             :                               relkind == RELKIND_PARTITIONED_TABLE ?
    3212             :                               "REINDEX TABLE" : "REINDEX INDEX");
    3213             : 
    3214             :     /* Pop the error context stack */
    3215          46 :     error_context_stack = errcallback.previous;
    3216             : 
    3217             :     /*
    3218             :      * Create special memory context for cross-transaction storage.
    3219             :      *
    3220             :      * Since it is a child of PortalContext, it will go away eventually even
    3221             :      * if we suffer an error so there is no need for special abort cleanup
    3222             :      * logic.
    3223             :      */
    3224          46 :     reindex_context = AllocSetContextCreate(PortalContext, "Reindex",
    3225             :                                             ALLOCSET_DEFAULT_SIZES);
    3226             : 
    3227             :     /* ShareLock is enough to prevent schema modifications */
    3228          46 :     inhoids = find_all_inheritors(relid, ShareLock, NULL);
    3229             : 
    3230             :     /*
    3231             :      * The list of relations to reindex are the physical partitions of the
    3232             :      * tree so discard any partitioned table or index.
    3233             :      */
    3234         232 :     foreach(lc, inhoids)
    3235             :     {
    3236         186 :         Oid         partoid = lfirst_oid(lc);
    3237         186 :         char        partkind = get_rel_relkind(partoid);
    3238             :         MemoryContext old_context;
    3239             : 
    3240             :         /*
    3241             :          * This discards partitioned tables, partitioned indexes and foreign
    3242             :          * tables.
    3243             :          */
    3244         186 :         if (!RELKIND_HAS_STORAGE(partkind))
    3245         110 :             continue;
    3246             : 
    3247             :         Assert(partkind == RELKIND_INDEX ||
    3248             :                partkind == RELKIND_RELATION);
    3249             : 
    3250             :         /* Save partition OID */
    3251          76 :         old_context = MemoryContextSwitchTo(reindex_context);
    3252          76 :         partitions = lappend_oid(partitions, partoid);
    3253          76 :         MemoryContextSwitchTo(old_context);
    3254             :     }
    3255             : 
    3256             :     /*
    3257             :      * Process each partition listed in a separate transaction.  Note that
    3258             :      * this commits and then starts a new transaction immediately.
    3259             :      */
    3260          46 :     ReindexMultipleInternal(partitions, params);
    3261             : 
    3262             :     /*
    3263             :      * Clean up working storage --- note we must do this after
    3264             :      * StartTransactionCommand, else we might be trying to delete the active
    3265             :      * context!
    3266             :      */
    3267          46 :     MemoryContextDelete(reindex_context);
    3268          46 : }
    3269             : 
    3270             : /*
    3271             :  * ReindexMultipleInternal
    3272             :  *
    3273             :  * Reindex a list of relations, each one being processed in its own
    3274             :  * transaction.  This commits the existing transaction immediately,
    3275             :  * and starts a new transaction when finished.
    3276             :  */
    3277             : static void
    3278         118 : ReindexMultipleInternal(List *relids, ReindexParams *params)
    3279             : {
    3280             :     ListCell   *l;
    3281             : 
    3282         118 :     PopActiveSnapshot();
    3283         118 :     CommitTransactionCommand();
    3284             : 
    3285         814 :     foreach(l, relids)
    3286             :     {
    3287         696 :         Oid         relid = lfirst_oid(l);
    3288             :         char        relkind;
    3289             :         char        relpersistence;
    3290             : 
    3291         696 :         StartTransactionCommand();
    3292             : 
    3293             :         /* functions in indexes may want a snapshot set */
    3294         696 :         PushActiveSnapshot(GetTransactionSnapshot());
    3295             : 
    3296             :         /* check if the relation still exists */
    3297         696 :         if (!SearchSysCacheExists1(RELOID, ObjectIdGetDatum(relid)))
    3298             :         {
    3299           4 :             PopActiveSnapshot();
    3300           4 :             CommitTransactionCommand();
    3301           4 :             continue;
    3302             :         }
    3303             : 
    3304             :         /*
    3305             :          * Check permissions except when moving to database's default if a new
    3306             :          * tablespace is chosen.  Note that this check also happens in
    3307             :          * ExecReindex(), but we do an extra check here as this runs across
    3308             :          * multiple transactions.
    3309             :          */
    3310         692 :         if (OidIsValid(params->tablespaceOid) &&
    3311          12 :             params->tablespaceOid != MyDatabaseTableSpace)
    3312             :         {
    3313             :             AclResult   aclresult;
    3314             : 
    3315          12 :             aclresult = object_aclcheck(TableSpaceRelationId, params->tablespaceOid,
    3316             :                                         GetUserId(), ACL_CREATE);
    3317          12 :             if (aclresult != ACLCHECK_OK)
    3318           0 :                 aclcheck_error(aclresult, OBJECT_TABLESPACE,
    3319           0 :                                get_tablespace_name(params->tablespaceOid));
    3320             :         }
    3321             : 
    3322         692 :         relkind = get_rel_relkind(relid);
    3323         692 :         relpersistence = get_rel_persistence(relid);
    3324             : 
    3325             :         /*
    3326             :          * Partitioned tables and indexes can never be processed directly, and
    3327             :          * a list of their leaves should be built first.
    3328             :          */
    3329             :         Assert(!RELKIND_HAS_PARTITIONS(relkind));
    3330             : 
    3331         692 :         if ((params->options & REINDEXOPT_CONCURRENTLY) != 0 &&
    3332             :             relpersistence != RELPERSISTENCE_TEMP)
    3333          92 :         {
    3334          92 :             ReindexParams newparams = *params;
    3335             : 
    3336          92 :             newparams.options |= REINDEXOPT_MISSING_OK;
    3337          92 :             (void) ReindexRelationConcurrently(relid, &newparams);
    3338             :             /* ReindexRelationConcurrently() does the verbose output */
    3339             :         }
    3340         600 :         else if (relkind == RELKIND_INDEX)
    3341             :         {
    3342          12 :             ReindexParams newparams = *params;
    3343             : 
    3344          12 :             newparams.options |=
    3345             :                 REINDEXOPT_REPORT_PROGRESS | REINDEXOPT_MISSING_OK;
    3346          12 :             reindex_index(relid, false, relpersistence, &newparams);
    3347          12 :             PopActiveSnapshot();
    3348             :             /* reindex_index() does the verbose output */
    3349             :         }
    3350             :         else
    3351             :         {
    3352             :             bool        result;
    3353         588 :             ReindexParams newparams = *params;
    3354             : 
    3355         588 :             newparams.options |=
    3356             :                 REINDEXOPT_REPORT_PROGRESS | REINDEXOPT_MISSING_OK;
    3357         588 :             result = reindex_relation(relid,
    3358             :                                       REINDEX_REL_PROCESS_TOAST |
    3359             :                                       REINDEX_REL_CHECK_CONSTRAINTS,
    3360             :                                       &newparams);
    3361             : 
    3362         588 :             if (result && (params->options & REINDEXOPT_VERBOSE) != 0)
    3363           0 :                 ereport(INFO,
    3364             :                         (errmsg("table \"%s.%s\" was reindexed",
    3365             :                                 get_namespace_name(get_rel_namespace(relid)),
    3366             :                                 get_rel_name(relid))));
    3367             : 
    3368         588 :             PopActiveSnapshot();
    3369             :         }
    3370             : 
    3371         692 :         CommitTransactionCommand();
    3372             :     }
    3373             : 
    3374         118 :     StartTransactionCommand();
    3375         118 : }
    3376             : 
    3377             : 
    3378             : /*
    3379             :  * ReindexRelationConcurrently - process REINDEX CONCURRENTLY for given
    3380             :  * relation OID
    3381             :  *
    3382             :  * 'relationOid' can either belong to an index, a table or a materialized
    3383             :  * view.  For tables and materialized views, all its indexes will be rebuilt,
    3384             :  * excluding invalid indexes and any indexes used in exclusion constraints,
    3385             :  * but including its associated toast table indexes.  For indexes, the index
    3386             :  * itself will be rebuilt.
    3387             :  *
    3388             :  * The locks taken on parent tables and involved indexes are kept until the
    3389             :  * transaction is committed, at which point a session lock is taken on each
    3390             :  * relation.  Both of these protect against concurrent schema changes.
    3391             :  *
    3392             :  * Returns true if any indexes have been rebuilt (including toast table's
    3393             :  * indexes, when relevant), otherwise returns false.
    3394             :  *
    3395             :  * NOTE: This cannot be used on temporary relations.  A concurrent build would
    3396             :  * cause issues with ON COMMIT actions triggered by the transactions of the
    3397             :  * concurrent build.  Temporary relations are not subject to concurrent
    3398             :  * concerns, so there's no need for the more complicated concurrent build,
    3399             :  * anyway, and a non-concurrent reindex is more efficient.
    3400             :  */
    3401             : static bool
    3402         444 : ReindexRelationConcurrently(Oid relationOid, ReindexParams *params)
    3403             : {
    3404             :     typedef struct ReindexIndexInfo
    3405             :     {
    3406             :         Oid         indexId;
    3407             :         Oid         tableId;
    3408             :         Oid         amId;
    3409             :         bool        safe;       /* for set_indexsafe_procflags */
    3410             :     } ReindexIndexInfo;
    3411         444 :     List       *heapRelationIds = NIL;
    3412         444 :     List       *indexIds = NIL;
    3413         444 :     List       *newIndexIds = NIL;
    3414         444 :     List       *relationLocks = NIL;
    3415         444 :     List       *lockTags = NIL;
    3416             :     ListCell   *lc,
    3417             :                *lc2;
    3418             :     MemoryContext private_context;
    3419             :     MemoryContext oldcontext;
    3420             :     char        relkind;
    3421         444 :     char       *relationName = NULL;
    3422         444 :     char       *relationNamespace = NULL;
    3423             :     PGRUsage    ru0;
    3424         444 :     const int   progress_index[] = {
    3425             :         PROGRESS_CREATEIDX_COMMAND,
    3426             :         PROGRESS_CREATEIDX_PHASE,
    3427             :         PROGRESS_CREATEIDX_INDEX_OID,
    3428             :         PROGRESS_CREATEIDX_ACCESS_METHOD_OID
    3429             :     };
    3430             :     int64       progress_vals[4];
    3431             : 
    3432             :     /*
    3433             :      * Create a memory context that will survive forced transaction commits we
    3434             :      * do below.  Since it is a child of PortalContext, it will go away
    3435             :      * eventually even if we suffer an error; there's no need for special
    3436             :      * abort cleanup logic.
    3437             :      */
    3438         444 :     private_context = AllocSetContextCreate(PortalContext,
    3439             :                                             "ReindexConcurrent",
    3440             :                                             ALLOCSET_SMALL_SIZES);
    3441             : 
    3442         444 :     if ((params->options & REINDEXOPT_VERBOSE) != 0)
    3443             :     {
    3444             :         /* Save data needed by REINDEX VERBOSE in private context */
    3445           4 :         oldcontext = MemoryContextSwitchTo(private_context);
    3446             : 
    3447           4 :         relationName = get_rel_name(relationOid);
    3448           4 :         relationNamespace = get_namespace_name(get_rel_namespace(relationOid));
    3449             : 
    3450           4 :         pg_rusage_init(&ru0);
    3451             : 
    3452           4 :         MemoryContextSwitchTo(oldcontext);
    3453             :     }
    3454             : 
    3455         444 :     relkind = get_rel_relkind(relationOid);
    3456             : 
    3457             :     /*
    3458             :      * Extract the list of indexes that are going to be rebuilt based on the
    3459             :      * relation Oid given by caller.
    3460             :      */
    3461         444 :     switch (relkind)
    3462             :     {
    3463         284 :         case RELKIND_RELATION:
    3464             :         case RELKIND_MATVIEW:
    3465             :         case RELKIND_TOASTVALUE:
    3466             :             {
    3467             :                 /*
    3468             :                  * In the case of a relation, find all its indexes including
    3469             :                  * toast indexes.
    3470             :                  */
    3471             :                 Relation    heapRelation;
    3472             : 
    3473             :                 /* Save the list of relation OIDs in private context */
    3474         284 :                 oldcontext = MemoryContextSwitchTo(private_context);
    3475             : 
    3476             :                 /* Track this relation for session locks */
    3477         284 :                 heapRelationIds = lappend_oid(heapRelationIds, relationOid);
    3478             : 
    3479         284 :                 MemoryContextSwitchTo(oldcontext);
    3480             : 
    3481         284 :                 if (IsCatalogRelationOid(relationOid))
    3482          36 :                     ereport(ERROR,
    3483             :                             (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
    3484             :                              errmsg("cannot reindex system catalogs concurrently")));
    3485             : 
    3486             :                 /* Open relation to get its indexes */
    3487         248 :                 if ((params->options & REINDEXOPT_MISSING_OK) != 0)
    3488             :                 {
    3489          74 :                     heapRelation = try_table_open(relationOid,
    3490             :                                                   ShareUpdateExclusiveLock);
    3491             :                     /* leave if relation does not exist */
    3492          74 :                     if (!heapRelation)
    3493           0 :                         break;
    3494             :                 }
    3495             :                 else
    3496         174 :                     heapRelation = table_open(relationOid,
    3497             :                                               ShareUpdateExclusiveLock);
    3498             : 
    3499         270 :                 if (OidIsValid(params->tablespaceOid) &&
    3500          22 :                     IsSystemRelation(heapRelation))
    3501           2 :                     ereport(ERROR,
    3502             :                             (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
    3503             :                              errmsg("cannot move system relation \"%s\"",
    3504             :                                     RelationGetRelationName(heapRelation))));
    3505             : 
    3506             :                 /* Add all the valid indexes of relation to list */
    3507         486 :                 foreach(lc, RelationGetIndexList(heapRelation))
    3508             :                 {
    3509         240 :                     Oid         cellOid = lfirst_oid(lc);
    3510         240 :                     Relation    indexRelation = index_open(cellOid,
    3511             :                                                            ShareUpdateExclusiveLock);
    3512             : 
    3513         240 :                     if (!indexRelation->rd_index->indisvalid)
    3514           6 :                         ereport(WARNING,
    3515             :                                 (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
    3516             :                                  errmsg("cannot reindex invalid index \"%s.%s\" concurrently, skipping",
    3517             :                                         get_namespace_name(get_rel_namespace(cellOid)),
    3518             :                                         get_rel_name(cellOid))));
    3519         234 :                     else if (indexRelation->rd_index->indisexclusion)
    3520           6 :                         ereport(WARNING,
    3521             :                                 (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
    3522             :                                  errmsg("cannot reindex exclusion constraint index \"%s.%s\" concurrently, skipping",
    3523             :                                         get_namespace_name(get_rel_namespace(cellOid)),
    3524             :                                         get_rel_name(cellOid))));
    3525             :                     else
    3526             :                     {
    3527             :                         ReindexIndexInfo *idx;
    3528             : 
    3529             :                         /* Save the list of relation OIDs in private context */
    3530         228 :                         oldcontext = MemoryContextSwitchTo(private_context);
    3531             : 
    3532         228 :                         idx = palloc_object(ReindexIndexInfo);
    3533         228 :                         idx->indexId = cellOid;
    3534             :                         /* other fields set later */
    3535             : 
    3536         228 :                         indexIds = lappend(indexIds, idx);
    3537             : 
    3538         228 :                         MemoryContextSwitchTo(oldcontext);
    3539             :                     }
    3540             : 
    3541         240 :                     index_close(indexRelation, NoLock);
    3542             :                 }
    3543             : 
    3544             :                 /* Also add the toast indexes */
    3545         246 :                 if (OidIsValid(heapRelation->rd_rel->reltoastrelid))
    3546             :                 {
    3547          82 :                     Oid         toastOid = heapRelation->rd_rel->reltoastrelid;
    3548          82 :                     Relation    toastRelation = table_open(toastOid,
    3549             :                                                            ShareUpdateExclusiveLock);
    3550             : 
    3551             :                     /* Save the list of relation OIDs in private context */
    3552          82 :                     oldcontext = MemoryContextSwitchTo(private_context);
    3553             : 
    3554             :                     /* Track this relation for session locks */
    3555          82 :                     heapRelationIds = lappend_oid(heapRelationIds, toastOid);
    3556             : 
    3557          82 :                     MemoryContextSwitchTo(oldcontext);
    3558             : 
    3559         164 :                     foreach(lc2, RelationGetIndexList(toastRelation))
    3560             :                     {
    3561          82 :                         Oid         cellOid = lfirst_oid(lc2);
    3562          82 :                         Relation    indexRelation = index_open(cellOid,
    3563             :                                                                ShareUpdateExclusiveLock);
    3564             : 
    3565          82 :                         if (!indexRelation->rd_index->indisvalid)
    3566           0 :                             ereport(WARNING,
    3567             :                                     (errcode(ERRCODE_INDEX_CORRUPTED),
    3568             :                                      errmsg("cannot reindex invalid index \"%s.%s\" concurrently, skipping",
    3569             :                                             get_namespace_name(get_rel_namespace(cellOid)),
    3570             :                                             get_rel_name(cellOid))));
    3571             :                         else
    3572             :                         {
    3573             :                             ReindexIndexInfo *idx;
    3574             : 
    3575             :                             /*
    3576             :                              * Save the list of relation OIDs in private
    3577             :                              * context
    3578             :                              */
    3579          82 :                             oldcontext = MemoryContextSwitchTo(private_context);
    3580             : 
    3581          82 :                             idx = palloc_object(ReindexIndexInfo);
    3582          82 :                             idx->indexId = cellOid;
    3583          82 :                             indexIds = lappend(indexIds, idx);
    3584             :                             /* other fields set later */
    3585             : 
    3586          82 :                             MemoryContextSwitchTo(oldcontext);
    3587             :                         }
    3588             : 
    3589          82 :                         index_close(indexRelation, NoLock);
    3590             :                     }
    3591             : 
    3592          82 :                     table_close(toastRelation, NoLock);
    3593             :                 }
    3594             : 
    3595         246 :                 table_close(heapRelation, NoLock);
    3596         246 :                 break;
    3597             :             }
    3598         160 :         case RELKIND_INDEX:
    3599             :             {
    3600         160 :                 Oid         heapId = IndexGetRelation(relationOid,
    3601         160 :                                                       (params->options & REINDEXOPT_MISSING_OK) != 0);
    3602             :                 Relation    heapRelation;
    3603             :                 ReindexIndexInfo *idx;
    3604             : 
    3605             :                 /* if relation is missing, leave */
    3606         160 :                 if (!OidIsValid(heapId))
    3607           0 :                     break;
    3608             : 
    3609         160 :                 if (IsCatalogRelationOid(heapId))
    3610          18 :                     ereport(ERROR,
    3611             :                             (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
    3612             :                              errmsg("cannot reindex system catalogs concurrently")));
    3613             : 
    3614             :                 /*
    3615             :                  * Don't allow reindex for an invalid index on TOAST table, as
    3616             :                  * if rebuilt it would not be possible to drop it.  Match
    3617             :                  * error message in reindex_index().
    3618             :                  */
    3619         142 :                 if (IsToastNamespace(get_rel_namespace(relationOid)) &&
    3620          56 :                     !get_index_isvalid(relationOid))
    3621           0 :                     ereport(ERROR,
    3622             :                             (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
    3623             :                              errmsg("cannot reindex invalid index on TOAST table")));
    3624             : 
    3625             :                 /*
    3626             :                  * Check if parent relation can be locked and if it exists,
    3627             :                  * this needs to be done at this stage as the list of indexes
    3628             :                  * to rebuild is not complete yet, and REINDEXOPT_MISSING_OK
    3629             :                  * should not be used once all the session locks are taken.
    3630             :                  */
    3631         142 :                 if ((params->options & REINDEXOPT_MISSING_OK) != 0)
    3632             :                 {
    3633          18 :                     heapRelation = try_table_open(heapId,
    3634             :                                                   ShareUpdateExclusiveLock);
    3635             :                     /* leave if relation does not exist */
    3636          18 :                     if (!heapRelation)
    3637           0 :                         break;
    3638             :                 }
    3639             :                 else
    3640         124 :                     heapRelation = table_open(heapId,
    3641             :                                               ShareUpdateExclusiveLock);
    3642             : 
    3643         150 :                 if (OidIsValid(params->tablespaceOid) &&
    3644           8 :                     IsSystemRelation(heapRelation))
    3645           2 :                     ereport(ERROR,
    3646             :                             (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
    3647             :                              errmsg("cannot move system relation \"%s\"",
    3648             :                                     get_rel_name(relationOid))));
    3649             : 
    3650         140 :                 table_close(heapRelation, NoLock);
    3651             : 
    3652             :                 /* Save the list of relation OIDs in private context */
    3653         140 :                 oldcontext = MemoryContextSwitchTo(private_context);
    3654             : 
    3655             :                 /* Track the heap relation of this index for session locks */
    3656         140 :                 heapRelationIds = list_make1_oid(heapId);
    3657             : 
    3658             :                 /*
    3659             :                  * Save the list of relation OIDs in private context.  Note
    3660             :                  * that invalid indexes are allowed here.
    3661             :                  */
    3662         140 :                 idx = palloc_object(ReindexIndexInfo);
    3663         140 :                 idx->indexId = relationOid;
    3664         140 :                 indexIds = lappend(indexIds, idx);
    3665             :                 /* other fields set later */
    3666             : 
    3667         140 :                 MemoryContextSwitchTo(oldcontext);
    3668         140 :                 break;
    3669             :             }
    3670             : 
    3671           0 :         case RELKIND_PARTITIONED_TABLE:
    3672             :         case RELKIND_PARTITIONED_INDEX:
    3673             :         default:
    3674             :             /* Return error if type of relation is not supported */
    3675           0 :             ereport(ERROR,
    3676             :                     (errcode(ERRCODE_WRONG_OBJECT_TYPE),
    3677             :                      errmsg("cannot reindex this type of relation concurrently")));
    3678             :             break;
    3679             :     }
    3680             : 
    3681             :     /*
    3682             :      * Definitely no indexes, so leave.  Any checks based on
    3683             :      * REINDEXOPT_MISSING_OK should be done only while the list of indexes to
    3684             :      * work on is built as the session locks taken before this transaction
    3685             :      * commits will make sure that they cannot be dropped by a concurrent
    3686             :      * session until this operation completes.
    3687             :      */
    3688         386 :     if (indexIds == NIL)
    3689             :     {
    3690          26 :         PopActiveSnapshot();
    3691          26 :         return false;
    3692             :     }
    3693             : 
    3694             :     /* It's not a shared catalog, so refuse to move it to shared tablespace */
    3695         360 :     if (params->tablespaceOid == GLOBALTABLESPACE_OID)
    3696           6 :         ereport(ERROR,
    3697             :                 (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
    3698             :                  errmsg("cannot move non-shared relation to tablespace \"%s\"",
    3699             :                         get_tablespace_name(params->tablespaceOid))));
    3700             : 
    3701             :     Assert(heapRelationIds != NIL);
    3702             : 
    3703             :     /*-----
    3704             :      * Now we have all the indexes we want to process in indexIds.
    3705             :      *
    3706             :      * The phases now are:
    3707             :      *
    3708             :      * 1. create new indexes in the catalog
    3709             :      * 2. build new indexes
    3710             :      * 3. let new indexes catch up with tuples inserted in the meantime
    3711             :      * 4. swap index names
    3712             :      * 5. mark old indexes as dead
    3713             :      * 6. drop old indexes
    3714             :      *
    3715             :      * We process each phase for all indexes before moving to the next phase,
    3716             :      * for efficiency.
    3717             :      */
    3718             : 
    3719             :     /*
    3720             :      * Phase 1 of REINDEX CONCURRENTLY
    3721             :      *
    3722             :      * Create a new index with the same properties as the old one, but it is
    3723             :      * only registered in catalogs and will be built later.  Then get session
    3724             :      * locks on all involved tables.  See analogous code in DefineIndex() for
    3725             :      * more detailed comments.
    3726             :      */
    3727             : 
    3728         792 :     foreach(lc, indexIds)
    3729             :     {
    3730             :         char       *concurrentName;
    3731         444 :         ReindexIndexInfo *idx = lfirst(lc);
    3732             :         ReindexIndexInfo *newidx;
    3733             :         Oid         newIndexId;
    3734             :         Relation    indexRel;
    3735             :         Relation    heapRel;
    3736             :         Oid         save_userid;
    3737             :         int         save_sec_context;
    3738             :         int         save_nestlevel;
    3739             :         Relation    newIndexRel;
    3740             :         LockRelId  *lockrelid;
    3741             :         Oid         tablespaceid;
    3742             : 
    3743         444 :         indexRel = index_open(idx->indexId, ShareUpdateExclusiveLock);
    3744         444 :         heapRel = table_open(indexRel->rd_index->indrelid,
    3745             :                              ShareUpdateExclusiveLock);
    3746             : 
    3747             :         /*
    3748             :          * Switch to the table owner's userid, so that any index functions are
    3749             :          * run as that user.  Also lock down security-restricted operations
    3750             :          * and arrange to make GUC variable changes local to this command.
    3751             :          */
    3752         444 :         GetUserIdAndSecContext(&save_userid, &save_sec_context);
    3753         444 :         SetUserIdAndSecContext(heapRel->rd_rel->relowner,
    3754             :                                save_sec_context | SECURITY_RESTRICTED_OPERATION);
    3755         444 :         save_nestlevel = NewGUCNestLevel();
    3756             : 
    3757             :         /* determine safety of this index for set_indexsafe_procflags */
    3758         870 :         idx->safe = (indexRel->rd_indexprs == NIL &&
    3759         426 :                      indexRel->rd_indpred == NIL);
    3760         444 :         idx->tableId = RelationGetRelid(heapRel);
    3761         444 :         idx->amId = indexRel->rd_rel->relam;
    3762             : 
    3763             :         /* This function shouldn't be called for temporary relations. */
    3764         444 :         if (indexRel->rd_rel->relpersistence == RELPERSISTENCE_TEMP)
    3765           0 :             elog(ERROR, "cannot reindex a temporary table concurrently");
    3766             : 
    3767         444 :         pgstat_progress_start_command(PROGRESS_COMMAND_CREATE_INDEX,
    3768             :                                       idx->tableId);
    3769             : 
    3770         444 :         progress_vals[0] = PROGRESS_CREATEIDX_COMMAND_REINDEX_CONCURRENTLY;
    3771         444 :         progress_vals[1] = 0;   /* initializing */
    3772         444 :         progress_vals[2] = idx->indexId;
    3773         444 :         progress_vals[3] = idx->amId;
    3774         444 :         pgstat_progress_update_multi_param(4, progress_index, progress_vals);
    3775             : 
    3776             :         /* Choose a temporary relation name for the new index */
    3777         444 :         concurrentName = ChooseRelationName(get_rel_name(idx->indexId),
    3778             :                                             NULL,
    3779             :                                             "ccnew",
    3780         444 :                                             get_rel_namespace(indexRel->rd_index->indrelid),
    3781             :                                             false);
    3782             : 
    3783             :         /* Choose the new tablespace, indexes of toast tables are not moved */
    3784         444 :         if (OidIsValid(params->tablespaceOid) &&
    3785          28 :             heapRel->rd_rel->relkind != RELKIND_TOASTVALUE)
    3786          20 :             tablespaceid = params->tablespaceOid;
    3787             :         else
    3788         424 :             tablespaceid = indexRel->rd_rel->reltablespace;
    3789             : 
    3790             :         /* Create new index definition based on given index */
    3791         444 :         newIndexId = index_concurrently_create_copy(heapRel,
    3792             :                                                     idx->indexId,
    3793             :                                                     tablespaceid,
    3794             :                                                     concurrentName);
    3795             : 
    3796             :         /*
    3797             :          * Now open the relation of the new index, a session-level lock is
    3798             :          * also needed on it.
    3799             :          */
    3800         438 :         newIndexRel = index_open(newIndexId, ShareUpdateExclusiveLock);
    3801             : 
    3802             :         /*
    3803             :          * Save the list of OIDs and locks in private context
    3804             :          */
    3805         438 :         oldcontext = MemoryContextSwitchTo(private_context);
    3806             : 
    3807         438 :         newidx = palloc_object(ReindexIndexInfo);
    3808         438 :         newidx->indexId = newIndexId;
    3809         438 :         newidx->safe = idx->safe;
    3810         438 :         newidx->tableId = idx->tableId;
    3811         438 :         newidx->amId = idx->amId;
    3812             : 
    3813         438 :         newIndexIds = lappend(newIndexIds, newidx);
    3814             : 
    3815             :         /*
    3816             :          * Save lockrelid to protect each relation from drop then close
    3817             :          * relations. The lockrelid on parent relation is not taken here to
    3818             :          * avoid multiple locks taken on the same relation, instead we rely on
    3819             :          * parentRelationIds built earlier.
    3820             :          */
    3821         438 :         lockrelid = palloc_object(LockRelId);
    3822         438 :         *lockrelid = indexRel->rd_lockInfo.lockRelId;
    3823         438 :         relationLocks = lappend(relationLocks, lockrelid);
    3824         438 :         lockrelid = palloc_object(LockRelId);
    3825         438 :         *lockrelid = newIndexRel->rd_lockInfo.lockRelId;
    3826         438 :         relationLocks = lappend(relationLocks, lockrelid);
    3827             : 
    3828         438 :         MemoryContextSwitchTo(oldcontext);
    3829             : 
    3830         438 :         index_close(indexRel, NoLock);
    3831         438 :         index_close(newIndexRel, NoLock);
    3832             : 
    3833             :         /* Roll back any GUC changes executed by index functions */
    3834         438 :         AtEOXact_GUC(false, save_nestlevel);
    3835             : 
    3836             :         /* Restore userid and security context */
    3837         438 :         SetUserIdAndSecContext(save_userid, save_sec_context);
    3838             : 
    3839         438 :         table_close(heapRel, NoLock);
    3840             :     }
    3841             : 
    3842             :     /*
    3843             :      * Save the heap lock for following visibility checks with other backends
    3844             :      * might conflict with this session.
    3845             :      */
    3846         778 :     foreach(lc, heapRelationIds)
    3847             :     {
    3848         430 :         Relation    heapRelation = table_open(lfirst_oid(lc), ShareUpdateExclusiveLock);
    3849             :         LockRelId  *lockrelid;
    3850             :         LOCKTAG    *heaplocktag;
    3851             : 
    3852             :         /* Save the list of locks in private context */
    3853         430 :         oldcontext = MemoryContextSwitchTo(private_context);
    3854             : 
    3855             :         /* Add lockrelid of heap relation to the list of locked relations */
    3856         430 :         lockrelid = palloc_object(LockRelId);
    3857         430 :         *lockrelid = heapRelation->rd_lockInfo.lockRelId;
    3858         430 :         relationLocks = lappend(relationLocks, lockrelid);
    3859             : 
    3860         430 :         heaplocktag = palloc_object(LOCKTAG);
    3861             : 
    3862             :         /* Save the LOCKTAG for this parent relation for the wait phase */
    3863         430 :         SET_LOCKTAG_RELATION(*heaplocktag, lockrelid->dbId, lockrelid->relId);
    3864         430 :         lockTags = lappend(lockTags, heaplocktag);
    3865             : 
    3866         430 :         MemoryContextSwitchTo(oldcontext);
    3867             : 
    3868             :         /* Close heap relation */
    3869         430 :         table_close(heapRelation, NoLock);
    3870             :     }
    3871             : 
    3872             :     /* Get a session-level lock on each table. */
    3873        1654 :     foreach(lc, relationLocks)
    3874             :     {
    3875        1306 :         LockRelId  *lockrelid = (LockRelId *) lfirst(lc);
    3876             : 
    3877        1306 :         LockRelationIdForSession(lockrelid, ShareUpdateExclusiveLock);
    3878             :     }
    3879             : 
    3880         348 :     PopActiveSnapshot();
    3881         348 :     CommitTransactionCommand();
    3882         348 :     StartTransactionCommand();
    3883             : 
    3884             :     /*
    3885             :      * Because we don't take a snapshot in this transaction, there's no need
    3886             :      * to set the PROC_IN_SAFE_IC flag here.
    3887             :      */
    3888             : 
    3889             :     /*
    3890             :      * Phase 2 of REINDEX CONCURRENTLY
    3891             :      *
    3892             :      * Build the new indexes in a separate transaction for each index to avoid
    3893             :      * having open transactions for an unnecessary long time.  But before
    3894             :      * doing that, wait until no running transactions could have the table of
    3895             :      * the index open with the old list of indexes.  See "phase 2" in
    3896             :      * DefineIndex() for more details.
    3897             :      */
    3898             : 
    3899         348 :     pgstat_progress_update_param(PROGRESS_CREATEIDX_PHASE,
    3900             :                                  PROGRESS_CREATEIDX_PHASE_WAIT_1);
    3901         348 :     WaitForLockersMultiple(lockTags, ShareLock, true);
    3902         348 :     CommitTransactionCommand();
    3903             : 
    3904         780 :     foreach(lc, newIndexIds)
    3905             :     {
    3906         438 :         ReindexIndexInfo *newidx = lfirst(lc);
    3907             : 
    3908             :         /* Start new transaction for this index's concurrent build */
    3909         438 :         StartTransactionCommand();
    3910             : 
    3911             :         /*
    3912             :          * Check for user-requested abort.  This is inside a transaction so as
    3913             :          * xact.c does not issue a useless WARNING, and ensures that
    3914             :          * session-level locks are cleaned up on abort.
    3915             :          */
    3916         438 :         CHECK_FOR_INTERRUPTS();
    3917             : 
    3918             :         /* Tell concurrent indexing to ignore us, if index qualifies */
    3919         438 :         if (newidx->safe)
    3920         414 :             set_indexsafe_procflags();
    3921             : 
    3922             :         /* Set ActiveSnapshot since functions in the indexes may need it */
    3923         438 :         PushActiveSnapshot(GetTransactionSnapshot());
    3924             : 
    3925             :         /*
    3926             :          * Update progress for the index to build, with the correct parent
    3927             :          * table involved.
    3928             :          */
    3929         438 :         pgstat_progress_start_command(PROGRESS_COMMAND_CREATE_INDEX, newidx->tableId);
    3930         438 :         progress_vals[0] = PROGRESS_CREATEIDX_COMMAND_REINDEX_CONCURRENTLY;
    3931         438 :         progress_vals[1] = PROGRESS_CREATEIDX_PHASE_BUILD;
    3932         438 :         progress_vals[2] = newidx->indexId;
    3933         438 :         progress_vals[3] = newidx->amId;
    3934         438 :         pgstat_progress_update_multi_param(4, progress_index, progress_vals);
    3935             : 
    3936             :         /* Perform concurrent build of new index */
    3937         438 :         index_concurrently_build(newidx->tableId, newidx->indexId);
    3938             : 
    3939         432 :         PopActiveSnapshot();
    3940         432 :         CommitTransactionCommand();
    3941             :     }
    3942             : 
    3943         342 :     StartTransactionCommand();
    3944             : 
    3945             :     /*
    3946             :      * Because we don't take a snapshot or Xid in this transaction, there's no
    3947             :      * need to set the PROC_IN_SAFE_IC flag here.
    3948             :      */
    3949             : 
    3950             :     /*
    3951             :      * Phase 3 of REINDEX CONCURRENTLY
    3952             :      *
    3953             :      * During this phase the old indexes catch up with any new tuples that
    3954             :      * were created during the previous phase.  See "phase 3" in DefineIndex()
    3955             :      * for more details.
    3956             :      */
    3957             : 
    3958         342 :     pgstat_progress_update_param(PROGRESS_CREATEIDX_PHASE,
    3959             :                                  PROGRESS_CREATEIDX_PHASE_WAIT_2);
    3960         342 :     WaitForLockersMultiple(lockTags, ShareLock, true);
    3961         342 :     CommitTransactionCommand();
    3962             : 
    3963         774 :     foreach(lc, newIndexIds)
    3964             :     {
    3965         432 :         ReindexIndexInfo *newidx = lfirst(lc);
    3966             :         TransactionId limitXmin;
    3967             :         Snapshot    snapshot;
    3968             : 
    3969         432 :         StartTransactionCommand();
    3970             : 
    3971             :         /*
    3972             :          * Check for user-requested abort.  This is inside a transaction so as
    3973             :          * xact.c does not issue a useless WARNING, and ensures that
    3974             :          * session-level locks are cleaned up on abort.
    3975             :          */
    3976         432 :         CHECK_FOR_INTERRUPTS();
    3977             : 
    3978             :         /* Tell concurrent indexing to ignore us, if index qualifies */
    3979         432 :         if (newidx->safe)
    3980         408 :             set_indexsafe_procflags();
    3981             : 
    3982             :         /*
    3983             :          * Take the "reference snapshot" that will be used by validate_index()
    3984             :          * to filter candidate tuples.
    3985             :          */
    3986         432 :         snapshot = RegisterSnapshot(GetTransactionSnapshot());
    3987         432 :         PushActiveSnapshot(snapshot);
    3988             : 
    3989             :         /*
    3990             :          * Update progress for the index to build, with the correct parent
    3991             :          * table involved.
    3992             :          */
    3993         432 :         pgstat_progress_start_command(PROGRESS_COMMAND_CREATE_INDEX,
    3994             :                                       newidx->tableId);
    3995         432 :         progress_vals[0] = PROGRESS_CREATEIDX_COMMAND_REINDEX_CONCURRENTLY;
    3996         432 :         progress_vals[1] = PROGRESS_CREATEIDX_PHASE_VALIDATE_IDXSCAN;
    3997         432 :         progress_vals[2] = newidx->indexId;
    3998         432 :         progress_vals[3] = newidx->amId;
    3999         432 :         pgstat_progress_update_multi_param(4, progress_index, progress_vals);
    4000             : 
    4001         432 :         validate_index(newidx->tableId, newidx->indexId, snapshot);
    4002             : 
    4003             :         /*
    4004             :          * We can now do away with our active snapshot, we still need to save
    4005             :          * the xmin limit to wait for older snapshots.
    4006             :          */
    4007         432 :         limitXmin = snapshot->xmin;
    4008             : 
    4009         432 :         PopActiveSnapshot();
    4010         432 :         UnregisterSnapshot(snapshot);
    4011             : 
    4012             :         /*
    4013             :          * To ensure no deadlocks, we must commit and start yet another
    4014             :          * transaction, and do our wait before any snapshot has been taken in
    4015             :          * it.
    4016             :          */
    4017         432 :         CommitTransactionCommand();
    4018         432 :         StartTransactionCommand();
    4019             : 
    4020             :         /*
    4021             :          * The index is now valid in the sense that it contains all currently
    4022             :          * interesting tuples.  But since it might not contain tuples deleted
    4023             :          * just before the reference snap was taken, we have to wait out any
    4024             :          * transactions that might have older snapshots.
    4025             :          *
    4026             :          * Because we don't take a snapshot or Xid in this transaction,
    4027             :          * there's no need to set the PROC_IN_SAFE_IC flag here.
    4028             :          */
    4029         432 :         pgstat_progress_update_param(PROGRESS_CREATEIDX_PHASE,
    4030             :                                      PROGRESS_CREATEIDX_PHASE_WAIT_3);
    4031         432 :         WaitForOlderSnapshots(limitXmin, true);
    4032             : 
    4033         432 :         CommitTransactionCommand();
    4034             :     }
    4035             : 
    4036             :     /*
    4037             :      * Phase 4 of REINDEX CONCURRENTLY
    4038             :      *
    4039             :      * Now that the new indexes have been validated, swap each new index with
    4040             :      * its corresponding old index.
    4041             :      *
    4042             :      * We mark the new indexes as valid and the old indexes as not valid at
    4043             :      * the same time to make sure we only get constraint violations from the
    4044             :      * indexes with the correct names.
    4045             :      */
    4046             : 
    4047         342 :     StartTransactionCommand();
    4048             : 
    4049             :     /*
    4050             :      * Because this transaction only does catalog manipulations and doesn't do
    4051             :      * any index operations, we can set the PROC_IN_SAFE_IC flag here
    4052             :      * unconditionally.
    4053             :      */
    4054         342 :     set_indexsafe_procflags();
    4055             : 
    4056         774 :     forboth(lc, indexIds, lc2, newIndexIds)
    4057             :     {
    4058         432 :         ReindexIndexInfo *oldidx = lfirst(lc);
    4059         432 :         ReindexIndexInfo *newidx = lfirst(lc2);
    4060             :         char       *oldName;
    4061             : 
    4062             :         /*
    4063             :          * Check for user-requested abort.  This is inside a transaction so as
    4064             :          * xact.c does not issue a useless WARNING, and ensures that
    4065             :          * session-level locks are cleaned up on abort.
    4066             :          */
    4067         432 :         CHECK_FOR_INTERRUPTS();
    4068             : 
    4069             :         /* Choose a relation name for old index */
    4070         432 :         oldName = ChooseRelationName(get_rel_name(oldidx->indexId),
    4071             :                                      NULL,
    4072             :                                      "ccold",
    4073             :                                      get_rel_namespace(oldidx->tableId),
    4074             :                                      false);
    4075             : 
    4076             :         /*
    4077             :          * Swap old index with the new one.  This also marks the new one as
    4078             :          * valid and the old one as not valid.
    4079             :          */
    4080         432 :         index_concurrently_swap(newidx->indexId, oldidx->indexId, oldName);
    4081             : 
    4082             :         /*
    4083             :          * Invalidate the relcache for the table, so that after this commit
    4084             :          * all sessions will refresh any cached plans that might reference the
    4085             :          * index.
    4086             :          */
    4087         432 :         CacheInvalidateRelcacheByRelid(oldidx->tableId);
    4088             : 
    4089             :         /*
    4090             :          * CCI here so that subsequent iterations see the oldName in the
    4091             :          * catalog and can choose a nonconflicting name for their oldName.
    4092             :          * Otherwise, this could lead to conflicts if a table has two indexes
    4093             :          * whose names are equal for the first NAMEDATALEN-minus-a-few
    4094             :          * characters.
    4095             :          */
    4096         432 :         CommandCounterIncrement();
    4097             :     }
    4098             : 
    4099             :     /* Commit this transaction and make index swaps visible */
    4100         342 :     CommitTransactionCommand();
    4101         342 :     StartTransactionCommand();
    4102             : 
    4103             :     /*
    4104             :      * While we could set PROC_IN_SAFE_IC if all indexes qualified, there's no
    4105             :      * real need for that, because we only acquire an Xid after the wait is
    4106             :      * done, and that lasts for a very short period.
    4107             :      */
    4108             : 
    4109             :     /*
    4110             :      * Phase 5 of REINDEX CONCURRENTLY
    4111             :      *
    4112             :      * Mark the old indexes as dead.  First we must wait until no running
    4113             :      * transaction could be using the index for a query.  See also
    4114             :      * index_drop() for more details.
    4115             :      */
    4116             : 
    4117         342 :     pgstat_progress_update_param(PROGRESS_CREATEIDX_PHASE,
    4118             :                                  PROGRESS_CREATEIDX_PHASE_WAIT_4);
    4119         342 :     WaitForLockersMultiple(lockTags, AccessExclusiveLock, true);
    4120             : 
    4121         774 :     foreach(lc, indexIds)
    4122             :     {
    4123         432 :         ReindexIndexInfo *oldidx = lfirst(lc);
    4124             : 
    4125             :         /*
    4126             :          * Check for user-requested abort.  This is inside a transaction so as
    4127             :          * xact.c does not issue a useless WARNING, and ensures that
    4128             :          * session-level locks are cleaned up on abort.
    4129             :          */
    4130         432 :         CHECK_FOR_INTERRUPTS();
    4131             : 
    4132         432 :         index_concurrently_set_dead(oldidx->tableId, oldidx->indexId);
    4133             :     }
    4134             : 
    4135             :     /* Commit this transaction to make the updates visible. */
    4136         342 :     CommitTransactionCommand();
    4137         342 :     StartTransactionCommand();
    4138             : 
    4139             :     /*
    4140             :      * While we could set PROC_IN_SAFE_IC if all indexes qualified, there's no
    4141             :      * real need for that, because we only acquire an Xid after the wait is
    4142             :      * done, and that lasts for a very short period.
    4143             :      */
    4144             : 
    4145             :     /*
    4146             :      * Phase 6 of REINDEX CONCURRENTLY
    4147             :      *
    4148             :      * Drop the old indexes.
    4149             :      */
    4150             : 
    4151         342 :     pgstat_progress_update_param(PROGRESS_CREATEIDX_PHASE,
    4152             :                                  PROGRESS_CREATEIDX_PHASE_WAIT_5);
    4153         342 :     WaitForLockersMultiple(lockTags, AccessExclusiveLock, true);
    4154             : 
    4155         342 :     PushActiveSnapshot(GetTransactionSnapshot());
    4156             : 
    4157             :     {
    4158         342 :         ObjectAddresses *objects = new_object_addresses();
    4159             : 
    4160         774 :         foreach(lc, indexIds)
    4161             :         {
    4162         432 :             ReindexIndexInfo *idx = lfirst(lc);
    4163             :             ObjectAddress object;
    4164             : 
    4165         432 :             object.classId = RelationRelationId;
    4166         432 :             object.objectId = idx->indexId;
    4167         432 :             object.objectSubId = 0;
    4168             : 
    4169         432 :             add_exact_object_address(&object, objects);
    4170             :         }
    4171             : 
    4172             :         /*
    4173             :          * Use PERFORM_DELETION_CONCURRENT_LOCK so that index_drop() uses the
    4174             :          * right lock level.
    4175             :          */
    4176         342 :         performMultipleDeletions(objects, DROP_RESTRICT,
    4177             :                                  PERFORM_DELETION_CONCURRENT_LOCK | PERFORM_DELETION_INTERNAL);
    4178             :     }
    4179             : 
    4180         342 :     PopActiveSnapshot();
    4181         342 :     CommitTransactionCommand();
    4182             : 
    4183             :     /*
    4184             :      * Finally, release the session-level lock on the table.
    4185             :      */
    4186        1630 :     foreach(lc, relationLocks)
    4187             :     {
    4188        1288 :         LockRelId  *lockrelid = (LockRelId *) lfirst(lc);
    4189             : 
    4190        1288 :         UnlockRelationIdForSession(lockrelid, ShareUpdateExclusiveLock);
    4191             :     }
    4192             : 
    4193             :     /* Start a new transaction to finish process properly */
    4194         342 :     StartTransactionCommand();
    4195             : 
    4196             :     /* Log what we did */
    4197         342 :     if ((params->options & REINDEXOPT_VERBOSE) != 0)
    4198             :     {
    4199           4 :         if (relkind == RELKIND_INDEX)
    4200           0 :             ereport(INFO,
    4201             :                     (errmsg("index \"%s.%s\" was reindexed",
    4202             :                             relationNamespace, relationName),
    4203             :                      errdetail("%s.",
    4204             :                                pg_rusage_show(&ru0))));
    4205             :         else
    4206             :         {
    4207          12 :             foreach(lc, newIndexIds)
    4208             :             {
    4209           8 :                 ReindexIndexInfo *idx = lfirst(lc);
    4210           8 :                 Oid         indOid = idx->indexId;
    4211             : 
    4212           8 :                 ereport(INFO,
    4213             :                         (errmsg("index \"%s.%s\" was reindexed",
    4214             :                                 get_namespace_name(get_rel_namespace(indOid)),
    4215             :                                 get_rel_name(indOid))));
    4216             :                 /* Don't show rusage here, since it's not per index. */
    4217             :             }
    4218             : 
    4219           4 :             ereport(INFO,
    4220             :                     (errmsg("table \"%s.%s\" was reindexed",
    4221             :                             relationNamespace, relationName),
    4222             :                      errdetail("%s.",
    4223             :                                pg_rusage_show(&ru0))));
    4224             :         }
    4225             :     }
    4226             : 
    4227         342 :     MemoryContextDelete(private_context);
    4228             : 
    4229         342 :     pgstat_progress_end_command();
    4230             : 
    4231         342 :     return true;
    4232             : }
    4233             : 
    4234             : /*
    4235             :  * Insert or delete an appropriate pg_inherits tuple to make the given index
    4236             :  * be a partition of the indicated parent index.
    4237             :  *
    4238             :  * This also corrects the pg_depend information for the affected index.
    4239             :  */
    4240             : void
    4241         624 : IndexSetParentIndex(Relation partitionIdx, Oid parentOid)
    4242             : {
    4243             :     Relation    pg_inherits;
    4244             :     ScanKeyData key[2];
    4245             :     SysScanDesc scan;
    4246         624 :     Oid         partRelid = RelationGetRelid(partitionIdx);
    4247             :     HeapTuple   tuple;
    4248             :     bool        fix_dependencies;
    4249             : 
    4250             :     /* Make sure this is an index */
    4251             :     Assert(partitionIdx->rd_rel->relkind == RELKIND_INDEX ||
    4252             :            partitionIdx->rd_rel->relkind == RELKIND_PARTITIONED_INDEX);
    4253             : 
    4254             :     /*
    4255             :      * Scan pg_inherits for rows linking our index to some parent.
    4256             :      */
    4257         624 :     pg_inherits = relation_open(InheritsRelationId, RowExclusiveLock);
    4258         624 :     ScanKeyInit(&key[0],
    4259             :                 Anum_pg_inherits_inhrelid,
    4260             :                 BTEqualStrategyNumber, F_OIDEQ,
    4261             :                 ObjectIdGetDatum(partRelid));
    4262         624 :     ScanKeyInit(&key[1],
    4263             :                 Anum_pg_inherits_inhseqno,
    4264             :                 BTEqualStrategyNumber, F_INT4EQ,
    4265             :                 Int32GetDatum(1));
    4266         624 :     scan = systable_beginscan(pg_inherits, InheritsRelidSeqnoIndexId, true,
    4267             :                               NULL, 2, key);
    4268         624 :     tuple = systable_getnext(scan);
    4269             : 
    4270         624 :     if (!HeapTupleIsValid(tuple))
    4271             :     {
    4272         496 :         if (parentOid == InvalidOid)
    4273             :         {
    4274             :             /*
    4275             :              * No pg_inherits row, and no parent wanted: nothing to do in this
    4276             :              * case.
    4277             :              */
    4278           0 :             fix_dependencies = false;
    4279             :         }
    4280             :         else
    4281             :         {
    4282         496 :             StoreSingleInheritance(partRelid, parentOid, 1);
    4283         496 :             fix_dependencies = true;
    4284             :         }
    4285             :     }
    4286             :     else
    4287             :     {
    4288         128 :         Form_pg_inherits inhForm = (Form_pg_inherits) GETSTRUCT(tuple);
    4289             : 
    4290         128 :         if (parentOid == InvalidOid)
    4291             :         {
    4292             :             /*
    4293             :              * There exists a pg_inherits row, which we want to clear; do so.
    4294             :              */
    4295         128 :             CatalogTupleDelete(pg_inherits, &tuple->t_self);
    4296         128 :             fix_dependencies = true;
    4297             :         }
    4298             :         else
    4299             :         {
    4300             :             /*
    4301             :              * A pg_inherits row exists.  If it's the same we want, then we're
    4302             :              * good; if it differs, that amounts to a corrupt catalog and
    4303             :              * should not happen.
    4304             :              */
    4305           0 :             if (inhForm->inhparent != parentOid)
    4306             :             {
    4307             :                 /* unexpected: we should not get called in this case */
    4308           0 :                 elog(ERROR, "bogus pg_inherit row: inhrelid %u inhparent %u",
    4309             :                      inhForm->inhrelid, inhForm->inhparent);
    4310             :             }
    4311             : 
    4312             :             /* already in the right state */
    4313           0 :             fix_dependencies = false;
    4314             :         }
    4315             :     }
    4316             : 
    4317             :     /* done with pg_inherits */
    4318         624 :     systable_endscan(scan);
    4319         624 :     relation_close(pg_inherits, RowExclusiveLock);
    4320             : 
    4321             :     /* set relhassubclass if an index partition has been added to the parent */
    4322         624 :     if (OidIsValid(parentOid))
    4323         496 :         SetRelationHasSubclass(parentOid, true);
    4324             : 
    4325             :     /* set relispartition correctly on the partition */
    4326         624 :     update_relispartition(partRelid, OidIsValid(parentOid));
    4327             : 
    4328         624 :     if (fix_dependencies)
    4329             :     {
    4330             :         /*
    4331             :          * Insert/delete pg_depend rows.  If setting a parent, add PARTITION
    4332             :          * dependencies on the parent index and the table; if removing a
    4333             :          * parent, delete PARTITION dependencies.
    4334             :          */
    4335         624 :         if (OidIsValid(parentOid))
    4336             :         {
    4337             :             ObjectAddress partIdx;
    4338             :             ObjectAddress parentIdx;
    4339             :             ObjectAddress partitionTbl;
    4340             : 
    4341         496 :             ObjectAddressSet(partIdx, RelationRelationId, partRelid);
    4342         496 :             ObjectAddressSet(parentIdx, RelationRelationId, parentOid);
    4343         496 :             ObjectAddressSet(partitionTbl, RelationRelationId,
    4344             :                              partitionIdx->rd_index->indrelid);
    4345         496 :             recordDependencyOn(&partIdx, &parentIdx,
    4346             :                                DEPENDENCY_PARTITION_PRI);
    4347         496 :             recordDependencyOn(&partIdx, &partitionTbl,
    4348             :                                DEPENDENCY_PARTITION_SEC);
    4349             :         }
    4350             :         else
    4351             :         {
    4352         128 :             deleteDependencyRecordsForClass(RelationRelationId, partRelid,
    4353             :                                             RelationRelationId,
    4354             :                                             DEPENDENCY_PARTITION_PRI);
    4355         128 :             deleteDependencyRecordsForClass(RelationRelationId, partRelid,
    4356             :                                             RelationRelationId,
    4357             :                                             DEPENDENCY_PARTITION_SEC);
    4358             :         }
    4359             : 
    4360             :         /* make our updates visible */
    4361         624 :         CommandCounterIncrement();
    4362             :     }
    4363         624 : }
    4364             : 
    4365             : /*
    4366             :  * Subroutine of IndexSetParentIndex to update the relispartition flag of the
    4367             :  * given index to the given value.
    4368             :  */
    4369             : static void
    4370         624 : update_relispartition(Oid relationId, bool newval)
    4371             : {
    4372             :     HeapTuple   tup;
    4373             :     Relation    classRel;
    4374             : 
    4375         624 :     classRel = table_open(RelationRelationId, RowExclusiveLock);
    4376         624 :     tup = SearchSysCacheCopy1(RELOID, ObjectIdGetDatum(relationId));
    4377         624 :     if (!HeapTupleIsValid(tup))
    4378           0 :         elog(ERROR, "cache lookup failed for relation %u", relationId);
    4379             :     Assert(((Form_pg_class) GETSTRUCT(tup))->relispartition != newval);
    4380         624 :     ((Form_pg_class) GETSTRUCT(tup))->relispartition = newval;
    4381         624 :     CatalogTupleUpdate(classRel, &tup->t_self, tup);
    4382         624 :     heap_freetuple(tup);
    4383         624 :     table_close(classRel, RowExclusiveLock);
    4384         624 : }
    4385             : 
    4386             : /*
    4387             :  * Set the PROC_IN_SAFE_IC flag in MyProc->statusFlags.
    4388             :  *
    4389             :  * When doing concurrent index builds, we can set this flag
    4390             :  * to tell other processes concurrently running CREATE
    4391             :  * INDEX CONCURRENTLY or REINDEX CONCURRENTLY to ignore us when
    4392             :  * doing their waits for concurrent snapshots.  On one hand it
    4393             :  * avoids pointlessly waiting for a process that's not interesting
    4394             :  * anyway; but more importantly it avoids deadlocks in some cases.
    4395             :  *
    4396             :  * This can be done safely only for indexes that don't execute any
    4397             :  * expressions that could access other tables, so index must not be
    4398             :  * expressional nor partial.  Caller is responsible for only calling
    4399             :  * this routine when that assumption holds true.
    4400             :  *
    4401             :  * (The flag is reset automatically at transaction end, so it must be
    4402             :  * set for each transaction.)
    4403             :  */
    4404             : static inline void
    4405        1440 : set_indexsafe_procflags(void)
    4406             : {
    4407             :     /*
    4408             :      * This should only be called before installing xid or xmin in MyProc;
    4409             :      * otherwise, concurrent processes could see an Xmin that moves backwards.
    4410             :      */
    4411             :     Assert(MyProc->xid == InvalidTransactionId &&
    4412             :            MyProc->xmin == InvalidTransactionId);
    4413             : 
    4414        1440 :     LWLockAcquire(ProcArrayLock, LW_EXCLUSIVE);
    4415        1440 :     MyProc->statusFlags |= PROC_IN_SAFE_IC;
    4416        1440 :     ProcGlobal->statusFlags[MyProc->pgxactoff] = MyProc->statusFlags;
    4417        1440 :     LWLockRelease(ProcArrayLock);
    4418        1440 : }

Generated by: LCOV version 1.14