LCOV - code coverage report
Current view: top level - src/backend/commands - indexcmds.c (source / functions) Hit Total Coverage
Test: PostgreSQL 19devel Lines: 1246 1345 92.6 %
Date: 2026-01-31 03:18:31 Functions: 26 26 100.0 %
Legend: Lines: hit not hit

          Line data    Source code
       1             : /*-------------------------------------------------------------------------
       2             :  *
       3             :  * indexcmds.c
       4             :  *    POSTGRES define and remove index code.
       5             :  *
       6             :  * Portions Copyright (c) 1996-2026, PostgreSQL Global Development Group
       7             :  * Portions Copyright (c) 1994, Regents of the University of California
       8             :  *
       9             :  *
      10             :  * IDENTIFICATION
      11             :  *    src/backend/commands/indexcmds.c
      12             :  *
      13             :  *-------------------------------------------------------------------------
      14             :  */
      15             : 
      16             : #include "postgres.h"
      17             : 
      18             : #include "access/amapi.h"
      19             : #include "access/gist.h"
      20             : #include "access/heapam.h"
      21             : #include "access/htup_details.h"
      22             : #include "access/reloptions.h"
      23             : #include "access/sysattr.h"
      24             : #include "access/tableam.h"
      25             : #include "access/xact.h"
      26             : #include "catalog/catalog.h"
      27             : #include "catalog/index.h"
      28             : #include "catalog/indexing.h"
      29             : #include "catalog/namespace.h"
      30             : #include "catalog/pg_am.h"
      31             : #include "catalog/pg_authid.h"
      32             : #include "catalog/pg_collation.h"
      33             : #include "catalog/pg_constraint.h"
      34             : #include "catalog/pg_database.h"
      35             : #include "catalog/pg_inherits.h"
      36             : #include "catalog/pg_namespace.h"
      37             : #include "catalog/pg_opclass.h"
      38             : #include "catalog/pg_tablespace.h"
      39             : #include "catalog/pg_type.h"
      40             : #include "commands/comment.h"
      41             : #include "commands/defrem.h"
      42             : #include "commands/event_trigger.h"
      43             : #include "commands/progress.h"
      44             : #include "commands/tablecmds.h"
      45             : #include "commands/tablespace.h"
      46             : #include "mb/pg_wchar.h"
      47             : #include "miscadmin.h"
      48             : #include "nodes/makefuncs.h"
      49             : #include "nodes/nodeFuncs.h"
      50             : #include "optimizer/optimizer.h"
      51             : #include "parser/parse_coerce.h"
      52             : #include "parser/parse_oper.h"
      53             : #include "parser/parse_utilcmd.h"
      54             : #include "partitioning/partdesc.h"
      55             : #include "pgstat.h"
      56             : #include "rewrite/rewriteManip.h"
      57             : #include "storage/lmgr.h"
      58             : #include "storage/proc.h"
      59             : #include "storage/procarray.h"
      60             : #include "utils/acl.h"
      61             : #include "utils/builtins.h"
      62             : #include "utils/fmgroids.h"
      63             : #include "utils/guc.h"
      64             : #include "utils/injection_point.h"
      65             : #include "utils/inval.h"
      66             : #include "utils/lsyscache.h"
      67             : #include "utils/memutils.h"
      68             : #include "utils/partcache.h"
      69             : #include "utils/pg_rusage.h"
      70             : #include "utils/regproc.h"
      71             : #include "utils/snapmgr.h"
      72             : #include "utils/syscache.h"
      73             : 
      74             : 
      75             : /* prototypes for static (file-local) helper functions */
      76             : static bool CompareOpclassOptions(const Datum *opts1, const Datum *opts2, int natts);
      77             : static void CheckPredicate(Expr *predicate);
      78             : static void ComputeIndexAttrs(ParseState *pstate,
      79             :                               IndexInfo *indexInfo,
      80             :                               Oid *typeOids,
      81             :                               Oid *collationOids,
      82             :                               Oid *opclassOids,
      83             :                               Datum *opclassOptions,
      84             :                               int16 *colOptions,
      85             :                               const List *attList,
      86             :                               const List *exclusionOpNames,
      87             :                               Oid relId,
      88             :                               const char *accessMethodName,
      89             :                               Oid accessMethodId,
      90             :                               bool amcanorder,
      91             :                               bool isconstraint,
      92             :                               bool iswithoutoverlaps,
      93             :                               Oid ddl_userid,
      94             :                               int ddl_sec_context,
      95             :                               int *ddl_save_nestlevel);
      96             : static char *ChooseIndexName(const char *tabname, Oid namespaceId,
      97             :                              const List *colnames, const List *exclusionOpNames,
      98             :                              bool primary, bool isconstraint);
      99             : static char *ChooseIndexNameAddition(const List *colnames);
     100             : static List *ChooseIndexColumnNames(const List *indexElems);
     101             : static void ReindexIndex(const ReindexStmt *stmt, const ReindexParams *params,
     102             :                          bool isTopLevel);
     103             : static void RangeVarCallbackForReindexIndex(const RangeVar *relation,
     104             :                                             Oid relId, Oid oldRelId, void *arg);
     105             : static Oid  ReindexTable(const ReindexStmt *stmt, const ReindexParams *params,
     106             :                          bool isTopLevel);
     107             : static void ReindexMultipleTables(const ReindexStmt *stmt,
     108             :                                   const ReindexParams *params);
     109             : static void reindex_error_callback(void *arg);
     110             : static void ReindexPartitions(const ReindexStmt *stmt, Oid relid,
     111             :                               const ReindexParams *params, bool isTopLevel);
     112             : static void ReindexMultipleInternal(const ReindexStmt *stmt, const List *relids,
     113             :                                     const ReindexParams *params);
     114             : static bool ReindexRelationConcurrently(const ReindexStmt *stmt,
     115             :                                         Oid relationOid,
     116             :                                         const ReindexParams *params);
     117             : static void update_relispartition(Oid relationId, bool newval);
     118             : static inline void set_indexsafe_procflags(void);
     119             : 
     120             : /*
     121             :  * State handed to RangeVarCallbackForReindexIndex() through its 'void *arg'
     122             :  */
     123             : struct ReindexIndexCallbackState
     124             : {
     125             :     ReindexParams params;       /* options from statement */
     126             :     Oid         locked_table_oid;   /* tracks previously locked table */
     127             : };
     128             : 
     129             : /*
     130             :  * Relation identity handed to reindex_error_callback() as its argument
     131             :  */
     132             : typedef struct ReindexErrorInfo
     133             : {
     134             :     char       *relname;        /* relation name */
     135             :     char       *relnamespace;   /* presumably the relation's schema name -- TODO confirm */
     136             :     char        relkind;        /* presumably a pg_class relkind code -- TODO confirm */
     137             : } ReindexErrorInfo;
     138             : 
     139             : /*
     140             :  * CheckIndexCompatible
     141             :  *      Determine whether an existing index definition is compatible with a
     142             :  *      prospective index definition, such that the existing index storage
     143             :  *      could become the storage of the new index, avoiding a rebuild.
     144             :  *
     145             :  * 'oldId': the OID of the existing index
     146             :  * 'accessMethodName': name of the AM to use.
     147             :  * 'attributeList': a list of IndexElem specifying columns and expressions
     148             :  *      to index on.
     149             :  * 'exclusionOpNames': list of names of exclusion-constraint operators,
     150             :  *      or NIL if not an exclusion constraint.
     151             :  * 'isWithoutOverlaps': true iff this index has a WITHOUT OVERLAPS clause.
     152             :  *
     153             :  * This is tailored to the needs of ALTER TABLE ALTER TYPE, which recreates
     154             :  * any indexes that depended on a changing column from their pg_get_indexdef
     155             :  * or pg_get_constraintdef definitions.  We omit some of the sanity checks of
     156             :  * DefineIndex.  We assume that the old and new indexes have the same number
     157             :  * of columns and that if one has an expression column or predicate, both do.
     158             :  * Errors arising from the attribute list still apply.
     159             :  *
     160             :  * Most column type changes that can skip a table rewrite do not invalidate
     161             :  * indexes.  We acknowledge this when all operator classes, collations and
     162             :  * exclusion operators match.  Though we could further permit intra-opfamily
     163             :  * changes for btree and hash indexes, that adds subtle complexity with no
     164             :  * concrete benefit for core types.  Note that INCLUDE columns aren't
     165             :  * checked by this function; for them it's enough that the table rewrite
     166             :  * is skipped.
     167             :  *
     168             :  * When a comparison or exclusion operator has a polymorphic input type, the
     169             :  * actual input types must also match.  This defends against the possibility
     170             :  * that operators could vary behavior in response to get_fn_expr_argtype().
     171             :  * At present, this hazard is theoretical: check_exclusion_constraint() and
     172             :  * all core index access methods decline to set fn_expr for such calls.
     173             :  *
     174             :  * We do not yet implement a test to verify compatibility of expression
     175             :  * columns or predicates, so assume any such index is incompatible.
     176             :  */
     177             : bool
     178         104 : CheckIndexCompatible(Oid oldId,
     179             :                      const char *accessMethodName,
     180             :                      const List *attributeList,
     181             :                      const List *exclusionOpNames,
     182             :                      bool isWithoutOverlaps)
     183             : {
     184             :     bool        isconstraint;
     185             :     Oid        *typeIds;
     186             :     Oid        *collationIds;
     187             :     Oid        *opclassIds;
     188             :     Datum      *opclassOptions;
     189             :     Oid         accessMethodId;
     190             :     Oid         relationId;
     191             :     HeapTuple   tuple;
     192             :     Form_pg_index indexForm;
     193             :     Form_pg_am  accessMethodForm;
     194             :     const IndexAmRoutine *amRoutine;
     195             :     bool        amcanorder;
     196             :     bool        amsummarizing;
     197             :     int16      *coloptions;
     198             :     IndexInfo  *indexInfo;
     199             :     int         numberOfAttributes;
     200             :     int         old_natts;
     201         104 :     bool        ret = true;
     202             :     oidvector  *old_indclass;
     203             :     oidvector  *old_indcollation;
     204             :     Relation    irel;
     205             :     int         i;
     206             :     Datum       d;
     207             : 
     208             :     /* Caller should already have the relation locked in some way. */
     209         104 :     relationId = IndexGetRelation(oldId, false);
     210             : 
     211             :     /*
     212             :      * We can pretend isconstraint = false unconditionally.  It only serves to
     213             :      * decide the text of an error message that should never happen for us.
     214             :      */
     215         104 :     isconstraint = false;
     216             : 
     217         104 :     numberOfAttributes = list_length(attributeList);
     218             :     Assert(numberOfAttributes > 0);
     219             :     Assert(numberOfAttributes <= INDEX_MAX_KEYS);
     220             : 
     221             :     /* Look up the access method by name in the AMNAME syscache. */
     222         104 :     tuple = SearchSysCache1(AMNAME, PointerGetDatum(accessMethodName));
     223         104 :     if (!HeapTupleIsValid(tuple))
     224           0 :         ereport(ERROR,
     225             :                 (errcode(ERRCODE_UNDEFINED_OBJECT),
     226             :                  errmsg("access method \"%s\" does not exist",
     227             :                         accessMethodName)));
     228         104 :     accessMethodForm = (Form_pg_am) GETSTRUCT(tuple);
     229         104 :     accessMethodId = accessMethodForm->oid;
     230         104 :     amRoutine = GetIndexAmRoutine(accessMethodForm->amhandler);
     231         104 :     ReleaseSysCache(tuple);
     232             : 
     233         104 :     amcanorder = amRoutine->amcanorder;
     234         104 :     amsummarizing = amRoutine->amsummarizing;
     235             : 
     236             :     /*
     237             :      * Compute the operator classes, collations, and exclusion operators for
     238             :      * the new index, so we can test whether it's compatible with the existing
     239             :      * one.  Note that ComputeIndexAttrs might fail here, but that's OK:
     240             :      * DefineIndex would have failed later.  Our attributeList contains only
     241             :      * key attributes, thus we're filling ii_NumIndexAttrs and
     242             :      * ii_NumIndexKeyAttrs with same value.
     243             :      */
     244         104 :     indexInfo = makeIndexInfo(numberOfAttributes, numberOfAttributes,
     245             :                               accessMethodId, NIL, NIL, false, false,
     246             :                               false, false, amsummarizing, isWithoutOverlaps);
     247         104 :     typeIds = palloc_array(Oid, numberOfAttributes);
     248         104 :     collationIds = palloc_array(Oid, numberOfAttributes);
     249         104 :     opclassIds = palloc_array(Oid, numberOfAttributes);
     250         104 :     opclassOptions = palloc_array(Datum, numberOfAttributes);
     251         104 :     coloptions = palloc_array(int16, numberOfAttributes);
     252         104 :     ComputeIndexAttrs(NULL, indexInfo,
     253             :                       typeIds, collationIds, opclassIds, opclassOptions,
     254             :                       coloptions, attributeList,
     255             :                       exclusionOpNames, relationId,
     256             :                       accessMethodName, accessMethodId,
     257             :                       amcanorder, isconstraint, isWithoutOverlaps, InvalidOid,
     258             :                       0, NULL);
     259             : 
     260             :     /* Get the soon-obsolete pg_index tuple. */
     261         104 :     tuple = SearchSysCache1(INDEXRELID, ObjectIdGetDatum(oldId));
     262         104 :     if (!HeapTupleIsValid(tuple))
     263           0 :         elog(ERROR, "cache lookup failed for index %u", oldId);
     264         104 :     indexForm = (Form_pg_index) GETSTRUCT(tuple);
     265             : 
     266             :     /*
     267             :      * We don't assess expressions or predicates; assume incompatibility.
     268             :      * Also, if the index is invalid for any reason, treat it as incompatible.
     269             :      */
     270         208 :     if (!(heap_attisnull(tuple, Anum_pg_index_indpred, NULL) &&
     271         104 :           heap_attisnull(tuple, Anum_pg_index_indexprs, NULL) &&
     272         104 :           indexForm->indisvalid))
     273             :     {
     274           0 :         ReleaseSysCache(tuple);
     275           0 :         return false;
     276             :     }
     277             : 
     278             :     /* Any change in operator class or collation breaks compatibility. */
     279         104 :     old_natts = indexForm->indnkeyatts;
     280             :     Assert(old_natts == numberOfAttributes);
     281             : 
     282         104 :     d = SysCacheGetAttrNotNull(INDEXRELID, tuple, Anum_pg_index_indcollation);
     283         104 :     old_indcollation = (oidvector *) DatumGetPointer(d);
     284             : 
     285         104 :     d = SysCacheGetAttrNotNull(INDEXRELID, tuple, Anum_pg_index_indclass);
     286         104 :     old_indclass = (oidvector *) DatumGetPointer(d);
     287             : 
     288         208 :     ret = (memcmp(old_indclass->values, opclassIds, old_natts * sizeof(Oid)) == 0 &&
     289         104 :            memcmp(old_indcollation->values, collationIds, old_natts * sizeof(Oid)) == 0);
     290             : 
     291         104 :     ReleaseSysCache(tuple);
     292             : 
     293         104 :     if (!ret)
     294           0 :         return false;
     295             : 
     296             :     /* For polymorphic opcintype, column type changes break compatibility. */
     297         104 :     irel = index_open(oldId, AccessShareLock);  /* caller probably has a lock */
     298         214 :     for (i = 0; i < old_natts; i++)
     299             :     {
     300         110 :         if (IsPolymorphicType(get_opclass_input_type(opclassIds[i])) &&
     301           0 :             TupleDescAttr(irel->rd_att, i)->atttypid != typeIds[i])
     302             :         {
     303           0 :             ret = false;
     304           0 :             break;
     305             :         }
     306             :     }
     307             : 
     308             :     /* Any change in opclass options breaks compatibility. */
     309         104 :     if (ret)
     310             :     {
     311         104 :         Datum      *oldOpclassOptions = palloc_array(Datum, old_natts);
     312             : 
     313         214 :         for (i = 0; i < old_natts; i++)
     314         110 :             oldOpclassOptions[i] = get_attoptions(oldId, i + 1);
     315             : 
     316         104 :         ret = CompareOpclassOptions(oldOpclassOptions, opclassOptions, old_natts);
     317             : 
     318         104 :         pfree(oldOpclassOptions);
     319             :     }
     320             : 
     321             :     /* Any change in exclusion operator selections breaks compatibility. */
     322         104 :     if (ret && indexInfo->ii_ExclusionOps != NULL)
     323             :     {
     324             :         Oid        *old_operators,
     325             :                    *old_procs;
     326             :         uint16     *old_strats;
     327             : 
     328           0 :         RelationGetExclusionInfo(irel, &old_operators, &old_procs, &old_strats);
     329           0 :         ret = memcmp(old_operators, indexInfo->ii_ExclusionOps,
     330             :                      old_natts * sizeof(Oid)) == 0;
     331             : 
     332             :         /* Require an exact input type match for polymorphic operators. */
     333           0 :         if (ret)
     334             :         {
     335           0 :             for (i = 0; i < old_natts && ret; i++)
     336             :             {
     337             :                 Oid         left,
     338             :                             right;
     339             : 
     340           0 :                 op_input_types(indexInfo->ii_ExclusionOps[i], &left, &right);
     341           0 :                 if ((IsPolymorphicType(left) || IsPolymorphicType(right)) &&
     342           0 :                     TupleDescAttr(irel->rd_att, i)->atttypid != typeIds[i])
     343             :                 {
     344           0 :                     ret = false;
     345           0 :                     break;
     346             :                 }
     347             :             }
     348             :         }
     349             :     }
     350             : 
     351         104 :     index_close(irel, NoLock);
     352         104 :     return ret;
     353             : }
     354             : 
     355             : /*
     356             :  * CompareOpclassOptions
     357             :  *
     358             :  * Compare per-column opclass options, which are given as arrays of text[]
     359             :  * datums.  Either array pointer, and any element within an array, can be NULL.
     360             :  */
     361             : static bool
     362         104 : CompareOpclassOptions(const Datum *opts1, const Datum *opts2, int natts)
     363             : {
     364             :     int         i;
     365             :     FmgrInfo    fm;
     366             : 
     367         104 :     if (!opts1 && !opts2)       /* neither side has an options array */
     368           0 :         return true;
     369             : 
     370         104 :     fmgr_info(F_ARRAY_EQ, &fm);
     371         214 :     for (i = 0; i < natts; i++)
     372             :     {
     373         110 :         Datum       opt1 = opts1 ? opts1[i] : (Datum) 0;    /* missing array acts as all-NULL */
     374         110 :         Datum       opt2 = opts2 ? opts2[i] : (Datum) 0;
     375             : 
     376         110 :         if (opt1 == (Datum) 0)
     377             :         {
     378         108 :             if (opt2 == (Datum) 0)
     379         108 :                 continue;
     380             :             else
     381           0 :                 return false;
     382             :         }
     383           2 :         else if (opt2 == (Datum) 0)
     384           0 :             return false;
     385             : 
     386             :         /*
     387             :          * Compare non-NULL text[] datums.  Use C collation to enforce binary
     388             :          * equivalence of texts, because we don't know anything about the
     389             :          * semantics of opclass options.
     390             :          */
     391           2 :         if (!DatumGetBool(FunctionCall2Coll(&fm, C_COLLATION_OID, opt1, opt2)))
     392           0 :             return false;
     393             :     }
     394             : 
     395         104 :     return true;
     396             : }
     397             : 
     398             : /*
     399             :  * WaitForOlderSnapshots
     400             :  *
     401             :  * Wait for transactions that might have an older snapshot than the given xmin
     402             :  * limit, because it might not contain tuples deleted just before it has
     403             :  * been taken. Obtain a list of VXIDs of such transactions, and wait for them
     404             :  * individually. This is used when building an index concurrently.
     405             :  *
     406             :  * We can exclude any running transactions that have xmin > the xmin given;
     407             :  * their oldest snapshot must be newer than our xmin limit.
     408             :  * We can also exclude any transactions that have xmin = zero, since they
     409             :  * evidently have no live snapshot at all (and any one they might be in
     410             :  * process of taking is certainly newer than ours).  Transactions in other
     411             :  * DBs can be ignored too, since they'll never even be able to see the
     412             :  * index being worked on.
     413             :  *
     414             :  * We can also exclude autovacuum processes and processes running manual
     415             :  * lazy VACUUMs, because they won't be fazed by missing index entries
     416             :  * either.  (Manual ANALYZEs, however, can't be excluded because they
     417             :  * might be within transactions that are going to do arbitrary operations
     418             :  * later.)  Processes running CREATE INDEX CONCURRENTLY or REINDEX CONCURRENTLY
     419             :  * on indexes that are neither expressional nor partial are also safe to
     420             :  * ignore, since we know that those processes won't examine any data
     421             :  * outside the table they're indexing.
     422             :  *
     423             :  * Also, GetCurrentVirtualXIDs never reports our own vxid, so we need not
     424             :  * check for that.
     425             :  *
     426             :  * If a process goes idle-in-transaction with xmin zero, we do not need to
     427             :  * wait for it anymore, per the above argument.  We do not have the
     428             :  * infrastructure right now to stop waiting if that happens, but we can at
     429             :  * least avoid the folly of waiting when it is idle at the time we would
     430             :  * begin to wait.  We do this by repeatedly rechecking the output of
     431             :  * GetCurrentVirtualXIDs.  If, during any iteration, a particular vxid
     432             :  * doesn't show up in the output, we know we can forget about it.
     433             :  */
     434             : void
     435         706 : WaitForOlderSnapshots(TransactionId limitXmin, bool progress)
     436             : {
     437             :     int         n_old_snapshots;
     438             :     int         i;
     439             :     VirtualTransactionId *old_snapshots;
     440             : 
     441         706 :     old_snapshots = GetCurrentVirtualXIDs(limitXmin, true, false,
     442             :                                           PROC_IS_AUTOVACUUM | PROC_IN_VACUUM
     443             :                                           | PROC_IN_SAFE_IC,
     444             :                                           &n_old_snapshots);
     445         706 :     if (progress)
     446         692 :         pgstat_progress_update_param(PROGRESS_WAITFOR_TOTAL, n_old_snapshots);
     447             : 
     448        1024 :     for (i = 0; i < n_old_snapshots; i++)
     449             :     {
     450         318 :         if (!VirtualTransactionIdIsValid(old_snapshots[i]))
     451          58 :             continue;           /* found uninteresting in previous cycle */
     452             : 
     453         260 :         if (i > 0)              /* at i == 0 the list was only just fetched */
     454             :         {
     455             :             /* re-fetch the current VXID list to see if anything's changed ... */
     456             :             VirtualTransactionId *newer_snapshots;
     457             :             int         n_newer_snapshots;
     458             :             int         j;
     459             :             int         k;
     460             : 
     461         118 :             newer_snapshots = GetCurrentVirtualXIDs(limitXmin,
     462             :                                                     true, false,
     463             :                                                     PROC_IS_AUTOVACUUM | PROC_IN_VACUUM
     464             :                                                     | PROC_IN_SAFE_IC,
     465             :                                                     &n_newer_snapshots);
     466         432 :             for (j = i; j < n_old_snapshots; j++)
     467             :             {
     468         314 :                 if (!VirtualTransactionIdIsValid(old_snapshots[j]))
     469          52 :                     continue;   /* found uninteresting in previous cycle */
     470         822 :                 for (k = 0; k < n_newer_snapshots; k++)
     471             :                 {
     472         712 :                     if (VirtualTransactionIdEquals(old_snapshots[j],
     473             :                                                    newer_snapshots[k]))
     474         152 :                         break;
     475             :                 }
     476         262 :                 if (k >= n_newer_snapshots) /* not there anymore */
     477         110 :                     SetInvalidVirtualTransactionId(old_snapshots[j]);
     478             :             }
     479         118 :             pfree(newer_snapshots);
     480             :         }
     481             : 
     482         260 :         if (VirtualTransactionIdIsValid(old_snapshots[i]))
     483             :         {
     484             :             /* If requested, publish who we're going to wait for. */
     485         208 :             if (progress)
     486             :             {
     487         208 :                 PGPROC     *holder = ProcNumberGetProc(old_snapshots[i].procNumber);
     488             : 
     489         208 :                 if (holder)
     490         208 :                     pgstat_progress_update_param(PROGRESS_WAITFOR_CURRENT_PID,
     491         208 :                                                  holder->pid);
     492             :             }
     493         208 :             VirtualXactLock(old_snapshots[i], true);
     494             :         }
     495             : 
     496         260 :         if (progress)
     497         260 :             pgstat_progress_update_param(PROGRESS_WAITFOR_DONE, i + 1);
     498             :     }
     499         706 : }
     500             : 
     501             : 
     502             : /*
     503             :  * DefineIndex
     504             :  *      Creates a new index.
     505             :  *
     506             :  * This function manages the current userid according to the needs of pg_dump.
     507             :  * Recreating old-database catalog entries in new-database is fine, regardless
     508             :  * of which users would have permission to recreate those entries now.  That's
     509             :  * just preservation of state.  Running opaque expressions, like calling a
     510             :  * function named in a catalog entry or evaluating a pg_node_tree in a catalog
     511             :  * entry, as anyone other than the object owner, is not fine.  To adhere to
     512             :  * those principles and to remain fail-safe, use the table owner userid for
     513             :  * most ACL checks.  Use the original userid for ACL checks reached without
     514             :  * traversing opaque expressions.  (pg_dump can predict such ACL checks from
     515             :  * catalogs.)  Overall, this is a mess.  Future DDL development should
     516             :  * consider offering one DDL command for catalog setup and a separate DDL
     517             :  * command for steps that run opaque expressions.
     518             :  *
     519             :  * 'pstate': ParseState struct (used only for error reports; pass NULL if
     520             :  *      not available)
     521             :  * 'tableId': the OID of the table relation on which the index is to be
     522             :  *      created
     523             :  * 'stmt': IndexStmt describing the properties of the new index.
     524             :  * 'indexRelationId': normally InvalidOid, but during bootstrap can be
     525             :  *      nonzero to specify a preselected OID for the index.
     526             :  * 'parentIndexId': the OID of the parent index; InvalidOid if not the child
     527             :  *      of a partitioned index.
     528             :  * 'parentConstraintId': the OID of the parent constraint; InvalidOid if not
     529             :  *      the child of a constraint (only used when recursing)
     530             :  * 'total_parts': total number of direct and indirect partitions of relation;
     531             :  *      pass -1 if not known or rel is not partitioned.
     532             :  * 'is_alter_table': this is due to an ALTER rather than a CREATE operation.
     533             :  * 'check_rights': check for CREATE rights in namespace and tablespace.  (This
     534             :  *      should be true except when ALTER is deleting/recreating an index.)
     535             :  * 'check_not_in_use': check for table not already in use in current session.
     536             :  *      This should be true unless caller is holding the table open, in which
     537             :  *      case the caller had better have checked it earlier.
     538             :  * 'skip_build': make the catalog entries but don't create the index files
     539             :  * 'quiet': suppress the NOTICE chatter ordinarily provided for constraints.
     540             :  *
     541             :  * Returns the object address of the created index.
     542             :  */
     543             : ObjectAddress
     544       31682 : DefineIndex(ParseState *pstate,
     545             :             Oid tableId,
     546             :             IndexStmt *stmt,
     547             :             Oid indexRelationId,
     548             :             Oid parentIndexId,
     549             :             Oid parentConstraintId,
     550             :             int total_parts,
     551             :             bool is_alter_table,
     552             :             bool check_rights,
     553             :             bool check_not_in_use,
     554             :             bool skip_build,
     555             :             bool quiet)
     556             : {
     557             :     bool        concurrent;
     558             :     char       *indexRelationName;
     559             :     char       *accessMethodName;
     560             :     Oid        *typeIds;
     561             :     Oid        *collationIds;
     562             :     Oid        *opclassIds;
     563             :     Datum      *opclassOptions;
     564             :     Oid         accessMethodId;
     565             :     Oid         namespaceId;
     566             :     Oid         tablespaceId;
     567       31682 :     Oid         createdConstraintId = InvalidOid;
     568             :     List       *indexColNames;
     569             :     List       *allIndexParams;
     570             :     Relation    rel;
     571             :     HeapTuple   tuple;
     572             :     Form_pg_am  accessMethodForm;
     573             :     const IndexAmRoutine *amRoutine;
     574             :     bool        amcanorder;
     575             :     bool        amissummarizing;
     576             :     amoptions_function amoptions;
     577             :     bool        exclusion;
     578             :     bool        partitioned;
     579             :     bool        safe_index;
     580             :     Datum       reloptions;
     581             :     int16      *coloptions;
     582             :     IndexInfo  *indexInfo;
     583             :     bits16      flags;
     584             :     bits16      constr_flags;
     585             :     int         numberOfAttributes;
     586             :     int         numberOfKeyAttributes;
     587             :     TransactionId limitXmin;
     588             :     ObjectAddress address;
     589             :     LockRelId   heaprelid;
     590             :     LOCKTAG     heaplocktag;
     591             :     LOCKMODE    lockmode;
     592             :     Snapshot    snapshot;
     593             :     Oid         root_save_userid;
     594             :     int         root_save_sec_context;
     595             :     int         root_save_nestlevel;
     596             : 
     597       31682 :     root_save_nestlevel = NewGUCNestLevel();
     598             : 
     599       31682 :     RestrictSearchPath();
     600             : 
     601             :     /*
     602             :      * Some callers need us to run with an empty default_tablespace; this is a
     603             :      * necessary hack to be able to reproduce catalog state accurately when
     604             :      * recreating indexes after table-rewriting ALTER TABLE.
     605             :      */
     606       31682 :     if (stmt->reset_default_tblspc)
     607         456 :         (void) set_config_option("default_tablespace", "",
     608             :                                  PGC_USERSET, PGC_S_SESSION,
     609             :                                  GUC_ACTION_SAVE, true, 0, false);
     610             : 
     611             :     /*
     612             :      * Force non-concurrent build on temporary relations, even if CONCURRENTLY
     613             :      * was requested.  Other backends can't access a temporary relation, so
     614             :      * there's no harm in grabbing a stronger lock, and a non-concurrent build
     615             :      * is more efficient.  Do this before the concurrent option is used
     616             :      * anywhere below.
     617             :      */
     618       31682 :     if (stmt->concurrent && get_rel_persistence(tableId) != RELPERSISTENCE_TEMP)
     619         194 :         concurrent = true;
     620             :     else
     621       31488 :         concurrent = false;
     622             : 
     623             :     /*
     624             :      * Start progress report.  If we're building a partition, this was already
     625             :      * done.
     626             :      */
     627       31682 :     if (!OidIsValid(parentIndexId))
     628             :     {
     629       28534 :         pgstat_progress_start_command(PROGRESS_COMMAND_CREATE_INDEX, tableId);
     630       28534 :         pgstat_progress_update_param(PROGRESS_CREATEIDX_COMMAND,
     631             :                                      concurrent ?
     632             :                                      PROGRESS_CREATEIDX_COMMAND_CREATE_CONCURRENTLY :
     633             :                                      PROGRESS_CREATEIDX_COMMAND_CREATE);
     634             :     }
     635             : 
     636             :     /*
     637             :      * No index OID to report yet
     638             :      */
     639       31682 :     pgstat_progress_update_param(PROGRESS_CREATEIDX_INDEX_OID,
     640             :                                  InvalidOid);
     641             : 
     642             :     /*
     643             :      * count key attributes in index
     644             :      */
     645       31682 :     numberOfKeyAttributes = list_length(stmt->indexParams);
     646             : 
     647             :     /*
     648             :      * Calculate the new list of index columns including both key columns and
     649             :      * INCLUDE columns.  Later we can determine which of these are key
     650             :      * columns, and which are just part of the INCLUDE list by checking the
     651             :      * list position.  A list item in a position less than ii_NumIndexKeyAttrs
     652             :      * is part of the key columns, and anything equal to and over is part of
     653             :      * the INCLUDE columns.
     654             :      */
     655       31682 :     allIndexParams = list_concat_copy(stmt->indexParams,
     656       31682 :                                       stmt->indexIncludingParams);
     657       31682 :     numberOfAttributes = list_length(allIndexParams);
     658             : 
     659       31682 :     if (numberOfKeyAttributes <= 0)
     660           0 :         ereport(ERROR,
     661             :                 (errcode(ERRCODE_INVALID_OBJECT_DEFINITION),
     662             :                  errmsg("must specify at least one column")));
     663       31682 :     if (numberOfAttributes > INDEX_MAX_KEYS)
     664           0 :         ereport(ERROR,
     665             :                 (errcode(ERRCODE_TOO_MANY_COLUMNS),
     666             :                  errmsg("cannot use more than %d columns in an index",
     667             :                         INDEX_MAX_KEYS)));
     668             : 
     669             :     /*
     670             :      * Only SELECT ... FOR UPDATE/SHARE are allowed while doing a standard
     671             :      * index build; but for concurrent builds we allow INSERT/UPDATE/DELETE
     672             :      * (but not VACUUM).
     673             :      *
     674             :      * NB: Caller is responsible for making sure that tableId refers to the
     675             :      * relation on which the index should be built; except in bootstrap mode,
     676             :      * this will typically require the caller to have already locked the
     677             :      * relation.  To avoid lock upgrade hazards, that lock should be at least
     678             :      * as strong as the one we take here.
     679             :      *
     680             :      * NB: If the lock strength here ever changes, code that is run by
     681             :      * parallel workers under the control of certain particular ambuild
     682             :      * functions will need to be updated, too.
     683             :      */
     684       31682 :     lockmode = concurrent ? ShareUpdateExclusiveLock : ShareLock;
     685       31682 :     rel = table_open(tableId, lockmode);
     686             : 
     687             :     /*
     688             :      * Switch to the table owner's userid, so that any index functions are run
     689             :      * as that user.  Also lock down security-restricted operations.  We
     690             :      * already arranged to make GUC variable changes local to this command.
     691             :      */
     692       31682 :     GetUserIdAndSecContext(&root_save_userid, &root_save_sec_context);
     693       31682 :     SetUserIdAndSecContext(rel->rd_rel->relowner,
     694             :                            root_save_sec_context | SECURITY_RESTRICTED_OPERATION);
     695             : 
     696       31682 :     namespaceId = RelationGetNamespace(rel);
     697             : 
     698             :     /*
     699             :      * It has exclusion constraint behavior if it's an EXCLUDE constraint or a
     700             :      * temporal PRIMARY KEY/UNIQUE constraint
     701             :      */
     702       31682 :     exclusion = stmt->excludeOpNames || stmt->iswithoutoverlaps;
     703             : 
     704             :     /* Ensure that it makes sense to index this kind of relation */
     705       31682 :     switch (rel->rd_rel->relkind)
     706             :     {
     707       31676 :         case RELKIND_RELATION:
     708             :         case RELKIND_MATVIEW:
     709             :         case RELKIND_PARTITIONED_TABLE:
     710             :             /* OK */
     711       31676 :             break;
     712           6 :         default:
     713           6 :             ereport(ERROR,
     714             :                     (errcode(ERRCODE_WRONG_OBJECT_TYPE),
     715             :                      errmsg("cannot create index on relation \"%s\"",
     716             :                             RelationGetRelationName(rel)),
     717             :                      errdetail_relkind_not_supported(rel->rd_rel->relkind)));
     718             :             break;
     719             :     }
     720             : 
     721             :     /*
     722             :      * Establish behavior for partitioned tables, and verify sanity of
     723             :      * parameters.
     724             :      *
     725             :      * We do not build an actual index in this case; we only create a few
     726             :      * catalog entries.  The actual indexes are built by recursing for each
     727             :      * partition.
     728             :      */
     729       31676 :     partitioned = rel->rd_rel->relkind == RELKIND_PARTITIONED_TABLE;
     730       31676 :     if (partitioned)
     731             :     {
     732             :         /*
     733             :          * Note: we check 'stmt->concurrent' rather than 'concurrent', so that
     734             :          * the error is thrown also for temporary tables.  Seems better to be
     735             :          * consistent, even though we could allow it on a temporary table,
     736             :          * because such a build is never actually done concurrently.
     737             :          */
     738        2254 :         if (stmt->concurrent)
     739           6 :             ereport(ERROR,
     740             :                     (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
     741             :                      errmsg("cannot create index on partitioned table \"%s\" concurrently",
     742             :                             RelationGetRelationName(rel))));
     743             :     }
     744             : 
     745             :     /*
     746             :      * Don't try to CREATE INDEX on temp tables of other backends.
     747             :      */
     748       31670 :     if (RELATION_IS_OTHER_TEMP(rel))
     749           0 :         ereport(ERROR,
     750             :                 (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
     751             :                  errmsg("cannot create indexes on temporary tables of other sessions")));
     752             : 
     753             :     /*
     754             :      * Unless our caller vouches for having checked this already, insist that
     755             :      * the table not be in use by our own session, either.  Otherwise we might
     756             :      * fail to make entries in the new index (for instance, if an INSERT or
     757             :      * UPDATE is in progress and has already made its list of target indexes).
     758             :      */
     759       31670 :     if (check_not_in_use)
     760       15040 :         CheckTableNotInUse(rel, "CREATE INDEX");
     761             : 
     762             :     /*
     763             :      * Verify we (still) have CREATE rights in the rel's namespace.
     764             :      * (Presumably we did when the rel was created, but maybe not anymore.)
     765             :      * Skip check if caller doesn't want it.  Also skip check if
     766             :      * bootstrapping, since permissions machinery may not be working yet.
     767             :      */
     768       31664 :     if (check_rights && !IsBootstrapProcessingMode())
     769             :     {
     770             :         AclResult   aclresult;
     771             : 
     772       16278 :         aclresult = object_aclcheck(NamespaceRelationId, namespaceId, root_save_userid,
     773             :                                     ACL_CREATE);
     774       16278 :         if (aclresult != ACLCHECK_OK)
     775           0 :             aclcheck_error(aclresult, OBJECT_SCHEMA,
     776           0 :                            get_namespace_name(namespaceId));
     777             :     }
     778             : 
     779             :     /*
     780             :      * Select tablespace to use.  If not specified, use default tablespace
     781             :      * (which may in turn default to database's default).
     782             :      */
     783       31664 :     if (stmt->tableSpace)
     784             :     {
     785         248 :         tablespaceId = get_tablespace_oid(stmt->tableSpace, false);
     786         248 :         if (partitioned && tablespaceId == MyDatabaseTableSpace)
     787           6 :             ereport(ERROR,
     788             :                     (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
     789             :                      errmsg("cannot specify default tablespace for partitioned relations")));
     790             :     }
     791             :     else
     792             :     {
     793       31416 :         tablespaceId = GetDefaultTablespace(rel->rd_rel->relpersistence,
     794             :                                             partitioned);
     795             :         /* note InvalidOid is OK in this case */
     796             :     }
     797             : 
     798             :     /* Check tablespace permissions */
     799       31652 :     if (check_rights &&
     800         116 :         OidIsValid(tablespaceId) && tablespaceId != MyDatabaseTableSpace)
     801             :     {
     802             :         AclResult   aclresult;
     803             : 
     804         116 :         aclresult = object_aclcheck(TableSpaceRelationId, tablespaceId, root_save_userid,
     805             :                                     ACL_CREATE);
     806         116 :         if (aclresult != ACLCHECK_OK)
     807           0 :             aclcheck_error(aclresult, OBJECT_TABLESPACE,
     808           0 :                            get_tablespace_name(tablespaceId));
     809             :     }
     810             : 
     811             :     /*
     812             :      * Force shared indexes into the pg_global tablespace.  This is a bit of a
     813             :      * hack but seems simpler than marking them in the BKI commands.  On the
     814             :      * other hand, if it's not shared, don't allow it to be placed there.
     815             :      */
     816       31652 :     if (rel->rd_rel->relisshared)
     817        2142 :         tablespaceId = GLOBALTABLESPACE_OID;
     818       29510 :     else if (tablespaceId == GLOBALTABLESPACE_OID)
     819           0 :         ereport(ERROR,
     820             :                 (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
     821             :                  errmsg("only shared relations can be placed in pg_global tablespace")));
     822             : 
     823             :     /*
     824             :      * Choose the index column names.
     825             :      */
     826       31652 :     indexColNames = ChooseIndexColumnNames(allIndexParams);
     827             : 
     828             :     /*
     829             :      * Select name for index if caller didn't specify
     830             :      */
     831       31652 :     indexRelationName = stmt->idxname;
     832       31652 :     if (indexRelationName == NULL)
     833       12316 :         indexRelationName = ChooseIndexName(RelationGetRelationName(rel),
     834             :                                             namespaceId,
     835             :                                             indexColNames,
     836       12316 :                                             stmt->excludeOpNames,
     837       12316 :                                             stmt->primary,
     838       12316 :                                             stmt->isconstraint);
     839             : 
     840             :     /*
     841             :      * look up the access method, verify it can handle the requested features
     842             :      */
     843       31652 :     accessMethodName = stmt->accessMethod;
     844       31652 :     tuple = SearchSysCache1(AMNAME, PointerGetDatum(accessMethodName));
     845       31652 :     if (!HeapTupleIsValid(tuple))
     846             :     {
     847             :         /*
     848             :          * Hack to provide more-or-less-transparent updating of old RTREE
     849             :          * indexes to GiST: if RTREE is requested and not found, use GIST.
     850             :          */
     851           6 :         if (strcmp(accessMethodName, "rtree") == 0)
     852             :         {
     853           6 :             ereport(NOTICE,
     854             :                     (errmsg("substituting access method \"gist\" for obsolete method \"rtree\"")));
     855           6 :             accessMethodName = "gist";
     856           6 :             tuple = SearchSysCache1(AMNAME, PointerGetDatum(accessMethodName));
     857             :         }
     858             : 
     859           6 :         if (!HeapTupleIsValid(tuple))
     860           0 :             ereport(ERROR,
     861             :                     (errcode(ERRCODE_UNDEFINED_OBJECT),
     862             :                      errmsg("access method \"%s\" does not exist",
     863             :                             accessMethodName)));
     864             :     }
     865       31652 :     accessMethodForm = (Form_pg_am) GETSTRUCT(tuple);
     866       31652 :     accessMethodId = accessMethodForm->oid;
     867       31652 :     amRoutine = GetIndexAmRoutine(accessMethodForm->amhandler);
     868             : 
     869       31652 :     pgstat_progress_update_param(PROGRESS_CREATEIDX_ACCESS_METHOD_OID,
     870             :                                  accessMethodId);
     871             : 
     872       31652 :     if (stmt->unique && !stmt->iswithoutoverlaps && !amRoutine->amcanunique)
     873           0 :         ereport(ERROR,
     874             :                 (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
     875             :                  errmsg("access method \"%s\" does not support unique indexes",
     876             :                         accessMethodName)));
     877       31652 :     if (stmt->indexIncludingParams != NIL && !amRoutine->amcaninclude)
     878          18 :         ereport(ERROR,
     879             :                 (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
     880             :                  errmsg("access method \"%s\" does not support included columns",
     881             :                         accessMethodName)));
     882       31634 :     if (numberOfKeyAttributes > 1 && !amRoutine->amcanmulticol)
     883           0 :         ereport(ERROR,
     884             :                 (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
     885             :                  errmsg("access method \"%s\" does not support multicolumn indexes",
     886             :                         accessMethodName)));
     887       31634 :     if (exclusion && amRoutine->amgettuple == NULL)
     888           0 :         ereport(ERROR,
     889             :                 (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
     890             :                  errmsg("access method \"%s\" does not support exclusion constraints",
     891             :                         accessMethodName)));
     892       31634 :     if (stmt->iswithoutoverlaps && strcmp(accessMethodName, "gist") != 0)
     893           0 :         ereport(ERROR,
     894             :                 (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
     895             :                  errmsg("access method \"%s\" does not support WITHOUT OVERLAPS constraints",
     896             :                         accessMethodName)));
     897             : 
     898       31634 :     amcanorder = amRoutine->amcanorder;
     899       31634 :     amoptions = amRoutine->amoptions;
     900       31634 :     amissummarizing = amRoutine->amsummarizing;
     901             : 
     902       31634 :     ReleaseSysCache(tuple);
     903             : 
     904             :     /*
     905             :      * Validate predicate, if given
     906             :      */
     907       31634 :     if (stmt->whereClause)
     908         448 :         CheckPredicate((Expr *) stmt->whereClause);
     909             : 
     910             :     /*
     911             :      * Parse AM-specific options, convert to text array form, validate.
     912             :      */
     913       31634 :     reloptions = transformRelOptions((Datum) 0, stmt->options,
     914             :                                      NULL, NULL, false, false);
     915             : 
     916       31628 :     (void) index_reloptions(amoptions, reloptions, true);
     917             : 
     918             :     /*
     919             :      * Prepare arguments for index_create, primarily an IndexInfo structure.
     920             :      * Note that predicates must be in implicit-AND format.  In a concurrent
     921             :      * build, mark it not-ready-for-inserts.
     922             :      */
     923       31564 :     indexInfo = makeIndexInfo(numberOfAttributes,
     924             :                               numberOfKeyAttributes,
     925             :                               accessMethodId,
     926             :                               NIL,  /* expressions, NIL for now */
     927       31564 :                               make_ands_implicit((Expr *) stmt->whereClause),
     928       31564 :                               stmt->unique,
     929       31564 :                               stmt->nulls_not_distinct,
     930       31564 :                               !concurrent,
     931             :                               concurrent,
     932             :                               amissummarizing,
     933       31564 :                               stmt->iswithoutoverlaps);
     934             : 
     935       31564 :     typeIds = palloc_array(Oid, numberOfAttributes);
     936       31564 :     collationIds = palloc_array(Oid, numberOfAttributes);
     937       31564 :     opclassIds = palloc_array(Oid, numberOfAttributes);
     938       31564 :     opclassOptions = palloc_array(Datum, numberOfAttributes);
     939       31564 :     coloptions = palloc_array(int16, numberOfAttributes);
     940       31564 :     ComputeIndexAttrs(pstate,
     941             :                       indexInfo,
     942             :                       typeIds, collationIds, opclassIds, opclassOptions,
     943             :                       coloptions, allIndexParams,
     944       31564 :                       stmt->excludeOpNames, tableId,
     945             :                       accessMethodName, accessMethodId,
     946       31564 :                       amcanorder, stmt->isconstraint, stmt->iswithoutoverlaps,
     947             :                       root_save_userid, root_save_sec_context,
     948             :                       &root_save_nestlevel);
     949             : 
     950             :     /*
     951             :      * Extra checks when creating a PRIMARY KEY index.
     952             :      */
     953       31346 :     if (stmt->primary)
     954        9354 :         index_check_primary_key(rel, indexInfo, is_alter_table, stmt);
     955             : 
     956             :     /*
     957             :      * If this table is partitioned and we're creating a unique index, primary
     958             :      * key, or exclusion constraint, make sure that the partition key is a
     959             :      * subset of the index's columns.  Otherwise it would be possible to
     960             :      * violate uniqueness by putting values that ought to be unique in
     961             :      * different partitions.
     962             :      *
     963             :      * We could lift this limitation if we had global indexes, but those have
     964             :      * their own problems, so even this restricted form is a useful feature.
     965             :      */
     966       31310 :     if (partitioned && (stmt->unique || exclusion))
     967             :     {
     968        1328 :         PartitionKey key = RelationGetPartitionKey(rel);
     969             :         const char *constraint_type;
     970             :         int         i;
     971             : 
     972        1328 :         if (stmt->primary)
     973         982 :             constraint_type = "PRIMARY KEY";
     974         346 :         else if (stmt->unique)
     975         246 :             constraint_type = "UNIQUE";
     976         100 :         else if (stmt->excludeOpNames)
     977         100 :             constraint_type = "EXCLUDE";
     978             :         else
     979             :         {
     980           0 :             elog(ERROR, "unknown constraint type");
     981             :             constraint_type = NULL; /* keep compiler quiet */
     982             :         }
     983             : 
     984             :         /*
     985             :          * Verify that all the columns in the partition key appear in the
     986             :          * unique key definition, with the same notion of equality.
     987             :          */
     988        2672 :         for (i = 0; i < key->partnatts; i++)
     989             :         {
     990        1448 :             bool        found = false;
     991             :             int         eq_strategy;
     992             :             Oid         ptkey_eqop;
     993             :             int         j;
     994             : 
     995             :             /*
     996             :              * Identify the equality operator associated with this partkey
     997             :              * column.  For list and range partitioning, partkeys use btree
     998             :              * operator classes; hash partitioning uses hash operator classes.
     999             :              * (Keep this in sync with ComputePartitionAttrs!)
    1000             :              */
    1001        1448 :             if (key->strategy == PARTITION_STRATEGY_HASH)
    1002          66 :                 eq_strategy = HTEqualStrategyNumber;
    1003             :             else
    1004        1382 :                 eq_strategy = BTEqualStrategyNumber;
    1005             : 
    1006        1448 :             ptkey_eqop = get_opfamily_member(key->partopfamily[i],
    1007        1448 :                                              key->partopcintype[i],
    1008        1448 :                                              key->partopcintype[i],
    1009             :                                              eq_strategy);
    1010        1448 :             if (!OidIsValid(ptkey_eqop))
    1011           0 :                 elog(ERROR, "missing operator %d(%u,%u) in partition opfamily %u",
    1012             :                      eq_strategy, key->partopcintype[i], key->partopcintype[i],
    1013             :                      key->partopfamily[i]);
    1014             : 
    1015             :             /*
    1016             :              * It may be possible to support UNIQUE constraints when partition
    1017             :              * keys are expressions, but is it worth it?  Give up for now.
    1018             :              */
    1019        1448 :             if (key->partattrs[i] == 0)
    1020          12 :                 ereport(ERROR,
    1021             :                         (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
    1022             :                          errmsg("unsupported %s constraint with partition key definition",
    1023             :                                 constraint_type),
    1024             :                          errdetail("%s constraints cannot be used when partition keys include expressions.",
    1025             :                                    constraint_type)));
    1026             : 
    1027             :             /* Search the index column(s) for a match */
    1028        1636 :             for (j = 0; j < indexInfo->ii_NumIndexKeyAttrs; j++)
    1029             :             {
    1030        1558 :                 if (key->partattrs[i] == indexInfo->ii_IndexAttrNumbers[j])
    1031             :                 {
    1032             :                     /*
    1033             :                      * Matched the column, now what about the collation and
    1034             :                      * equality op?
    1035             :                      */
    1036             :                     Oid         idx_opfamily;
    1037             :                     Oid         idx_opcintype;
    1038             : 
    1039        1358 :                     if (key->partcollation[i] != collationIds[j])
    1040           0 :                         continue;
    1041             : 
    1042        1358 :                     if (get_opclass_opfamily_and_input_type(opclassIds[j],
    1043             :                                                             &idx_opfamily,
    1044             :                                                             &idx_opcintype))
    1045             :                     {
    1046        1358 :                         Oid         idx_eqop = InvalidOid;
    1047             : 
    1048        1358 :                         if (stmt->unique && !stmt->iswithoutoverlaps)
    1049        1210 :                             idx_eqop = get_opfamily_member_for_cmptype(idx_opfamily,
    1050             :                                                                        idx_opcintype,
    1051             :                                                                        idx_opcintype,
    1052             :                                                                        COMPARE_EQ);
    1053         148 :                         else if (exclusion)
    1054         148 :                             idx_eqop = indexInfo->ii_ExclusionOps[j];
    1055             : 
    1056        1358 :                         if (!idx_eqop)
    1057           0 :                             ereport(ERROR,
    1058             :                                     errcode(ERRCODE_UNDEFINED_OBJECT),
    1059             :                                     errmsg("could not identify an equality operator for type %s", format_type_be(idx_opcintype)),
    1060             :                                     errdetail("There is no suitable operator in operator family \"%s\" for access method \"%s\".",
    1061             :                                               get_opfamily_name(idx_opfamily, false), get_am_name(get_opfamily_method(idx_opfamily))));
    1062             : 
    1063        1358 :                         if (ptkey_eqop == idx_eqop)
    1064             :                         {
    1065        1344 :                             found = true;
    1066        1344 :                             break;
    1067             :                         }
    1068          14 :                         else if (exclusion)
    1069             :                         {
    1070             :                             /*
    1071             :                              * We found a match, but it's not an equality
    1072             :                              * operator. Instead of failing below with an
    1073             :                              * error message about a missing column, fail now
    1074             :                              * and explain that the operator is wrong.
    1075             :                              */
    1076          14 :                             Form_pg_attribute att = TupleDescAttr(RelationGetDescr(rel), key->partattrs[i] - 1);
    1077             : 
    1078          14 :                             ereport(ERROR,
    1079             :                                     (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
    1080             :                                      errmsg("cannot match partition key to index on column \"%s\" using non-equal operator \"%s\"",
    1081             :                                             NameStr(att->attname),
    1082             :                                             get_opname(indexInfo->ii_ExclusionOps[j]))));
    1083             :                         }
    1084             :                     }
    1085             :                 }
    1086             :             }
    1087             : 
    1088        1422 :             if (!found)
    1089             :             {
    1090             :                 Form_pg_attribute att;
    1091             : 
    1092          78 :                 att = TupleDescAttr(RelationGetDescr(rel),
    1093          78 :                                     key->partattrs[i] - 1);
    1094          78 :                 ereport(ERROR,
    1095             :                         (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
    1096             :                 /* translator: %s is UNIQUE, PRIMARY KEY, etc */
    1097             :                          errmsg("%s constraint on partitioned table must include all partitioning columns",
    1098             :                                 constraint_type),
    1099             :                 /* translator: first %s is UNIQUE, PRIMARY KEY, etc */
    1100             :                          errdetail("%s constraint on table \"%s\" lacks column \"%s\" which is part of the partition key.",
    1101             :                                    constraint_type, RelationGetRelationName(rel),
    1102             :                                    NameStr(att->attname))));
    1103             :             }
    1104             :         }
    1105             :     }
    1106             : 
    1107             : 
    1108             :     /*
    1109             :      * We disallow indexes on system columns.  They would not necessarily get
    1110             :      * updated correctly, and they don't seem useful anyway.
    1111             :      *
    1112             :      * Also disallow virtual generated columns in indexes (use expression
    1113             :      * index instead).
    1114             :      */
    1115       75080 :     for (int i = 0; i < indexInfo->ii_NumIndexAttrs; i++)
    1116             :     {
    1117       43898 :         AttrNumber  attno = indexInfo->ii_IndexAttrNumbers[i];
    1118             : 
    1119       43898 :         if (attno < 0)
    1120           6 :             ereport(ERROR,
    1121             :                     (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
    1122             :                      errmsg("index creation on system columns is not supported")));
    1123             : 
    1124             : 
    1125       43892 :         if (TupleDescAttr(RelationGetDescr(rel), attno - 1)->attgenerated == ATTRIBUTE_GENERATED_VIRTUAL)
    1126          18 :             ereport(ERROR,
    1127             :                     errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
    1128             :                     stmt->primary ?
    1129             :                     errmsg("primary keys on virtual generated columns are not supported") :
    1130             :                     stmt->isconstraint ?
    1131             :                     errmsg("unique constraints on virtual generated columns are not supported") :
    1132             :                     errmsg("indexes on virtual generated columns are not supported"));
    1133             :     }
    1134             : 
    1135             :     /*
    1136             :      * Also check for system and generated columns used in expressions or
    1137             :      * predicates.
    1138             :      */
    1139       31182 :     if (indexInfo->ii_Expressions || indexInfo->ii_Predicate)
    1140             :     {
    1141        1276 :         Bitmapset  *indexattrs = NULL;
    1142             :         int         j;
    1143             : 
    1144        1276 :         pull_varattnos((Node *) indexInfo->ii_Expressions, 1, &indexattrs);
    1145        1276 :         pull_varattnos((Node *) indexInfo->ii_Predicate, 1, &indexattrs);
    1146             : 
    1147        8920 :         for (int i = FirstLowInvalidHeapAttributeNumber + 1; i < 0; i++)
    1148             :         {
    1149        7656 :             if (bms_is_member(i - FirstLowInvalidHeapAttributeNumber,
    1150             :                               indexattrs))
    1151          12 :                 ereport(ERROR,
    1152             :                         (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
    1153             :                          errmsg("index creation on system columns is not supported")));
    1154             :         }
    1155             : 
    1156             :         /*
    1157             :          * XXX Virtual generated columns in index expressions or predicates
    1158             :          * could be supported, but it needs support in
    1159             :          * RelationGetIndexExpressions() and RelationGetIndexPredicate().
    1160             :          */
    1161        1264 :         j = -1;
    1162        2742 :         while ((j = bms_next_member(indexattrs, j)) >= 0)
    1163             :         {
    1164        1478 :             AttrNumber  attno = j + FirstLowInvalidHeapAttributeNumber;
    1165             : 
    1166        1478 :             if (TupleDescAttr(RelationGetDescr(rel), attno - 1)->attgenerated == ATTRIBUTE_GENERATED_VIRTUAL)
    1167           0 :                 ereport(ERROR,
    1168             :                         (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
    1169             :                          stmt->isconstraint ?
    1170             :                          errmsg("unique constraints on virtual generated columns are not supported") :
    1171             :                          errmsg("indexes on virtual generated columns are not supported")));
    1172             :         }
    1173             :     }
    1174             : 
    1175             :     /* Is index safe for others to ignore?  See set_indexsafe_procflags() */
    1176       61414 :     safe_index = indexInfo->ii_Expressions == NIL &&
    1177       30244 :         indexInfo->ii_Predicate == NIL;
    1178             : 
    1179             :     /*
    1180             :      * Report index creation if appropriate (delay this till after most of the
    1181             :      * error checks)
    1182             :      */
    1183       31170 :     if (stmt->isconstraint && !quiet)
    1184             :     {
    1185             :         const char *constraint_type;
    1186             : 
    1187       10276 :         if (stmt->primary)
    1188        9150 :             constraint_type = "PRIMARY KEY";
    1189        1126 :         else if (stmt->unique)
    1190         950 :             constraint_type = "UNIQUE";
    1191         176 :         else if (stmt->excludeOpNames)
    1192         176 :             constraint_type = "EXCLUDE";
    1193             :         else
    1194             :         {
    1195           0 :             elog(ERROR, "unknown constraint type");
    1196             :             constraint_type = NULL; /* keep compiler quiet */
    1197             :         }
    1198             : 
    1199       10276 :         ereport(DEBUG1,
    1200             :                 (errmsg_internal("%s %s will create implicit index \"%s\" for table \"%s\"",
    1201             :                                  is_alter_table ? "ALTER TABLE / ADD" : "CREATE TABLE /",
    1202             :                                  constraint_type,
    1203             :                                  indexRelationName, RelationGetRelationName(rel))));
    1204             :     }
    1205             : 
    1206             :     /*
    1207             :      * A valid stmt->oldNumber implies that we already have a built form of
    1208             :      * the index.  The caller should also decline any index build.
    1209             :      */
    1210             :     Assert(!RelFileNumberIsValid(stmt->oldNumber) || (skip_build && !concurrent));
    1211             : 
    1212             :     /*
    1213             :      * Make the catalog entries for the index, including constraints. This
    1214             :      * step also actually builds the index, except if caller requested not to
    1215             :      * or in concurrent mode, in which case it'll be done later, or doing a
    1216             :      * partitioned index (because those don't have storage).
    1217             :      */
    1218       31170 :     flags = constr_flags = 0;
    1219       31170 :     if (stmt->isconstraint)
    1220       10516 :         flags |= INDEX_CREATE_ADD_CONSTRAINT;
    1221       31170 :     if (skip_build || concurrent || partitioned)
    1222       15406 :         flags |= INDEX_CREATE_SKIP_BUILD;
    1223       31170 :     if (stmt->if_not_exists)
    1224          18 :         flags |= INDEX_CREATE_IF_NOT_EXISTS;
    1225       31170 :     if (concurrent)
    1226         188 :         flags |= INDEX_CREATE_CONCURRENT;
    1227       31170 :     if (partitioned)
    1228        2132 :         flags |= INDEX_CREATE_PARTITIONED;
    1229       31170 :     if (stmt->primary)
    1230        9276 :         flags |= INDEX_CREATE_IS_PRIMARY;
    1231             : 
    1232             :     /*
    1233             :      * If the table is partitioned, and recursion was declined but partitions
    1234             :      * exist, mark the index as invalid.
    1235             :      */
    1236       31170 :     if (partitioned && stmt->relation && !stmt->relation->inh)
    1237             :     {
    1238         250 :         PartitionDesc pd = RelationGetPartitionDesc(rel, true);
    1239             : 
    1240         250 :         if (pd->nparts != 0)
    1241         230 :             flags |= INDEX_CREATE_INVALID;
    1242             :     }
    1243             : 
    1244       31170 :     if (stmt->deferrable)
    1245         132 :         constr_flags |= INDEX_CONSTR_CREATE_DEFERRABLE;
    1246       31170 :     if (stmt->initdeferred)
    1247          38 :         constr_flags |= INDEX_CONSTR_CREATE_INIT_DEFERRED;
    1248       31170 :     if (stmt->iswithoutoverlaps)
    1249         632 :         constr_flags |= INDEX_CONSTR_CREATE_WITHOUT_OVERLAPS;
    1250             : 
    1251             :     indexRelationId =
    1252       31170 :         index_create(rel, indexRelationName, indexRelationId, parentIndexId,
    1253             :                      parentConstraintId,
    1254             :                      stmt->oldNumber, indexInfo, indexColNames,
    1255             :                      accessMethodId, tablespaceId,
    1256             :                      collationIds, opclassIds, opclassOptions,
    1257             :                      coloptions, NULL, reloptions,
    1258             :                      flags, constr_flags,
    1259       31170 :                      allowSystemTableMods, !check_rights,
    1260       31170 :                      &createdConstraintId);
    1261             : 
    1262       30938 :     ObjectAddressSet(address, RelationRelationId, indexRelationId);
    1263             : 
    1264       30938 :     if (!OidIsValid(indexRelationId))
    1265             :     {
    1266             :         /*
    1267             :          * Roll back any GUC changes executed by index functions.  Also revert
    1268             :          * to original default_tablespace if we changed it above.
    1269             :          */
    1270          18 :         AtEOXact_GUC(false, root_save_nestlevel);
    1271             : 
    1272             :         /* Restore userid and security context */
    1273          18 :         SetUserIdAndSecContext(root_save_userid, root_save_sec_context);
    1274             : 
    1275          18 :         table_close(rel, NoLock);
    1276             : 
    1277             :         /* If this is the top-level index, we're done */
    1278          18 :         if (!OidIsValid(parentIndexId))
    1279          18 :             pgstat_progress_end_command();
    1280             : 
    1281          18 :         return address;
    1282             :     }
    1283             : 
    1284             :     /*
    1285             :      * Roll back any GUC changes executed by index functions, and keep
    1286             :      * subsequent changes local to this command.  This is essential if some
    1287             :      * index function changed a behavior-affecting GUC, e.g. search_path.
    1288             :      */
    1289       30920 :     AtEOXact_GUC(false, root_save_nestlevel);
    1290       30920 :     root_save_nestlevel = NewGUCNestLevel();
    1291       30920 :     RestrictSearchPath();
    1292             : 
    1293             :     /* Add any requested comment */
    1294       30920 :     if (stmt->idxcomment != NULL)
    1295          78 :         CreateComments(indexRelationId, RelationRelationId, 0,
    1296          78 :                        stmt->idxcomment);
    1297             : 
    1298       30920 :     if (partitioned)
    1299             :     {
    1300             :         PartitionDesc partdesc;
    1301             : 
    1302             :         /*
    1303             :          * Unless caller specified to skip this step (via ONLY), process each
    1304             :          * partition to make sure they all contain a corresponding index.
    1305             :          *
    1306             :          * If we're called internally (no stmt->relation), recurse always.
    1307             :          */
    1308        2132 :         partdesc = RelationGetPartitionDesc(rel, true);
    1309        2132 :         if ((!stmt->relation || stmt->relation->inh) && partdesc->nparts > 0)
    1310             :         {
    1311         624 :             int         nparts = partdesc->nparts;
    1312         624 :             Oid        *part_oids = palloc_array(Oid, nparts);
    1313         624 :             bool        invalidate_parent = false;
    1314             :             Relation    parentIndex;
    1315             :             TupleDesc   parentDesc;
    1316             : 
    1317             :             /*
    1318             :              * Report the total number of partitions at the start of the
    1319             :              * command; don't update it when being called recursively.
    1320             :              */
    1321         624 :             if (!OidIsValid(parentIndexId))
    1322             :             {
    1323             :                 /*
    1324             :                  * When called by ProcessUtilitySlow, the number of partitions
    1325             :                  * is passed in as an optimization; but other callers pass -1
    1326             :                  * since they don't have the value handy.  This should count
    1327             :                  * partitions the same way, ie one less than the number of
    1328             :                  * relations find_all_inheritors reports.
    1329             :                  *
    1330             :                  * We assume we needn't ask find_all_inheritors to take locks,
    1331             :                  * because that should have happened already for all callers.
    1332             :                  * Even if it did not, this is safe as long as we don't try to
    1333             :                  * touch the partitions here; the worst consequence would be a
    1334             :                  * bogus progress-reporting total.
    1335             :                  */
    1336         512 :                 if (total_parts < 0)
    1337             :                 {
    1338         128 :                     List       *children = find_all_inheritors(tableId, NoLock, NULL);
    1339             : 
    1340         128 :                     total_parts = list_length(children) - 1;
    1341         128 :                     list_free(children);
    1342             :                 }
    1343             : 
    1344         512 :                 pgstat_progress_update_param(PROGRESS_CREATEIDX_PARTITIONS_TOTAL,
    1345             :                                              total_parts);
    1346             :             }
    1347             : 
    1348             :             /* Make a local copy of partdesc->oids[], just for safety */
    1349         624 :             memcpy(part_oids, partdesc->oids, sizeof(Oid) * nparts);
    1350             : 
    1351             :             /*
    1352             :              * We'll need an IndexInfo describing the parent index.  The one
    1353             :              * built above is almost good enough, but not quite, because (for
    1354             :              * example) its predicate expression if any hasn't been through
    1355             :              * expression preprocessing.  The most reliable way to get an
    1356             :              * IndexInfo that will match those for child indexes is to build
    1357             :              * it the same way, using BuildIndexInfo().
    1358             :              */
    1359         624 :             parentIndex = index_open(indexRelationId, lockmode);
    1360         624 :             indexInfo = BuildIndexInfo(parentIndex);
    1361             : 
    1362         624 :             parentDesc = RelationGetDescr(rel);
    1363             : 
    1364             :             /*
    1365             :              * For each partition, scan all existing indexes; if one matches
    1366             :              * our index definition and is not already attached to some other
    1367             :              * parent index, attach it to the one we just created.
    1368             :              *
    1369             :              * If none matches, build a new index by calling ourselves
    1370             :              * recursively with the same options (except for the index name).
    1371             :              */
    1372        1706 :             for (int i = 0; i < nparts; i++)
    1373             :             {
    1374        1106 :                 Oid         childRelid = part_oids[i];
    1375             :                 Relation    childrel;
    1376             :                 Oid         child_save_userid;
    1377             :                 int         child_save_sec_context;
    1378             :                 int         child_save_nestlevel;
    1379             :                 List       *childidxs;
    1380             :                 ListCell   *cell;
    1381             :                 AttrMap    *attmap;
    1382        1106 :                 bool        found = false;
    1383             : 
    1384        1106 :                 childrel = table_open(childRelid, lockmode);
    1385             : 
    1386        1106 :                 GetUserIdAndSecContext(&child_save_userid,
    1387             :                                        &child_save_sec_context);
    1388        1106 :                 SetUserIdAndSecContext(childrel->rd_rel->relowner,
    1389             :                                        child_save_sec_context | SECURITY_RESTRICTED_OPERATION);
    1390        1106 :                 child_save_nestlevel = NewGUCNestLevel();
    1391        1106 :                 RestrictSearchPath();
    1392             : 
    1393             :                 /*
    1394             :                  * Don't try to create indexes on foreign tables, though. Skip
    1395             :                  * those if a regular index, or fail if trying to create a
    1396             :                  * constraint index.
    1397             :                  */
    1398        1106 :                 if (childrel->rd_rel->relkind == RELKIND_FOREIGN_TABLE)
    1399             :                 {
    1400          18 :                     if (stmt->unique || stmt->primary)
    1401          12 :                         ereport(ERROR,
    1402             :                                 (errcode(ERRCODE_WRONG_OBJECT_TYPE),
    1403             :                                  errmsg("cannot create unique index on partitioned table \"%s\"",
    1404             :                                         RelationGetRelationName(rel)),
    1405             :                                  errdetail("Table \"%s\" contains partitions that are foreign tables.",
    1406             :                                            RelationGetRelationName(rel))));
    1407             : 
    1408           6 :                     AtEOXact_GUC(false, child_save_nestlevel);
    1409           6 :                     SetUserIdAndSecContext(child_save_userid,
    1410             :                                            child_save_sec_context);
    1411           6 :                     table_close(childrel, lockmode);
    1412           6 :                     continue;
    1413             :                 }
    1414             : 
    1415        1088 :                 childidxs = RelationGetIndexList(childrel);
    1416             :                 attmap =
    1417        1088 :                     build_attrmap_by_name(RelationGetDescr(childrel),
    1418             :                                           parentDesc,
    1419             :                                           false);
    1420             : 
    1421        1436 :                 foreach(cell, childidxs)
    1422             :                 {
    1423         426 :                     Oid         cldidxid = lfirst_oid(cell);
    1424             :                     Relation    cldidx;
    1425             :                     IndexInfo  *cldIdxInfo;
    1426             : 
    1427             :                     /* this index is already partition of another one */
    1428         426 :                     if (has_superclass(cldidxid))
    1429         324 :                         continue;
    1430             : 
    1431         102 :                     cldidx = index_open(cldidxid, lockmode);
    1432         102 :                     cldIdxInfo = BuildIndexInfo(cldidx);
    1433         102 :                     if (CompareIndexInfo(cldIdxInfo, indexInfo,
    1434         102 :                                          cldidx->rd_indcollation,
    1435         102 :                                          parentIndex->rd_indcollation,
    1436         102 :                                          cldidx->rd_opfamily,
    1437         102 :                                          parentIndex->rd_opfamily,
    1438             :                                          attmap))
    1439             :                     {
    1440          78 :                         Oid         cldConstrOid = InvalidOid;
    1441             : 
    1442             :                         /*
    1443             :                          * Found a match.
    1444             :                          *
    1445             :                          * If this index is being created in the parent
    1446             :                          * because of a constraint, then the child needs to
    1447             :                          * have a constraint also, so look for one.  If there
    1448             :                          * is no such constraint, this index is no good, so
    1449             :                          * keep looking.
    1450             :                          */
    1451          78 :                         if (createdConstraintId != InvalidOid)
    1452             :                         {
    1453             :                             cldConstrOid =
    1454          12 :                                 get_relation_idx_constraint_oid(childRelid,
    1455             :                                                                 cldidxid);
    1456          12 :                             if (cldConstrOid == InvalidOid)
    1457             :                             {
    1458           0 :                                 index_close(cldidx, lockmode);
    1459           0 :                                 continue;
    1460             :                             }
    1461             :                         }
    1462             : 
    1463             :                         /* Attach index to parent and we're done. */
    1464          78 :                         IndexSetParentIndex(cldidx, indexRelationId);
    1465          78 :                         if (createdConstraintId != InvalidOid)
    1466          12 :                             ConstraintSetParentConstraint(cldConstrOid,
    1467             :                                                           createdConstraintId,
    1468             :                                                           childRelid);
    1469             : 
    1470          78 :                         if (!cldidx->rd_index->indisvalid)
    1471          18 :                             invalidate_parent = true;
    1472             : 
    1473          78 :                         found = true;
    1474             : 
    1475             :                         /*
    1476             :                          * Report this partition as processed.  Note that if
    1477             :                          * the partition has children itself, we'd ideally
    1478             :                          * count the children and update the progress report
    1479             :                          * for all of them; but that seems unduly expensive.
    1480             :                          * Instead, the progress report will act like all such
    1481             :                          * indirect children were processed in zero time at
    1482             :                          * the end of the command.
    1483             :                          */
    1484          78 :                         pgstat_progress_incr_param(PROGRESS_CREATEIDX_PARTITIONS_DONE, 1);
    1485             : 
    1486             :                         /* keep lock till commit */
    1487          78 :                         index_close(cldidx, NoLock);
    1488          78 :                         break;
    1489             :                     }
    1490             : 
    1491          24 :                     index_close(cldidx, lockmode);
    1492             :                 }
    1493             : 
    1494        1088 :                 list_free(childidxs);
    1495        1088 :                 AtEOXact_GUC(false, child_save_nestlevel);
    1496        1088 :                 SetUserIdAndSecContext(child_save_userid,
    1497             :                                        child_save_sec_context);
    1498        1088 :                 table_close(childrel, NoLock);
    1499             : 
    1500             :                 /*
    1501             :                  * If no matching index was found, create our own.
    1502             :                  */
    1503        1088 :                 if (!found)
    1504             :                 {
    1505             :                     IndexStmt  *childStmt;
    1506             :                     ObjectAddress childAddr;
    1507             : 
    1508             :                     /*
    1509             :                      * Build an IndexStmt describing the desired child index
    1510             :                      * in the same way that we do during ATTACH PARTITION.
    1511             :                      * Notably, we rely on generateClonedIndexStmt to produce
    1512             :                      * a search-path-independent representation, which the
    1513             :                      * original IndexStmt might not be.
    1514             :                      */
    1515        1010 :                     childStmt = generateClonedIndexStmt(NULL,
    1516             :                                                         parentIndex,
    1517             :                                                         attmap,
    1518             :                                                         NULL);
    1519             : 
    1520             :                     /*
    1521             :                      * Recurse as the starting user ID.  Callee will use that
    1522             :                      * for permission checks, then switch again.
    1523             :                      */
    1524             :                     Assert(GetUserId() == child_save_userid);
    1525        1010 :                     SetUserIdAndSecContext(root_save_userid,
    1526             :                                            root_save_sec_context);
    1527             :                     childAddr =
    1528        1010 :                         DefineIndex(NULL,   /* original pstate not applicable */
    1529             :                                     childRelid, childStmt,
    1530             :                                     InvalidOid, /* no predefined OID */
    1531             :                                     indexRelationId,    /* this is our child */
    1532             :                                     createdConstraintId,
    1533             :                                     -1,
    1534             :                                     is_alter_table, check_rights,
    1535             :                                     check_not_in_use,
    1536             :                                     skip_build, quiet);
    1537         998 :                     SetUserIdAndSecContext(child_save_userid,
    1538             :                                            child_save_sec_context);
    1539             : 
    1540             :                     /*
    1541             :                      * Check if the index just created is valid or not, as it
    1542             :                      * could be possible that it has been switched as invalid
    1543             :                      * when recursing across multiple partition levels.
    1544             :                      */
    1545         998 :                     if (!get_index_isvalid(childAddr.objectId))
    1546           6 :                         invalidate_parent = true;
    1547             :                 }
    1548             : 
    1549        1076 :                 free_attrmap(attmap);
    1550             :             }
    1551             : 
    1552         600 :             index_close(parentIndex, lockmode);
    1553             : 
    1554             :             /*
    1555             :              * The pg_index row we inserted for this index was marked
    1556             :              * indisvalid=true.  But if we attached an existing index that is
    1557             :              * invalid, this is incorrect, so update our row to invalid too.
    1558             :              */
    1559         600 :             if (invalidate_parent)
    1560             :             {
    1561          24 :                 Relation    pg_index = table_open(IndexRelationId, RowExclusiveLock);
    1562             :                 HeapTuple   tup,
    1563             :                             newtup;
    1564             : 
    1565          24 :                 tup = SearchSysCache1(INDEXRELID,
    1566             :                                       ObjectIdGetDatum(indexRelationId));
    1567          24 :                 if (!HeapTupleIsValid(tup))
    1568           0 :                     elog(ERROR, "cache lookup failed for index %u",
    1569             :                          indexRelationId);
    1570          24 :                 newtup = heap_copytuple(tup);
    1571          24 :                 ((Form_pg_index) GETSTRUCT(newtup))->indisvalid = false;
    1572          24 :                 CatalogTupleUpdate(pg_index, &tup->t_self, newtup);
    1573          24 :                 ReleaseSysCache(tup);
    1574          24 :                 table_close(pg_index, RowExclusiveLock);
    1575          24 :                 heap_freetuple(newtup);
    1576             : 
    1577             :                 /*
    1578             :                  * CCI here to make this update visible, in case this recurses
    1579             :                  * across multiple partition levels.
    1580             :                  */
    1581          24 :                 CommandCounterIncrement();
    1582             :             }
    1583             :         }
    1584             : 
    1585             :         /*
    1586             :          * Indexes on partitioned tables are not themselves built, so we're
    1587             :          * done here.
    1588             :          */
    1589        2108 :         AtEOXact_GUC(false, root_save_nestlevel);
    1590        2108 :         SetUserIdAndSecContext(root_save_userid, root_save_sec_context);
    1591        2108 :         table_close(rel, NoLock);
    1592        2108 :         if (!OidIsValid(parentIndexId))
    1593        1806 :             pgstat_progress_end_command();
    1594             :         else
    1595             :         {
    1596             :             /* Update progress for an intermediate partitioned index itself */
    1597         302 :             pgstat_progress_incr_param(PROGRESS_CREATEIDX_PARTITIONS_DONE, 1);
    1598             :         }
    1599             : 
    1600        2108 :         return address;
    1601             :     }
    1602             : 
    1603       28788 :     AtEOXact_GUC(false, root_save_nestlevel);
    1604       28788 :     SetUserIdAndSecContext(root_save_userid, root_save_sec_context);
    1605             : 
    1606       28788 :     if (!concurrent)
    1607             :     {
    1608             :         /* Close the heap and we're done, in the non-concurrent case */
    1609       28612 :         table_close(rel, NoLock);
    1610             : 
    1611             :         /*
    1612             :          * If this is the top-level index, the command is done overall;
    1613             :          * otherwise, increment progress to report one child index is done.
    1614             :          */
    1615       28612 :         if (!OidIsValid(parentIndexId))
    1616       25802 :             pgstat_progress_end_command();
    1617             :         else
    1618        2810 :             pgstat_progress_incr_param(PROGRESS_CREATEIDX_PARTITIONS_DONE, 1);
    1619             : 
    1620       28612 :         return address;
    1621             :     }
    1622             : 
    1623             :     /* save lockrelid and locktag for below, then close rel */
    1624         176 :     heaprelid = rel->rd_lockInfo.lockRelId;
    1625         176 :     SET_LOCKTAG_RELATION(heaplocktag, heaprelid.dbId, heaprelid.relId);
    1626         176 :     table_close(rel, NoLock);
    1627             : 
    1628             :     /*
    1629             :      * For a concurrent build, it's important to make the catalog entries
    1630             :      * visible to other transactions before we start to build the index. That
    1631             :      * will prevent them from making incompatible HOT updates.  The new index
    1632             :      * will be marked not indisready and not indisvalid, so that no one else
    1633             :      * tries to either insert into it or use it for queries.
    1634             :      *
    1635             :      * We must commit our current transaction so that the index becomes
    1636             :      * visible; then start another.  Note that all the data structures we just
    1637             :      * built are lost in the commit.  The only data we keep past here are the
    1638             :      * relation IDs.
    1639             :      *
    1640             :      * Before committing, get a session-level lock on the table, to ensure
    1641             :      * that neither it nor the index can be dropped before we finish. This
    1642             :      * cannot block, even if someone else is waiting for access, because we
    1643             :      * already have the same lock within our transaction.
    1644             :      *
    1645             :      * Note: we don't currently bother with a session lock on the index,
    1646             :      * because there are no operations that could change its state while we
    1647             :      * hold lock on the parent table.  This might need to change later.
    1648             :      */
    1649         176 :     LockRelationIdForSession(&heaprelid, ShareUpdateExclusiveLock);
    1650             : 
    1651         176 :     PopActiveSnapshot();
    1652         176 :     CommitTransactionCommand();
    1653         176 :     StartTransactionCommand();
    1654             : 
    1655             :     /* Tell concurrent index builds to ignore us, if index qualifies */
    1656         176 :     if (safe_index)
    1657         130 :         set_indexsafe_procflags();
    1658             : 
    1659             :     /*
    1660             :      * The index is now visible, so we can report the OID.  While on it,
    1661             :      * include the report for the beginning of phase 2.
    1662             :      */
    1663             :     {
    1664         176 :         const int   progress_cols[] = {
    1665             :             PROGRESS_CREATEIDX_INDEX_OID,
    1666             :             PROGRESS_CREATEIDX_PHASE
    1667             :         };
    1668         176 :         const int64 progress_vals[] = {
    1669             :             indexRelationId,
    1670             :             PROGRESS_CREATEIDX_PHASE_WAIT_1
    1671             :         };
    1672             : 
    1673         176 :         pgstat_progress_update_multi_param(2, progress_cols, progress_vals);
    1674             :     }
    1675             : 
    1676             :     /*
    1677             :      * Phase 2 of concurrent index build (see comments for validate_index()
    1678             :      * for an overview of how this works)
    1679             :      *
    1680             :      * Now we must wait until no running transaction could have the table open
    1681             :      * with the old list of indexes.  Use ShareLock to consider running
    1682             :      * transactions that hold locks that permit writing to the table.  Note we
    1683             :      * do not need to worry about xacts that open the table for writing after
    1684             :      * this point; they will see the new index when they open it.
    1685             :      *
    1686             :      * Note: the reason we use actual lock acquisition here, rather than just
    1687             :      * checking the ProcArray and sleeping, is that deadlock is possible if
    1688             :      * one of the transactions in question is blocked trying to acquire an
    1689             :      * exclusive lock on our table.  The lock code will detect deadlock and
    1690             :      * error out properly.
    1691             :      */
    1692         176 :     WaitForLockers(heaplocktag, ShareLock, true);
    1693             : 
    1694             :     /*
    1695             :      * At this moment we are sure that there are no transactions with the
    1696             :      * table open for write that don't have this new index in their list of
    1697             :      * indexes.  We have waited out all the existing transactions and any new
    1698             :      * transaction will have the new index in its list, but the index is still
    1699             :      * marked as "not-ready-for-inserts".  The index is consulted while
    1700             :      * deciding HOT-safety though.  This arrangement ensures that no new HOT
    1701             :      * chains can be created where the new tuple and the old tuple in the
    1702             :      * chain have different index keys.
    1703             :      *
    1704             :      * We now take a new snapshot, and build the index using all tuples that
    1705             :      * are visible in this snapshot.  We can be sure that any HOT updates to
    1706             :      * these tuples will be compatible with the index, since any updates made
    1707             :      * by transactions that didn't know about the index are now committed or
    1708             :      * rolled back.  Thus, each visible tuple is either the end of its
    1709             :      * HOT-chain or the extension of the chain is HOT-safe for this index.
    1710             :      */
    1711             : 
    1712             :     /* Set ActiveSnapshot since functions in the indexes may need it */
    1713         176 :     PushActiveSnapshot(GetTransactionSnapshot());
    1714             : 
    1715             :     /* Perform concurrent build of index */
    1716         176 :     index_concurrently_build(tableId, indexRelationId);
    1717             : 
    1718             :     /* we can do away with our snapshot */
    1719         158 :     PopActiveSnapshot();
    1720             : 
    1721             :     /*
    1722             :      * Commit this transaction to make the indisready update visible.
    1723             :      */
    1724         158 :     CommitTransactionCommand();
    1725         158 :     StartTransactionCommand();
    1726             : 
    1727             :     /* Tell concurrent index builds to ignore us, if index qualifies */
    1728         158 :     if (safe_index)
    1729         118 :         set_indexsafe_procflags();
    1730             : 
    1731             :     /*
    1732             :      * Phase 3 of concurrent index build
    1733             :      *
    1734             :      * We once again wait until no transaction can have the table open with
    1735             :      * the index marked as read-only for updates.
    1736             :      */
    1737         158 :     pgstat_progress_update_param(PROGRESS_CREATEIDX_PHASE,
    1738             :                                  PROGRESS_CREATEIDX_PHASE_WAIT_2);
    1739         158 :     WaitForLockers(heaplocktag, ShareLock, true);
    1740             : 
    1741             :     /*
    1742             :      * Now take the "reference snapshot" that will be used by validate_index()
    1743             :      * to filter candidate tuples.  Beware!  There might still be snapshots in
    1744             :      * use that treat some transaction as in-progress that our reference
    1745             :      * snapshot treats as committed.  If such a recently-committed transaction
    1746             :      * deleted tuples in the table, we will not include them in the index; yet
    1747             :      * those transactions which see the deleting one as still-in-progress will
    1748             :      * expect such tuples to be there once we mark the index as valid.
    1749             :      *
    1750             :      * We solve this by waiting for all endangered transactions to exit before
    1751             :      * we mark the index as valid.
    1752             :      *
    1753             :      * We also set ActiveSnapshot to this snap, since functions in indexes may
    1754             :      * need a snapshot.
    1755             :      */
    1756         158 :     snapshot = RegisterSnapshot(GetTransactionSnapshot());
    1757         158 :     PushActiveSnapshot(snapshot);
    1758             : 
    1759             :     /*
    1760             :      * Scan the index and the heap, insert any missing index entries.
    1761             :      */
    1762         158 :     validate_index(tableId, indexRelationId, snapshot);
    1763             : 
    1764             :     /*
    1765             :      * Drop the reference snapshot.  We must do this before waiting out other
    1766             :      * snapshot holders, else we will deadlock against other processes also
    1767             :      * doing CREATE INDEX CONCURRENTLY, which would see our snapshot as one
    1768             :      * they must wait for.  But first, save the snapshot's xmin to use as
    1769             :      * limitXmin for GetCurrentVirtualXIDs().
    1770             :      */
    1771         158 :     limitXmin = snapshot->xmin;
    1772             : 
    1773         158 :     PopActiveSnapshot();
    1774         158 :     UnregisterSnapshot(snapshot);
    1775             : 
    1776             :     /*
    1777             :      * The snapshot subsystem could still contain registered snapshots that
    1778             :      * are holding back our process's advertised xmin; in particular, if
    1779             :      * default_transaction_isolation = serializable, there is a transaction
    1780             :      * snapshot that is still active.  The CatalogSnapshot is likewise a
    1781             :      * hazard.  To ensure no deadlocks, we must commit and start yet another
    1782             :      * transaction, and do our wait before any snapshot has been taken in it.
    1783             :      */
    1784         158 :     CommitTransactionCommand();
    1785         158 :     StartTransactionCommand();
    1786             : 
    1787             :     /* Tell concurrent index builds to ignore us, if index qualifies */
    1788         158 :     if (safe_index)
    1789         118 :         set_indexsafe_procflags();
    1790             : 
    1791             :     /* We should now definitely not be advertising any xmin. */
    1792             :     Assert(MyProc->xmin == InvalidTransactionId);
    1793             : 
    1794             :     /*
    1795             :      * The index is now valid in the sense that it contains all currently
    1796             :      * interesting tuples.  But since it might not contain tuples deleted just
    1797             :      * before the reference snap was taken, we have to wait out any
    1798             :      * transactions that might have older snapshots.
    1799             :      */
    1800         158 :     INJECTION_POINT("define-index-before-set-valid", NULL);
    1801         158 :     pgstat_progress_update_param(PROGRESS_CREATEIDX_PHASE,
    1802             :                                  PROGRESS_CREATEIDX_PHASE_WAIT_3);
    1803         158 :     WaitForOlderSnapshots(limitXmin, true);
    1804             : 
    1805             :     /*
    1806             :      * Updating pg_index might involve TOAST table access, so ensure we have a
    1807             :      * valid snapshot.
    1808             :      */
    1809         158 :     PushActiveSnapshot(GetTransactionSnapshot());
    1810             : 
    1811             :     /*
    1812             :      * Index can now be marked valid -- update its pg_index entry
    1813             :      */
    1814         158 :     index_set_state_flags(indexRelationId, INDEX_CREATE_SET_VALID);
    1815             : 
    1816         158 :     PopActiveSnapshot();
    1817             : 
    1818             :     /*
    1819             :      * The pg_index update will cause backends (including this one) to update
    1820             :      * relcache entries for the index itself, but we should also send a
    1821             :      * relcache inval on the parent table to force replanning of cached plans.
    1822             :      * Otherwise existing sessions might fail to use the new index where it
    1823             :      * would be useful.  (Note that our earlier commits did not create reasons
    1824             :      * to replan; so relcache flush on the index itself was sufficient.)
    1825             :      */
    1826         158 :     CacheInvalidateRelcacheByRelid(heaprelid.relId);
    1827             : 
    1828             :     /*
    1829             :      * Last thing to do is release the session-level lock on the parent table.
    1830             :      */
    1831         158 :     UnlockRelationIdForSession(&heaprelid, ShareUpdateExclusiveLock);
    1832             : 
    1833         158 :     pgstat_progress_end_command();
    1834             : 
    1835         158 :     return address;
    1836             : }
    1837             : 
    1838             : 
    1839             : /*
    1840             :  * CheckPredicate
    1841             :  *      Checks that the given partial-index predicate is valid.
                     :  *
                     :  * predicate is the partial-index predicate expression to check.
                     :  *
                     :  * There is no return value.  The only check made here is for mutable
                     :  * functions: if contain_mutable_functions_after_planning() reports any
                     :  * in the predicate, an ERROR with ERRCODE_INVALID_OBJECT_DEFINITION is
                     :  * raised.
    1842             :  *
    1843             :  * This used to also constrain the form of the predicate to forms that
    1844             :  * indxpath.c could do something with.  However, that seems overly
    1845             :  * restrictive.  One useful application of partial indexes is to apply
    1846             :  * a UNIQUE constraint across a subset of a table, and in that scenario
    1847             :  * any evaluable predicate will work.  So accept any predicate here
    1848             :  * (except ones requiring a plan), and let indxpath.c fend for itself.
    1849             :  */
    1850             : static void
    1851         448 : CheckPredicate(Expr *predicate)
    1852             : {
    1853             :     /*
    1854             :      * transformExpr() should have already rejected subqueries, aggregates,
    1855             :      * and window functions, based on the EXPR_KIND_ for a predicate.
                     :      * Hence no check for those constructs is repeated in this function.
    1856             :      */
    1857             : 
    1858             :     /*
    1859             :      * A predicate using mutable functions is probably wrong, for the same
    1860             :      * reasons that we don't allow an index expression to use one.
                     :      *
                     :      * ComputeIndexAttrs() applies the same helper to index expressions and
                     :      * reports a parallel "functions in index expression ..." message, so
                     :      * the two checks should be kept consistent with each other.
    1861             :      */
    1862         448 :     if (contain_mutable_functions_after_planning(predicate))
    1863           0 :         ereport(ERROR,
    1864             :                 (errcode(ERRCODE_INVALID_OBJECT_DEFINITION),
    1865             :                  errmsg("functions in index predicate must be marked IMMUTABLE")));
    1866         448 : }
    1867             : 
    1868             : /*
    1869             :  * Compute per-index-column information, including indexed column numbers
    1870             :  * or index expressions, opclasses and their options. Note, all output vectors
    1871             :  * should be allocated for all columns, including "including" ones.
    1872             :  *
    1873             :  * If the caller switched to the table owner, ddl_userid is the role for ACL
    1874             :  * checks reached without traversing opaque expressions.  Otherwise, it's
    1875             :  * InvalidOid, and other ddl_* arguments are undefined.
    1876             :  */
    1877             : static void
    1878       31668 : ComputeIndexAttrs(ParseState *pstate,
    1879             :                   IndexInfo *indexInfo,
    1880             :                   Oid *typeOids,
    1881             :                   Oid *collationOids,
    1882             :                   Oid *opclassOids,
    1883             :                   Datum *opclassOptions,
    1884             :                   int16 *colOptions,
    1885             :                   const List *attList,  /* list of IndexElem's */
    1886             :                   const List *exclusionOpNames,
    1887             :                   Oid relId,
    1888             :                   const char *accessMethodName,
    1889             :                   Oid accessMethodId,
    1890             :                   bool amcanorder,
    1891             :                   bool isconstraint,
    1892             :                   bool iswithoutoverlaps,
    1893             :                   Oid ddl_userid,
    1894             :                   int ddl_sec_context,
    1895             :                   int *ddl_save_nestlevel)
    1896             : {
    1897             :     ListCell   *nextExclOp;
    1898             :     ListCell   *lc;
    1899             :     int         attn;
    1900       31668 :     int         nkeycols = indexInfo->ii_NumIndexKeyAttrs;
    1901             :     Oid         save_userid;
    1902             :     int         save_sec_context;
    1903             : 
    1904             :     /* Allocate space for exclusion operator info, if needed */
    1905       31668 :     if (exclusionOpNames)
    1906             :     {
    1907             :         Assert(list_length(exclusionOpNames) == nkeycols);
    1908         318 :         indexInfo->ii_ExclusionOps = palloc_array(Oid, nkeycols);
    1909         318 :         indexInfo->ii_ExclusionProcs = palloc_array(Oid, nkeycols);
    1910         318 :         indexInfo->ii_ExclusionStrats = palloc_array(uint16, nkeycols);
    1911         318 :         nextExclOp = list_head(exclusionOpNames);
    1912             :     }
    1913             :     else
    1914       31350 :         nextExclOp = NULL;
    1915             : 
    1916             :     /*
    1917             :      * If this is a WITHOUT OVERLAPS constraint, we need space for exclusion
    1918             :      * ops, but we don't need to parse anything, so we can let nextExclOp be
    1919             :      * NULL. Note that for partitions/inheriting/LIKE, exclusionOpNames will
    1920             :      * be set, so we already allocated above.
    1921             :      */
    1922       31668 :     if (iswithoutoverlaps)
    1923             :     {
    1924         632 :         if (exclusionOpNames == NIL)
    1925             :         {
    1926         554 :             indexInfo->ii_ExclusionOps = palloc_array(Oid, nkeycols);
    1927         554 :             indexInfo->ii_ExclusionProcs = palloc_array(Oid, nkeycols);
    1928         554 :             indexInfo->ii_ExclusionStrats = palloc_array(uint16, nkeycols);
    1929             :         }
    1930         632 :         nextExclOp = NULL;
    1931             :     }
    1932             : 
    1933       31668 :     if (OidIsValid(ddl_userid))
    1934       31564 :         GetUserIdAndSecContext(&save_userid, &save_sec_context);
    1935             : 
    1936             :     /*
    1937             :      * process attributeList
    1938             :      */
    1939       31668 :     attn = 0;
    1940       75836 :     foreach(lc, attList)
    1941             :     {
    1942       44386 :         IndexElem  *attribute = (IndexElem *) lfirst(lc);
    1943             :         Oid         atttype;
    1944             :         Oid         attcollation;
    1945             : 
    1946             :         /*
    1947             :          * Process the column-or-expression to be indexed.
    1948             :          */
    1949       44386 :         if (attribute->name != NULL)
    1950             :         {
    1951             :             /* Simple index attribute */
    1952             :             HeapTuple   atttuple;
    1953             :             Form_pg_attribute attform;
    1954             : 
    1955             :             Assert(attribute->expr == NULL);
    1956       43246 :             atttuple = SearchSysCacheAttName(relId, attribute->name);
    1957       43246 :             if (!HeapTupleIsValid(atttuple))
    1958             :             {
    1959             :                 /* difference in error message spellings is historical */
    1960          30 :                 if (isconstraint)
    1961          18 :                     ereport(ERROR,
    1962             :                             (errcode(ERRCODE_UNDEFINED_COLUMN),
    1963             :                              errmsg("column \"%s\" named in key does not exist",
    1964             :                                     attribute->name),
    1965             :                              parser_errposition(pstate, attribute->location)));
    1966             :                 else
    1967          12 :                     ereport(ERROR,
    1968             :                             (errcode(ERRCODE_UNDEFINED_COLUMN),
    1969             :                              errmsg("column \"%s\" does not exist",
    1970             :                                     attribute->name),
    1971             :                              parser_errposition(pstate, attribute->location)));
    1972             :             }
    1973       43216 :             attform = (Form_pg_attribute) GETSTRUCT(atttuple);
    1974       43216 :             indexInfo->ii_IndexAttrNumbers[attn] = attform->attnum;
    1975       43216 :             atttype = attform->atttypid;
    1976       43216 :             attcollation = attform->attcollation;
    1977       43216 :             ReleaseSysCache(atttuple);
    1978             :         }
    1979             :         else
    1980             :         {
    1981             :             /* Index expression */
    1982        1140 :             Node       *expr = attribute->expr;
    1983             : 
    1984             :             Assert(expr != NULL);
    1985             : 
    1986        1140 :             if (attn >= nkeycols)
    1987           0 :                 ereport(ERROR,
    1988             :                         (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
    1989             :                          errmsg("expressions are not supported in included columns"),
    1990             :                          parser_errposition(pstate, attribute->location)));
    1991        1140 :             atttype = exprType(expr);
    1992        1140 :             attcollation = exprCollation(expr);
    1993             : 
    1994             :             /*
    1995             :              * Strip any top-level COLLATE clause.  This ensures that we treat
    1996             :              * "x COLLATE y" and "(x COLLATE y)" alike.
    1997             :              */
    1998        1170 :             while (IsA(expr, CollateExpr))
    1999          30 :                 expr = (Node *) ((CollateExpr *) expr)->arg;
    2000             : 
    2001        1140 :             if (IsA(expr, Var) &&
    2002          12 :                 ((Var *) expr)->varattno != InvalidAttrNumber)
    2003             :             {
    2004             :                 /*
    2005             :                  * User wrote "(column)" or "(column COLLATE something)".
    2006             :                  * Treat it like simple attribute anyway.
    2007             :                  */
    2008          12 :                 indexInfo->ii_IndexAttrNumbers[attn] = ((Var *) expr)->varattno;
    2009             :             }
    2010             :             else
    2011             :             {
    2012        1128 :                 indexInfo->ii_IndexAttrNumbers[attn] = 0;    /* marks expression */
    2013        1128 :                 indexInfo->ii_Expressions = lappend(indexInfo->ii_Expressions,
    2014             :                                                     expr);
    2015             : 
    2016             :                 /*
    2017             :                  * transformExpr() should have already rejected subqueries,
    2018             :                  * aggregates, and window functions, based on the EXPR_KIND_
    2019             :                  * for an index expression.
    2020             :                  */
    2021             : 
    2022             :                 /*
    2023             :                  * An expression using mutable functions is probably wrong,
    2024             :                  * since if you aren't going to get the same result for the
    2025             :                  * same data every time, it's not clear what the index entries
    2026             :                  * mean at all.
    2027             :                  */
    2028        1128 :                 if (contain_mutable_functions_after_planning((Expr *) expr))
    2029         168 :                     ereport(ERROR,
    2030             :                             (errcode(ERRCODE_INVALID_OBJECT_DEFINITION),
    2031             :                              errmsg("functions in index expression must be marked IMMUTABLE"),
    2032             :                              parser_errposition(pstate, attribute->location)));
    2033             :             }
    2034             :         }
    2035             : 
    2036       44188 :         typeOids[attn] = atttype;
    2037             : 
    2038             :         /*
    2039             :          * Included columns have no collation, no opclass and no ordering
    2040             :          * options.
    2041             :          */
    2042       44188 :         if (attn >= nkeycols)
    2043             :         {
    2044         644 :             if (attribute->collation)
    2045           0 :                 ereport(ERROR,
    2046             :                         (errcode(ERRCODE_INVALID_OBJECT_DEFINITION),
    2047             :                          errmsg("including column does not support a collation"),
    2048             :                          parser_errposition(pstate, attribute->location)));
    2049         644 :             if (attribute->opclass)
    2050           0 :                 ereport(ERROR,
    2051             :                         (errcode(ERRCODE_INVALID_OBJECT_DEFINITION),
    2052             :                          errmsg("including column does not support an operator class"),
    2053             :                          parser_errposition(pstate, attribute->location)));
    2054         644 :             if (attribute->ordering != SORTBY_DEFAULT)
    2055           0 :                 ereport(ERROR,
    2056             :                         (errcode(ERRCODE_INVALID_OBJECT_DEFINITION),
    2057             :                          errmsg("including column does not support ASC/DESC options"),
    2058             :                          parser_errposition(pstate, attribute->location)));
    2059         644 :             if (attribute->nulls_ordering != SORTBY_NULLS_DEFAULT)
    2060           0 :                 ereport(ERROR,
    2061             :                         (errcode(ERRCODE_INVALID_OBJECT_DEFINITION),
    2062             :                          errmsg("including column does not support NULLS FIRST/LAST options"),
    2063             :                          parser_errposition(pstate, attribute->location)));
    2064             : 
    2065         644 :             opclassOids[attn] = InvalidOid;
    2066         644 :             opclassOptions[attn] = (Datum) 0;
    2067         644 :             colOptions[attn] = 0;
    2068         644 :             collationOids[attn] = InvalidOid;
    2069         644 :             attn++;
    2070             : 
    2071         644 :             continue;
    2072             :         }
    2073             : 
    2074             :         /*
    2075             :          * Apply collation override if any.  Use of ddl_userid is necessary
    2076             :          * due to ACL checks therein, and it's safe because collations don't
    2077             :          * contain opaque expressions (or non-opaque expressions).
    2078             :          */
    2079       43544 :         if (attribute->collation)
    2080             :         {
    2081         118 :             if (OidIsValid(ddl_userid))
    2082             :             {
    2083         118 :                 AtEOXact_GUC(false, *ddl_save_nestlevel);
    2084         118 :                 SetUserIdAndSecContext(ddl_userid, ddl_sec_context);
    2085             :             }
    2086         118 :             attcollation = get_collation_oid(attribute->collation, false);
    2087         116 :             if (OidIsValid(ddl_userid))
    2088             :             {
    2089         116 :                 SetUserIdAndSecContext(save_userid, save_sec_context);
    2090         116 :                 *ddl_save_nestlevel = NewGUCNestLevel();
    2091         116 :                 RestrictSearchPath();
    2092             :             }
    2093             :         }
    2094             : 
    2095             :         /*
    2096             :          * Check we have a collation iff it's a collatable type.  The only
    2097             :          * expected failures here are (1) COLLATE applied to a noncollatable
    2098             :          * type, or (2) index expression had an unresolved collation.  But we
    2099             :          * might as well code this to be a complete consistency check.
    2100             :          */
    2101       43542 :         if (type_is_collatable(atttype))
    2102             :         {
    2103        6730 :             if (!OidIsValid(attcollation))
    2104           0 :                 ereport(ERROR,
    2105             :                         (errcode(ERRCODE_INDETERMINATE_COLLATION),
    2106             :                          errmsg("could not determine which collation to use for index expression"),
    2107             :                          errhint("Use the COLLATE clause to set the collation explicitly."),
    2108             :                          parser_errposition(pstate, attribute->location)));
    2109             :         }
    2110             :         else
    2111             :         {
    2112       36812 :             if (OidIsValid(attcollation))
    2113          12 :                 ereport(ERROR,
    2114             :                         (errcode(ERRCODE_DATATYPE_MISMATCH),
    2115             :                          errmsg("collations are not supported by type %s",
    2116             :                                 format_type_be(atttype)),
    2117             :                          parser_errposition(pstate, attribute->location)));
    2118             :         }
    2119             : 
    2120       43530 :         collationOids[attn] = attcollation;
    2121             : 
    2122             :         /*
    2123             :          * Identify the opclass to use.  Use of ddl_userid is necessary due to
    2124             :          * ACL checks therein.  This is safe despite opclasses containing
    2125             :          * opaque expressions (specifically, functions), because only
    2126             :          * superusers can define opclasses.
    2127             :          */
    2128       43530 :         if (OidIsValid(ddl_userid))
    2129             :         {
    2130       43420 :             AtEOXact_GUC(false, *ddl_save_nestlevel);
    2131       43420 :             SetUserIdAndSecContext(ddl_userid, ddl_sec_context);
    2132             :         }
    2133       43530 :         opclassOids[attn] = ResolveOpClass(attribute->opclass,
    2134             :                                            atttype,
    2135             :                                            accessMethodName,
    2136             :                                            accessMethodId);
    2137       43524 :         if (OidIsValid(ddl_userid))
    2138             :         {
    2139       43414 :             SetUserIdAndSecContext(save_userid, save_sec_context);
    2140       43414 :             *ddl_save_nestlevel = NewGUCNestLevel();
    2141       43414 :             RestrictSearchPath();
    2142             :         }
    2143             : 
    2144             :         /*
    2145             :          * Identify the exclusion operator, if any.
    2146             :          */
    2147       43524 :         if (nextExclOp)
    2148             :         {
    2149         348 :             List       *opname = (List *) lfirst(nextExclOp);
    2150             :             Oid         opid;
    2151             :             Oid         opfamily;
    2152             :             int         strat;
    2153             : 
    2154             :             /*
    2155             :              * Find the operator --- it must accept the column datatype
    2156             :              * without runtime coercion (but binary compatibility is OK).
    2157             :              * Operators contain opaque expressions (specifically, functions).
    2158             :              * compatible_oper_opid() boils down to oper() and
    2159             :              * IsBinaryCoercible().  PostgreSQL would have security problems
    2160             :              * elsewhere if oper() started calling opaque expressions.
    2161             :              */
    2162         348 :             if (OidIsValid(ddl_userid))
    2163             :             {
    2164         348 :                 AtEOXact_GUC(false, *ddl_save_nestlevel);
    2165         348 :                 SetUserIdAndSecContext(ddl_userid, ddl_sec_context);
    2166             :             }
    2167         348 :             opid = compatible_oper_opid(opname, atttype, atttype, false);
    2168         348 :             if (OidIsValid(ddl_userid))
    2169             :             {
    2170         348 :                 SetUserIdAndSecContext(save_userid, save_sec_context);
    2171         348 :                 *ddl_save_nestlevel = NewGUCNestLevel();
    2172         348 :                 RestrictSearchPath();
    2173             :             }
    2174             : 
    2175             :             /*
    2176             :              * Only allow commutative operators to be used in exclusion
    2177             :              * constraints. If X conflicts with Y, but Y does not conflict
    2178             :              * with X, bad things will happen.
    2179             :              */
    2180         348 :             if (get_commutator(opid) != opid)
    2181           0 :                 ereport(ERROR,
    2182             :                         (errcode(ERRCODE_WRONG_OBJECT_TYPE),
    2183             :                          errmsg("operator %s is not commutative",
    2184             :                                 format_operator(opid)),
    2185             :                          errdetail("Only commutative operators can be used in exclusion constraints."),
    2186             :                          parser_errposition(pstate, attribute->location)));
    2187             : 
    2188             :             /*
    2189             :              * Operator must be a member of the right opfamily, too
    2190             :              */
    2191         348 :             opfamily = get_opclass_family(opclassOids[attn]);
    2192         348 :             strat = get_op_opfamily_strategy(opid, opfamily);
    2193         348 :             if (strat == 0)
    2194           0 :                 ereport(ERROR,
    2195             :                         (errcode(ERRCODE_WRONG_OBJECT_TYPE),
    2196             :                          errmsg("operator %s is not a member of operator family \"%s\"",
    2197             :                                 format_operator(opid),
    2198             :                                 get_opfamily_name(opfamily, false)),
    2199             :                          errdetail("The exclusion operator must be related to the index operator class for the constraint."),
    2200             :                          parser_errposition(pstate, attribute->location)));
    2201             : 
    2202         348 :             indexInfo->ii_ExclusionOps[attn] = opid;
    2203         348 :             indexInfo->ii_ExclusionProcs[attn] = get_opcode(opid);
    2204         348 :             indexInfo->ii_ExclusionStrats[attn] = strat;
    2205         348 :             nextExclOp = lnext(exclusionOpNames, nextExclOp);
    2206             :         }
    2207       43176 :         else if (iswithoutoverlaps)
    2208             :         {
    2209             :             CompareType cmptype;
    2210             :             StrategyNumber strat;
    2211             :             Oid         opid;
    2212             : 
    2213        1298 :             if (attn == nkeycols - 1)
    2214         632 :                 cmptype = COMPARE_OVERLAP;
    2215             :             else
    2216         666 :                 cmptype = COMPARE_EQ;
    2217        1298 :             GetOperatorFromCompareType(opclassOids[attn], InvalidOid, cmptype, &opid, &strat);
    2218        1298 :             indexInfo->ii_ExclusionOps[attn] = opid;
    2219        1298 :             indexInfo->ii_ExclusionProcs[attn] = get_opcode(opid);
    2220        1298 :             indexInfo->ii_ExclusionStrats[attn] = strat;
    2221             :         }
    2222             : 
    2223             :         /*
    2224             :          * Set up the per-column options (indoption field).  For now, this is
    2225             :          * zero for any un-ordered index, while ordered indexes have DESC and
    2226             :          * NULLS FIRST/LAST options.
    2227             :          */
    2228       43524 :         colOptions[attn] = 0;
    2229       43524 :         if (amcanorder)
    2230             :         {
    2231             :             /* default ordering is ASC */
    2232       39320 :             if (attribute->ordering == SORTBY_DESC)
    2233          42 :                 colOptions[attn] |= INDOPTION_DESC;
    2234             :             /* default null ordering is LAST for ASC, FIRST for DESC */
    2235       39320 :             if (attribute->nulls_ordering == SORTBY_NULLS_DEFAULT)
    2236             :             {
    2237       39290 :                 if (attribute->ordering == SORTBY_DESC)
    2238          30 :                     colOptions[attn] |= INDOPTION_NULLS_FIRST;
    2239             :             }
    2240          30 :             else if (attribute->nulls_ordering == SORTBY_NULLS_FIRST)
    2241          12 :                 colOptions[attn] |= INDOPTION_NULLS_FIRST;
    2242             :         }
    2243             :         else
    2244             :         {
    2245             :             /* index AM does not support ordering */
    2246        4204 :             if (attribute->ordering != SORTBY_DEFAULT)
    2247           0 :                 ereport(ERROR,
    2248             :                         (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
    2249             :                          errmsg("access method \"%s\" does not support ASC/DESC options",
    2250             :                                 accessMethodName),
    2251             :                          parser_errposition(pstate, attribute->location)));
    2252        4204 :             if (attribute->nulls_ordering != SORTBY_NULLS_DEFAULT)
    2253           0 :                 ereport(ERROR,
    2254             :                         (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
    2255             :                          errmsg("access method \"%s\" does not support NULLS FIRST/LAST options",
    2256             :                                 accessMethodName),
    2257             :                          parser_errposition(pstate, attribute->location)));
    2258             :         }
    2259             : 
    2260             :         /* Set up the per-column opclass options (attoptions field). */
    2261       43524 :         if (attribute->opclassopts)
    2262             :         {
    2263             :             Assert(attn < nkeycols);
    2264             : 
    2265         144 :             opclassOptions[attn] =
    2266         144 :                 transformRelOptions((Datum) 0, attribute->opclassopts,
    2267             :                                     NULL, NULL, false, false);
    2268             :         }
    2269             :         else
    2270       43380 :             opclassOptions[attn] = (Datum) 0;
    2271             : 
    2272       43524 :         attn++;
    2273             :     }
    2274       31450 : }
    2275             : 
    2276             : /*
    2277             :  * Resolve a possibly-defaulted operator class specification to its OID.
    2278             :  * If opclass is NIL, the default opclass of attrType for the AM is used.
    2279             :  * Note: This is used to resolve operator class specifications in index and
    2280             :  * partition key definitions.  Lookup failure or a type mismatch raises ERROR.
    2281             :  */
    2282             : Oid
    2283       43668 : ResolveOpClass(const List *opclass, Oid attrType,
    2284             :                const char *accessMethodName, Oid accessMethodId)
    2285             : {
    2286             :     char       *schemaname;
    2287             :     char       *opcname;
    2288             :     HeapTuple   tuple;
    2289             :     Form_pg_opclass opform;
    2290             :     Oid         opClassId,
    2291             :                 opInputType;
    2292             : 
    2293       43668 :     if (opclass == NIL)
    2294             :     {
    2295             :         /* no operator class specified, so find the default */
    2296       21476 :         opClassId = GetDefaultOpClass(attrType, accessMethodId);
    2297       21476 :         if (!OidIsValid(opClassId))
    2298           6 :             ereport(ERROR,
    2299             :                     (errcode(ERRCODE_UNDEFINED_OBJECT),
    2300             :                      errmsg("data type %s has no default operator class for access method \"%s\"",
    2301             :                             format_type_be(attrType), accessMethodName),
    2302             :                      errhint("You must specify an operator class for the index or define a default operator class for the data type.")));
    2303       21470 :         return opClassId;
    2304             :     }
    2305             : 
    2306             :     /*
    2307             :      * Specific opclass name given, so look up the opclass.
    2308             :      */
    2309             : 
    2310             :     /* split the name list into an optional schema name and the opclass name */
    2311       22192 :     DeconstructQualifiedName(opclass, &schemaname, &opcname);
    2312             : 
    2313       22192 :     if (schemaname)
    2314             :     {
    2315             :         /* Look in specific schema only */
    2316             :         Oid         namespaceId;
    2317             : 
    2318          28 :         namespaceId = LookupExplicitNamespace(schemaname, false);
    2319          28 :         tuple = SearchSysCache3(CLAAMNAMENSP,
    2320             :                                 ObjectIdGetDatum(accessMethodId),
    2321             :                                 PointerGetDatum(opcname),
    2322             :                                 ObjectIdGetDatum(namespaceId));
    2323             :     }
    2324             :     else
    2325             :     {
    2326             :         /* Unqualified opclass name, so search the search path */
    2327       22164 :         opClassId = OpclassnameGetOpcid(accessMethodId, opcname);
    2328       22164 :         if (!OidIsValid(opClassId))
    2329          12 :             ereport(ERROR,
    2330             :                     (errcode(ERRCODE_UNDEFINED_OBJECT),
    2331             :                      errmsg("operator class \"%s\" does not exist for access method \"%s\"",
    2332             :                             opcname, accessMethodName)));
    2333       22152 :         tuple = SearchSysCache1(CLAOID, ObjectIdGetDatum(opClassId));
    2334             :     }
    2335             : 
    2336       22180 :     if (!HeapTupleIsValid(tuple))
    2337           0 :         ereport(ERROR,
    2338             :                 (errcode(ERRCODE_UNDEFINED_OBJECT),
    2339             :                  errmsg("operator class \"%s\" does not exist for access method \"%s\"",
    2340             :                         NameListToString(opclass), accessMethodName)));
    2341             : 
    2342             :     /*
    2343             :      * Verify that the index operator class accepts this datatype.  Note we
    2344             :      * will accept binary compatibility.
    2345             :      */
    2346       22180 :     opform = (Form_pg_opclass) GETSTRUCT(tuple);
    2347       22180 :     opClassId = opform->oid;
    2348       22180 :     opInputType = opform->opcintype;
    2349             : 
    2350       22180 :     if (!IsBinaryCoercible(attrType, opInputType))
    2351           0 :         ereport(ERROR,
    2352             :                 (errcode(ERRCODE_DATATYPE_MISMATCH),
    2353             :                  errmsg("operator class \"%s\" does not accept data type %s",
    2354             :                         NameListToString(opclass), format_type_be(attrType))));
    2355             : 
    2356       22180 :     ReleaseSysCache(tuple);
    2357             : 
    2358       22180 :     return opClassId;
    2359             : }
    2360             : 
    2361             : /*
    2362             :  * GetDefaultOpClass
    2363             :  *
    2364             :  * Given the OIDs of a datatype and an access method, find the default
    2365             :  * operator class.  Returns InvalidOid if there is none or it is ambiguous.
    2366             :  */
    2367             : Oid
    2368      117850 : GetDefaultOpClass(Oid type_id, Oid am_id)
    2369             : {
    2370      117850 :     Oid         result = InvalidOid;
    2371      117850 :     int         nexact = 0;
    2372      117850 :     int         ncompatible = 0;
    2373      117850 :     int         ncompatiblepreferred = 0;
    2374             :     Relation    rel;
    2375             :     ScanKeyData skey[1];
    2376             :     SysScanDesc scan;
    2377             :     HeapTuple   tup;
    2378             :     TYPCATEGORY tcategory;
    2379             : 
    2380             :     /* If it's a domain, look at the base type instead */
    2381      117850 :     type_id = getBaseType(type_id);
    2382             : 
    2383      117850 :     tcategory = TypeCategory(type_id);
    2384             : 
    2385             :     /*
    2386             :      * We scan through all the opclasses available for the access method,
    2387             :      * looking for one that is marked default and matches the target type
    2388             :      * (either exactly or binary-compatibly, but prefer an exact match).
    2389             :      *
    2390             :      * We could find more than one binary-compatible match.  If just one is
    2391             :      * for a preferred type, use that one; otherwise we fail, forcing the user
    2392             :      * to specify which one they want.  (The preferred-type special case is a
    2393             :      * kluge for varchar: it's binary-compatible to both text and bpchar, so
    2394             :      * we need a tiebreaker.)  If we find more than one exact match, then
    2395             :      * someone put bogus entries in pg_opclass.
    2396             :      */
    2397      117850 :     rel = table_open(OperatorClassRelationId, AccessShareLock);
    2398             : 
    2399      117850 :     ScanKeyInit(&skey[0],
    2400             :                 Anum_pg_opclass_opcmethod,
    2401             :                 BTEqualStrategyNumber, F_OIDEQ,
    2402             :                 ObjectIdGetDatum(am_id));
    2403             : 
    2404      117850 :     scan = systable_beginscan(rel, OpclassAmNameNspIndexId, true,
    2405             :                               NULL, 1, skey);
    2406             : 
    2407     5275588 :     while (HeapTupleIsValid(tup = systable_getnext(scan)))
    2408             :     {
    2409     5157738 :         Form_pg_opclass opclass = (Form_pg_opclass) GETSTRUCT(tup);
    2410             : 
    2411             :         /* ignore altogether if not a default opclass */
    2412     5157738 :         if (!opclass->opcdefault)
    2413      743230 :             continue;
    2414     4414508 :         if (opclass->opcintype == type_id)
    2415             :         {
    2416      104656 :             nexact++;
    2417      104656 :             result = opclass->oid;
    2418             :         }
    2419     6462642 :         else if (nexact == 0 &&
    2420     2152790 :                  IsBinaryCoercible(type_id, opclass->opcintype))
    2421             :         {
    2422       25274 :             if (IsPreferredType(tcategory, opclass->opcintype))
    2423             :             {
    2424        2110 :                 ncompatiblepreferred++;
    2425        2110 :                 result = opclass->oid;
    2426             :             }
    2427       23164 :             else if (ncompatiblepreferred == 0)
    2428             :             {
    2429       23164 :                 ncompatible++;
    2430       23164 :                 result = opclass->oid;
    2431             :             }
    2432             :         }
    2433             :     }
    2434             : 
    2435      117850 :     systable_endscan(scan);
    2436             : 
    2437      117850 :     table_close(rel, AccessShareLock);
    2438             : 
    2439             :     /* raise error if pg_opclass contains inconsistent data */
    2440      117850 :     if (nexact > 1)
    2441           0 :         ereport(ERROR,
    2442             :                 (errcode(ERRCODE_DUPLICATE_OBJECT),
    2443             :                  errmsg("there are multiple default operator classes for data type %s",
    2444             :                         format_type_be(type_id))));
    2445             :     /* use an exact match, else a unique preferred or unique compatible one */
    2446      117850 :     if (nexact == 1 ||
    2447       11086 :         ncompatiblepreferred == 1 ||
    2448       11086 :         (ncompatiblepreferred == 0 && ncompatible == 1))
    2449      116582 :         return result;
    2450             : 
    2451        1268 :     return InvalidOid;
    2452             : }
    2453             : 
    2454             : /*
    2455             :  * GetOperatorFromCompareType
    2456             :  *
    2457             :  * opclass - the opclass to use
    2458             :  * rhstype - the type for the right-hand side, or InvalidOid to use the input type of the given opclass.
    2459             :  * cmptype - kind of operator to find (equality, overlaps, or contained-by)
    2460             :  * opid - output: set to the OID of the operator we found
    2461             :  * strat - output: set to the strategy number that cmptype translated to
    2462             :  *
    2463             :  * Finds an operator from a CompareType.  This is used for temporal index
    2464             :  * constraints (and other temporal features) to look up equality and overlaps
    2465             :  * operators.  We ask an opclass support function to translate from the
    2466             :  * compare type to the internal strategy numbers.  Raises ERROR on search
    2467             :  * failure.
    2468             :  */
    2469             : void
    2470        2122 : GetOperatorFromCompareType(Oid opclass, Oid rhstype, CompareType cmptype,
    2471             :                            Oid *opid, StrategyNumber *strat)
    2472             : {
    2473             :     Oid         amid;
    2474             :     Oid         opfamily;
    2475             :     Oid         opcintype;
    2476             : 
    2477             :     Assert(cmptype == COMPARE_EQ || cmptype == COMPARE_OVERLAP || cmptype == COMPARE_CONTAINED_BY);
    2478             : 
    2479             :     /*
    2480             :      * Use the opclass to get the opfamily, opcintype, and access method. If
    2481             :      * any of this fails, quit early.
    2482             :      */
    2483        2122 :     if (!get_opclass_opfamily_and_input_type(opclass, &opfamily, &opcintype))
    2484           0 :         elog(ERROR, "cache lookup failed for opclass %u", opclass);
    2485             : 
    2486        2122 :     amid = get_opclass_method(opclass);
    2487             : 
    2488             :     /*
    2489             :      * Ask the index AM to translate to its internal stratnum
    2490             :      */
    2491        2122 :     *strat = IndexAmTranslateCompareType(cmptype, amid, opfamily, true);
    2492        2122 :     if (*strat == InvalidStrategy)
    2493           0 :         ereport(ERROR,
    2494             :                 errcode(ERRCODE_UNDEFINED_OBJECT),
    2495             :                 cmptype == COMPARE_EQ ? errmsg("could not identify an equality operator for type %s", format_type_be(opcintype)) :
    2496             :                 cmptype == COMPARE_OVERLAP ? errmsg("could not identify an overlaps operator for type %s", format_type_be(opcintype)) :
    2497             :                 cmptype == COMPARE_CONTAINED_BY ? errmsg("could not identify a contained-by operator for type %s", format_type_be(opcintype)) : 0,
    2498             :                 errdetail("Could not translate compare type %d for operator family \"%s\" of access method \"%s\".",
    2499             :                           cmptype, get_opfamily_name(opfamily, false), get_am_name(amid)));
    2500             : 
    2501             :     /*
    2502             :      * We parameterize rhstype so foreign keys can ask for a <@ operator whose
    2503             :      * rhs matches the aggregate function. For example range_agg returns
    2504             :      * anymultirange.
    2505             :      */
    2506        2122 :     if (!OidIsValid(rhstype))
    2507        1710 :         rhstype = opcintype;
    2508        2122 :     *opid = get_opfamily_member(opfamily, opcintype, rhstype, *strat);
    2509             : 
    2510        2122 :     if (!OidIsValid(*opid))
    2511           0 :         ereport(ERROR,
    2512             :                 errcode(ERRCODE_UNDEFINED_OBJECT),
    2513             :                 cmptype == COMPARE_EQ ? errmsg("could not identify an equality operator for type %s", format_type_be(opcintype)) :
    2514             :                 cmptype == COMPARE_OVERLAP ? errmsg("could not identify an overlaps operator for type %s", format_type_be(opcintype)) :
    2515             :                 cmptype == COMPARE_CONTAINED_BY ? errmsg("could not identify a contained-by operator for type %s", format_type_be(opcintype)) : 0,
    2516             :                 errdetail("There is no suitable operator in operator family \"%s\" for access method \"%s\".",
    2517             :                           get_opfamily_name(opfamily, false), get_am_name(amid)));
    2518        2122 : }
    2519             : 
    2520             : /*
    2521             :  *  makeObjectName()
    2522             :  *
    2523             :  *  Create a name for an implicitly created index, sequence, constraint,
    2524             :  *  extended statistics, etc.
    2525             :  *
    2526             :  *  The parameters are typically: the original table name, the original field
    2527             :  *  name, and a "type" string (such as "seq" or "pkey").    The field name
    2528             :  *  and/or type can be NULL if not relevant.
    2529             :  *
    2530             :  *  The result is a palloc'd string.
    2531             :  *
    2532             :  *  The basic result we want is "name1_name2_label", omitting "_name2" or
    2533             :  *  "_label" when those parameters are NULL.  However, we must generate
    2534             :  *  a name with less than NAMEDATALEN characters!  So, we truncate one or
    2535             :  *  both names if necessary to make a short-enough string.  The label part
    2536             :  *  is never truncated (so it had better be reasonably short).
    2537             :  *
    2538             :  *  The caller is responsible for checking uniqueness of the generated
    2539             :  *  name and retrying as needed; retrying will be done by altering the
    2540             :  *  "label" string (which is why we never truncate that part).
    2541             :  */
    2542             : char *
    2543      119788 : makeObjectName(const char *name1, const char *name2, const char *label)
    2544             : {
    2545             :     char       *name;
    2546      119788 :     int         overhead = 0;   /* chars needed for label and underscores */
    2547             :     int         availchars;     /* chars available for name(s) */
    2548             :     int         name1chars;     /* chars allocated to name1 */
    2549             :     int         name2chars;     /* chars allocated to name2 */
    2550             :     int         ndx;
    2551             : 
    2552      119788 :     name1chars = strlen(name1);
    2553      119788 :     if (name2)
    2554             :     {
    2555      105408 :         name2chars = strlen(name2);
    2556      105408 :         overhead++;             /* allow for separating underscore */
    2557             :     }
    2558             :     else
    2559       14380 :         name2chars = 0;
    2560      119788 :     if (label)
    2561       45126 :         overhead += strlen(label) + 1;
    2562             : 
    2563      119788 :     availchars = NAMEDATALEN - 1 - overhead;
    2564             :     Assert(availchars > 0);      /* else caller chose a bad label */
    2565             : 
    2566             :     /*
    2567             :      * If we must truncate, preferentially truncate the longer name. This
    2568             :      * logic could be expressed without a loop, but it's simple and obvious as
    2569             :      * a loop.
    2570             :      */
    2571      119854 :     while (name1chars + name2chars > availchars)
    2572             :     {
    2573          66 :         if (name1chars > name2chars)
    2574           0 :             name1chars--;
    2575             :         else
    2576          66 :             name2chars--;
    2577             :     }
    2578             : 
    2579      119788 :     name1chars = pg_mbcliplen(name1, name1chars, name1chars);
    2580      119788 :     if (name2)
    2581      105408 :         name2chars = pg_mbcliplen(name2, name2chars, name2chars);
    2582             : 
    2583             :     /* Now construct the string using the chosen lengths */
    2584      119788 :     name = palloc(name1chars + name2chars + overhead + 1);
    2585      119788 :     memcpy(name, name1, name1chars);
    2586      119788 :     ndx = name1chars;
    2587      119788 :     if (name2)
    2588             :     {
    2589      105408 :         name[ndx++] = '_';
    2590      105408 :         memcpy(name + ndx, name2, name2chars);
    2591      105408 :         ndx += name2chars;
    2592             :     }
    2593      119788 :     if (label)
    2594             :     {
    2595       45126 :         name[ndx++] = '_';
    2596       45126 :         strcpy(name + ndx, label);
    2597             :     }
    2598             :     else
    2599       74662 :         name[ndx] = '\0';
    2600             : 
    2601      119788 :     return name;
    2602             : }
    2603             : 
    2604             : /*
    2605             :  * Select a nonconflicting name for a new relation.  This is ordinarily
    2606             :  * used to choose index names (which is why it's here) but it can also
    2607             :  * be used for sequences, or any autogenerated relation kind.
    2608             :  *
    2609             :  * name1, name2, and label are used the same way as for makeObjectName(),
    2610             :  * except that the label can't be NULL; digits will be appended to the label
    2611             :  * if needed to create a name that is unique within the specified namespace.
    2612             :  *
    2613             :  * If isconstraint is true, we also avoid choosing a name matching any
    2614             :  * existing constraint in the same namespace.  (This is stricter than what
    2615             :  * Postgres itself requires, but the SQL standard says that constraint names
    2616             :  * should be unique within schemas, so we follow that for autogenerated
    2617             :  * constraint names.)
    2618             :  *
    2619             :  * Note: it is theoretically possible to get a collision anyway, if someone
    2620             :  * else chooses the same name concurrently.  We shorten the race condition
    2621             :  * window by checking for conflicting relations using SnapshotDirty, but
    2622             :  * that doesn't close the window entirely.  This is fairly unlikely to be
    2623             :  * a problem in practice, especially if one is holding an exclusive lock on
    2624             :  * the relation identified by name1.  However, if choosing multiple names
    2625             :  * within a single command, you'd better create the new object and do
    2626             :  * CommandCounterIncrement before choosing the next one!
    2627             :  *
    2628             :  * Returns a palloc'd string.
    2629             :  */
    2630             : char *
    2631       14726 : ChooseRelationName(const char *name1, const char *name2,
    2632             :                    const char *label, Oid namespaceid,
    2633             :                    bool isconstraint)
    2634             : {
    2635       14726 :     int         pass = 0;
    2636       14726 :     char       *relname = NULL;
    2637             :     char        modlabel[NAMEDATALEN];
    2638             :     SnapshotData SnapshotDirty;
    2639             :     Relation    pgclassrel;
    2640             : 
    2641             :     /* prepare to search pg_class with a dirty snapshot */
    2642       14726 :     InitDirtySnapshot(SnapshotDirty);
    2643       14726 :     pgclassrel = table_open(RelationRelationId, AccessShareLock);
    2644             : 
    2645             :     /* try the unmodified label first */
    2646       14726 :     strlcpy(modlabel, label, sizeof(modlabel));
    2647             : 
    2648             :     for (;;)
    2649        1098 :     {
    2650             :         ScanKeyData key[2];
    2651             :         SysScanDesc scan;
    2652             :         bool        collides;
    2653             : 
    2654       15824 :         relname = makeObjectName(name1, name2, modlabel);
    2655             : 
    2656             :         /* is there any conflicting relation name? */
    2657       15824 :         ScanKeyInit(&key[0],
    2658             :                     Anum_pg_class_relname,
    2659             :                     BTEqualStrategyNumber, F_NAMEEQ,
    2660             :                     CStringGetDatum(relname));
    2661       15824 :         ScanKeyInit(&key[1],
    2662             :                     Anum_pg_class_relnamespace,
    2663             :                     BTEqualStrategyNumber, F_OIDEQ,
    2664             :                     ObjectIdGetDatum(namespaceid));
    2665             : 
    2666       15824 :         scan = systable_beginscan(pgclassrel, ClassNameNspIndexId,
    2667             :                                   true /* indexOK */ ,
    2668             :                                   &SnapshotDirty,
    2669             :                                   2, key);
    2670             : 
    2671       15824 :         collides = HeapTupleIsValid(systable_getnext(scan));
    2672             : 
    2673       15824 :         systable_endscan(scan);
    2674             : 
    2675             :         /* break out of loop if no conflict */
    2676       15824 :         if (!collides)
    2677             :         {
    2678       14732 :             if (!isconstraint ||
    2679        9294 :                 !ConstraintNameExists(relname, namespaceid))
    2680             :                 break;
    2681             :         }
    2682             : 
    2683             :         /* found a conflict, so try a new name component */
    2684        1098 :         pfree(relname);
    2685        1098 :         snprintf(modlabel, sizeof(modlabel), "%s%d", label, ++pass);
    2686             :     }
    2687             : 
    2688       14726 :     table_close(pgclassrel, AccessShareLock);
    2689             : 
    2690       14726 :     return relname;
    2691             : }
    2692             : 
    2693             : /*
    2694             :  * Select the name to be used for an index.
    2695             :  *
    2696             :  * The argument list is pretty ad-hoc :-(
    2697             :  */
    2698             : static char *
    2699       12316 : ChooseIndexName(const char *tabname, Oid namespaceId,
    2700             :                 const List *colnames, const List *exclusionOpNames,
    2701             :                 bool primary, bool isconstraint)
    2702             : {
    2703             :     char       *indexname;
    2704             : 
    2705       12316 :     if (primary)
    2706             :     {
    2707             :         /* the primary key's name does not depend on the specific column(s) */
    2708        8316 :         indexname = ChooseRelationName(tabname,
    2709             :                                        NULL,
    2710             :                                        "pkey",
    2711             :                                        namespaceId,
    2712             :                                        true);
    2713             :     }
    2714        4000 :     else if (exclusionOpNames != NIL)
    2715             :     {
    2716         206 :         indexname = ChooseRelationName(tabname,
    2717         206 :                                        ChooseIndexNameAddition(colnames),
    2718             :                                        "excl",
    2719             :                                        namespaceId,
    2720             :                                        true);
    2721             :     }
    2722        3794 :     else if (isconstraint)
    2723             :     {
    2724         766 :         indexname = ChooseRelationName(tabname,
    2725         766 :                                        ChooseIndexNameAddition(colnames),
    2726             :                                        "key",
    2727             :                                        namespaceId,
    2728             :                                        true);
    2729             :     }
    2730             :     else
    2731             :     {
    2732        3028 :         indexname = ChooseRelationName(tabname,
    2733        3028 :                                        ChooseIndexNameAddition(colnames),
    2734             :                                        "idx",
    2735             :                                        namespaceId,
    2736             :                                        false);
    2737             :     }
    2738             : 
    2739       12316 :     return indexname;
    2740             : }
    2741             : 
    2742             : /*
    2743             :  * Generate "name2" for a new index given the list of column names for it
    2744             :  * (as produced by ChooseIndexColumnNames).  This will be passed to
    2745             :  * ChooseRelationName along with the parent table name and a suitable label.
    2746             :  *
    2747             :  * We know that less than NAMEDATALEN characters will actually be used,
    2748             :  * so we can truncate the result once we've generated that many.
    2749             :  *
    2750             :  * XXX See also ChooseForeignKeyConstraintNameAddition and
    2751             :  * ChooseExtendedStatisticNameAddition.
    2752             :  */
    2753             : static char *
    2754        4000 : ChooseIndexNameAddition(const List *colnames)
    2755             : {
    2756             :     char        buf[NAMEDATALEN * 2];
    2757        4000 :     int         buflen = 0;
    2758             :     ListCell   *lc;
    2759             : 
    2760        4000 :     buf[0] = '\0';
    2761        9070 :     foreach(lc, colnames)
    2762             :     {
    2763        5070 :         const char *name = (const char *) lfirst(lc);
    2764             : 
    2765        5070 :         if (buflen > 0)
    2766        1070 :             buf[buflen++] = '_';    /* insert _ between names */
    2767             : 
    2768             :         /*
    2769             :          * At this point we have buflen <= NAMEDATALEN.  name should be less
    2770             :          * than NAMEDATALEN already, but use strlcpy for paranoia.
    2771             :          */
    2772        5070 :         strlcpy(buf + buflen, name, NAMEDATALEN);
    2773        5070 :         buflen += strlen(buf + buflen);
    2774        5070 :         if (buflen >= NAMEDATALEN)
    2775           0 :             break;
    2776             :     }
    2777        4000 :     return pstrdup(buf);
    2778             : }
    2779             : 
    2780             : /*
    2781             :  * Select the actual names to be used for the columns of an index, given the
    2782             :  * list of IndexElems for the columns.  This is mostly about ensuring the
    2783             :  * names are unique so we don't get a conflicting-attribute-names error.
    2784             :  *
    2785             :  * Returns a List of plain strings (char *, not String nodes).
    2786             :  */
    2787             : static List *
    2788       31652 : ChooseIndexColumnNames(const List *indexElems)
    2789             : {
    2790       31652 :     List       *result = NIL;
    2791             :     ListCell   *lc;
    2792             : 
    2793       76074 :     foreach(lc, indexElems)
    2794             :     {
    2795       44422 :         IndexElem  *ielem = (IndexElem *) lfirst(lc);
    2796             :         const char *origname;
    2797             :         const char *curname;
    2798             :         int         i;
    2799             :         char        buf[NAMEDATALEN];
    2800             : 
    2801             :         /* Get the preliminary name from the IndexElem */
    2802       44422 :         if (ielem->indexcolname)
    2803        4384 :             origname = ielem->indexcolname; /* caller-specified name */
    2804       40038 :         else if (ielem->name)
    2805       39628 :             origname = ielem->name; /* simple column reference */
    2806             :         else
    2807         410 :             origname = "expr";    /* default name for expression */
    2808             : 
    2809             :         /* If it conflicts with any previous column, tweak it */
    2810       44422 :         curname = origname;
    2811       44422 :         for (i = 1;; i++)
    2812          62 :         {
    2813             :             ListCell   *lc2;
    2814             :             char        nbuf[32];
    2815             :             int         nlen;
    2816             : 
    2817       69486 :             foreach(lc2, result)
    2818             :             {
    2819       25064 :                 if (strcmp(curname, (char *) lfirst(lc2)) == 0)
    2820          62 :                     break;
    2821             :             }
    2822       44484 :             if (lc2 == NULL)
    2823       44422 :                 break;          /* found nonconflicting name */
    2824             : 
    2825          62 :             sprintf(nbuf, "%d", i);
    2826             : 
    2827             :             /* Ensure generated names are shorter than NAMEDATALEN */
    2828          62 :             nlen = pg_mbcliplen(origname, strlen(origname),
    2829          62 :                                 NAMEDATALEN - 1 - strlen(nbuf));
    2830          62 :             memcpy(buf, origname, nlen);
    2831          62 :             strcpy(buf + nlen, nbuf);
    2832          62 :             curname = buf;
    2833             :         }
    2834             : 
    2835             :         /* And attach to the result list */
    2836       44422 :         result = lappend(result, pstrdup(curname));
    2837             :     }
    2838       31652 :     return result;
    2839             : }
    2840             : 
    2841             : /*
    2842             :  * ExecReindex
    2843             :  *
    2844             :  * Primary entry point for manual REINDEX commands.  This is mainly a
    2845             :  * preparation wrapper for the real operations that will happen in
    2846             :  * each subroutine of REINDEX.
    2847             :  */
    2848             : void
    2849        1130 : ExecReindex(ParseState *pstate, const ReindexStmt *stmt, bool isTopLevel)
    2850             : {
    2851        1130 :     ReindexParams params = {0};
    2852             :     ListCell   *lc;
    2853        1130 :     bool        concurrently = false;
    2854        1130 :     bool        verbose = false;
    2855        1130 :     char       *tablespacename = NULL;
    2856             : 
    2857             :     /* Parse option list */
    2858        1906 :     foreach(lc, stmt->params)
    2859             :     {
    2860         776 :         DefElem    *opt = (DefElem *) lfirst(lc);
    2861             : 
    2862         776 :         if (strcmp(opt->defname, "verbose") == 0)
    2863          14 :             verbose = defGetBoolean(opt);
    2864         762 :         else if (strcmp(opt->defname, "concurrently") == 0)
    2865         634 :             concurrently = defGetBoolean(opt);
    2866         128 :         else if (strcmp(opt->defname, "tablespace") == 0)
    2867         128 :             tablespacename = defGetString(opt);
    2868             :         else
    2869           0 :             ereport(ERROR,
    2870             :                     (errcode(ERRCODE_SYNTAX_ERROR),
    2871             :                      errmsg("unrecognized %s option \"%s\"",
    2872             :                             "REINDEX", opt->defname),
    2873             :                      parser_errposition(pstate, opt->location)));
    2874             :     }
    2875             : 
    2876        1130 :     if (concurrently)
    2877         634 :         PreventInTransactionBlock(isTopLevel,
    2878             :                                   "REINDEX CONCURRENTLY");
    2879             : 
    2880        1104 :     params.options =
    2881        2208 :         (verbose ? REINDEXOPT_VERBOSE : 0) |
    2882        1104 :         (concurrently ? REINDEXOPT_CONCURRENTLY : 0);
    2883             : 
    2884             :     /*
    2885             :      * Assign the tablespace OID to move indexes to, with InvalidOid to do
    2886             :      * nothing.
    2887             :      */
    2888        1104 :     if (tablespacename != NULL)
    2889             :     {
    2890         128 :         params.tablespaceOid = get_tablespace_oid(tablespacename, false);
    2891             : 
    2892             :         /* Check permissions except when moving to database's default */
    2893         128 :         if (OidIsValid(params.tablespaceOid) &&
    2894         128 :             params.tablespaceOid != MyDatabaseTableSpace)
    2895             :         {
    2896             :             AclResult   aclresult;
    2897             : 
    2898         128 :             aclresult = object_aclcheck(TableSpaceRelationId, params.tablespaceOid,
    2899             :                                         GetUserId(), ACL_CREATE);
    2900         128 :             if (aclresult != ACLCHECK_OK)
    2901          12 :                 aclcheck_error(aclresult, OBJECT_TABLESPACE,
    2902          12 :                                get_tablespace_name(params.tablespaceOid));
    2903             :         }
    2904             :     }
    2905             :     else
    2906         976 :         params.tablespaceOid = InvalidOid;
    2907             : 
    2908        1092 :     switch (stmt->kind)
    2909             :     {
    2910         412 :         case REINDEX_OBJECT_INDEX:
    2911         412 :             ReindexIndex(stmt, &params, isTopLevel);
    2912         306 :             break;
    2913         502 :         case REINDEX_OBJECT_TABLE:
    2914         502 :             ReindexTable(stmt, &params, isTopLevel);
    2915         380 :             break;
    2916         178 :         case REINDEX_OBJECT_SCHEMA:
    2917             :         case REINDEX_OBJECT_SYSTEM:
    2918             :         case REINDEX_OBJECT_DATABASE:
    2919             : 
    2920             :             /*
    2921             :              * This cannot run inside a user transaction block; if we were
    2922             :              * inside a transaction, then its commit- and
    2923             :              * start-transaction-command calls would not have the intended
    2924             :              * effect!
    2925             :              */
    2926         178 :             PreventInTransactionBlock(isTopLevel,
    2927         242 :                                       (stmt->kind == REINDEX_OBJECT_SCHEMA) ? "REINDEX SCHEMA" :
    2928          64 :                                       (stmt->kind == REINDEX_OBJECT_SYSTEM) ? "REINDEX SYSTEM" :
    2929             :                                       "REINDEX DATABASE");
    2930         172 :             ReindexMultipleTables(stmt, &params);
    2931         122 :             break;
    2932           0 :         default:
    2933           0 :             elog(ERROR, "unrecognized object type: %d",
    2934             :                  (int) stmt->kind);
    2935             :             break;
    2936             :     }
    2937         808 : }
    2938             : 
    2939             : /*
    2940             :  * ReindexIndex
    2941             :  *      Recreate a specific index.
    2942             :  */
    2943             : static void
    2944         412 : ReindexIndex(const ReindexStmt *stmt, const ReindexParams *params, bool isTopLevel)
    2945             : {
    2946         412 :     const RangeVar *indexRelation = stmt->relation;
    2947             :     struct ReindexIndexCallbackState state;
    2948             :     Oid         indOid;
    2949             :     char        persistence;
    2950             :     char        relkind;
    2951             : 
    2952             :     /*
    2953             :      * Find and lock index, and check permissions on table; use callback to
    2954             :      * obtain lock on table first, to avoid deadlock hazard.  The lock level
    2955             :      * used here must match the index lock obtained in reindex_index().
    2956             :      *
    2957             :      * If it's a temporary index, we will perform a non-concurrent reindex,
    2958             :      * even if CONCURRENTLY was requested.  In that case, reindex_index() will
    2959             :      * upgrade the lock, but that's OK, because other sessions can't hold
    2960             :      * locks on our temporary table.
    2961             :      */
    2962         412 :     state.params = *params;
    2963         412 :     state.locked_table_oid = InvalidOid;
    2964         412 :     indOid = RangeVarGetRelidExtended(indexRelation,
    2965         412 :                                       (params->options & REINDEXOPT_CONCURRENTLY) != 0 ?
    2966             :                                       ShareUpdateExclusiveLock : AccessExclusiveLock,
    2967             :                                       0,
    2968             :                                       RangeVarCallbackForReindexIndex,
    2969             :                                       &state);
    2970             : 
    2971             :     /*
    2972             :      * Obtain the current persistence and kind of the existing index.  We
    2973             :      * already hold a lock on the index.
    2974             :      */
    2975         364 :     persistence = get_rel_persistence(indOid);
    2976         364 :     relkind = get_rel_relkind(indOid);
    2977             : 
    2978         364 :     if (relkind == RELKIND_PARTITIONED_INDEX)
    2979          36 :         ReindexPartitions(stmt, indOid, params, isTopLevel);
    2980         328 :     else if ((params->options & REINDEXOPT_CONCURRENTLY) != 0 &&
    2981             :              persistence != RELPERSISTENCE_TEMP)
    2982         198 :         ReindexRelationConcurrently(stmt, indOid, params);
    2983             :     else
    2984             :     {
    2985         130 :         ReindexParams newparams = *params;
    2986             : 
    2987         130 :         newparams.options |= REINDEXOPT_REPORT_PROGRESS;
    2988         130 :         reindex_index(stmt, indOid, false, persistence, &newparams);
    2989             :     }
    2990         306 : }
    2991             : 
    2992             : /*
    2993             :  * Check permissions on table before acquiring relation lock; also lock
    2994             :  * the heap before the RangeVarGetRelidExtended takes the index lock, to avoid
    2995             :  * deadlocks.
    2996             :  */
    2997             : static void
    2998         430 : RangeVarCallbackForReindexIndex(const RangeVar *relation,
    2999             :                                 Oid relId, Oid oldRelId, void *arg)
    3000             : {
    3001             :     char        relkind;
    3002         430 :     struct ReindexIndexCallbackState *state = arg;
    3003             :     LOCKMODE    table_lockmode;
    3004             :     Oid         table_oid;
    3005             :     AclResult   aclresult;
    3006             : 
    3007             :     /*
    3008             :      * Lock level here should match table lock in reindex_index() for
    3009             :      * non-concurrent case and table locks used by index_concurrently_*() for
    3010             :      * concurrent case.
    3011             :      */
    3012         860 :     table_lockmode = (state->params.options & REINDEXOPT_CONCURRENTLY) != 0 ?
    3013         430 :         ShareUpdateExclusiveLock : ShareLock;
    3014             : 
    3015             :     /*
    3016             :      * If we previously locked some other index's heap, and the name we're
    3017             :      * looking up no longer refers to that relation, release the now-useless
    3018             :      * lock.
    3019             :      */
    3020         430 :     if (relId != oldRelId && OidIsValid(oldRelId))
    3021             :     {
    3022           6 :         UnlockRelationOid(state->locked_table_oid, table_lockmode);
    3023           6 :         state->locked_table_oid = InvalidOid;
    3024             :     }
    3025             : 
    3026             :     /* If the relation does not exist, there's nothing more to do. */
    3027         430 :     if (!OidIsValid(relId))
    3028          12 :         return;
    3029             : 
    3030             :     /* If the relation does exist, check whether it's an index. */
    3031         418 :     relkind = get_rel_relkind(relId);
    3032         418 :     if (relkind != RELKIND_INDEX &&
    3033             :         relkind != RELKIND_PARTITIONED_INDEX)
    3034          24 :         ereport(ERROR,
    3035             :                 (errcode(ERRCODE_WRONG_OBJECT_TYPE),
    3036             :                  errmsg("\"%s\" is not an index", relation->relname)));
    3037             : 
    3038             :     /* Look up the index's table. */
    3039         394 :     table_oid = IndexGetRelation(relId, false);
    3040             : 
    3041             :     /*
    3042             :      * In the unlikely event that, upon retry, we get the same index OID with
    3043             :      * a different table OID, fail.  RangeVarGetRelidExtended() will have
    3044             :      * already locked the index in this case, and it won't retry again, so we
    3045             :      * can't lock the newly discovered table OID without risking deadlock.
    3046             :      * Also, while this corner case is indeed possible, it is extremely
    3047             :      * unlikely to happen in practice, so it's probably not worth any more
    3048             :      * effort than this.
    3049             :      */
    3050         394 :     if (relId == oldRelId && table_oid != state->locked_table_oid)
    3051           0 :         ereport(ERROR,
    3052             :                 (errcode(ERRCODE_UNDEFINED_OBJECT),
    3053             :                  errmsg("index \"%s\" was concurrently dropped",
    3054             :                         relation->relname)));
    3055             : 
    3056             :     /* Check permissions. */
    3057         394 :     aclresult = pg_class_aclcheck(table_oid, GetUserId(), ACL_MAINTAIN);
    3058         394 :     if (aclresult != ACLCHECK_OK)
    3059          12 :         aclcheck_error(aclresult, OBJECT_INDEX, relation->relname);
    3060             : 
    3061             :     /* Lock heap before index to avoid deadlock. */
    3062         382 :     if (relId != oldRelId)
    3063             :     {
    3064         370 :         LockRelationOid(table_oid, table_lockmode);
    3065         370 :         state->locked_table_oid = table_oid;
    3066             :     }
    3067             : }
    3068             : 
    3069             : /*
    3070             :  * ReindexTable
    3071             :  *      Recreate all indexes of a table (and of its toast table, if any)
                     :  *
                     :  * The table named by stmt->relation is looked up and locked with
                     :  * ShareUpdateExclusiveLock when REINDEXOPT_CONCURRENTLY is set in
                     :  * params->options, and with ShareLock otherwise.  The work is then
                     :  * dispatched as follows:
                     :  *  - a partitioned table goes to ReindexPartitions(), which also receives
                     :  *    isTopLevel;
                     :  *  - with CONCURRENTLY and a non-temporary table,
                     :  *    ReindexRelationConcurrently() is used;
                     :  *  - otherwise reindex_relation() is called on a copy of *params that
                     :  *    additionally has REINDEXOPT_REPORT_PROGRESS set.
                     :  * In the last two cases a NOTICE is emitted when the call returns false.
                     :  *
                     :  * Returns the OID of the table that was looked up.
    3072             :  */
    3073             : static Oid
    3074         502 : ReindexTable(const ReindexStmt *stmt, const ReindexParams *params, bool isTopLevel)
    3075             : {
    3076             :     Oid         heapOid;
    3077             :     bool        result;
    3078         502 :     const RangeVar *relation = stmt->relation;
    3079             : 
    3080             :     /*
    3081             :      * The lock level used here should match reindex_relation().
    3082             :      *
    3083             :      * If it's a temporary table, we will perform a non-concurrent reindex,
    3084             :      * even if CONCURRENTLY was requested.  In that case, reindex_relation()
    3085             :      * will upgrade the lock, but that's OK, because other sessions can't hold
    3086             :      * locks on our temporary table.
    3087             :      */
    3088         502 :     heapOid = RangeVarGetRelidExtended(relation,
    3089         502 :                                        (params->options & REINDEXOPT_CONCURRENTLY) != 0 ?
    3090             :                                        ShareUpdateExclusiveLock : ShareLock,
    3091             :                                        0,
    3092             :                                        RangeVarCallbackMaintainsTable, NULL);
    3093             : 
    3094         456 :     if (get_rel_relkind(heapOid) == RELKIND_PARTITIONED_TABLE)
    3095          72 :         ReindexPartitions(stmt, heapOid, params, isTopLevel);
    3096         624 :     else if ((params->options & REINDEXOPT_CONCURRENTLY) != 0 &&
    3097         240 :              get_rel_persistence(heapOid) != RELPERSISTENCE_TEMP)
    3098             :     {
    3099         228 :         result = ReindexRelationConcurrently(stmt, heapOid, params);
    3100             : 
    3101         190 :         if (!result)
    3102          18 :             ereport(NOTICE,
    3103             :                     (errmsg("table \"%s\" has no indexes that can be reindexed concurrently",
    3104             :                             relation->relname)));
    3105             :     }
    3106             :     else
    3107             :     {
    3108         156 :         ReindexParams newparams = *params;
    3109             : 
    3110         156 :         newparams.options |= REINDEXOPT_REPORT_PROGRESS;
    3111         156 :         result = reindex_relation(stmt, heapOid,
    3112             :                                   REINDEX_REL_PROCESS_TOAST |
    3113             :                                   REINDEX_REL_CHECK_CONSTRAINTS,
    3114             :                                   &newparams);
    3115         124 :         if (!result)
    3116          12 :             ereport(NOTICE,
    3117             :                     (errmsg("table \"%s\" has no indexes to reindex",
    3118             :                             relation->relname)));
    3119             :     }
    3120             : 
    3121         380 :     return heapOid;
    3122             : }
    3123             : 
    3124             : /*
    3125             :  * ReindexMultipleTables
    3126             :  *      Recreate indexes of tables selected by objectName/objectKind.
    3127             :  *
    3128             :  * To reduce the probability of deadlocks, each table is reindexed in a
    3129             :  * separate transaction, so we can release the lock on it right away.
    3130             :  * That means this must not be called within a user transaction block!
                     :  *
                     :  * stmt->kind must be REINDEX_OBJECT_SCHEMA, REINDEX_OBJECT_SYSTEM or
                     :  * REINDEX_OBJECT_DATABASE; stmt->name is required for SCHEMA and optional
                     :  * otherwise.  For DATABASE and SYSTEM the target is always MyDatabaseId,
                     :  * and a name that does not match it is rejected.  The current user must
                     :  * own the schema or database, or have the privileges of ROLE_PG_MAINTAIN.
                     :  * SYSTEM combined with REINDEXOPT_CONCURRENTLY raises an error.
                     :  *
                     :  * pg_class is scanned once to collect the OIDs of the plain tables and
                     :  * materialized views to process.  The list lives in a private memory
                     :  * context (a child of PortalContext) and is handed to
                     :  * ReindexMultipleInternal(); pg_class itself, if selected, is placed at
                     :  * the head of the list.  Relations that are skipped rather than
                     :  * rejected: temp tables of other backends, relations on the wrong side
                     :  * of the catalog/non-catalog split for SYSTEM and DATABASE, shared
                     :  * relations on which the user lacks MAINTAIN, catalogs when CONCURRENTLY
                     :  * is set, and system or mapped relations when params->tablespaceOid is
                     :  * valid.  The last two cases each issue a single WARNING.
    3131             :  */
    3132             : static void
    3133         172 : ReindexMultipleTables(const ReindexStmt *stmt, const ReindexParams *params)
    3134             : {
    3135             : 
    3136             :     Oid         objectOid;
    3137             :     Relation    relationRelation;
    3138             :     TableScanDesc scan;
    3139             :     ScanKeyData scan_keys[1];
    3140             :     HeapTuple   tuple;
    3141             :     MemoryContext private_context;
    3142             :     MemoryContext old;
    3143         172 :     List       *relids = NIL;
    3144             :     int         num_keys;
    3145         172 :     bool        concurrent_warning = false;
    3146         172 :     bool        tablespace_warning = false;
    3147         172 :     const char *objectName = stmt->name;
    3148         172 :     const ReindexObjectType objectKind = stmt->kind;
    3149             : 
    3150             :     Assert(objectKind == REINDEX_OBJECT_SCHEMA ||
    3151             :            objectKind == REINDEX_OBJECT_SYSTEM ||
    3152             :            objectKind == REINDEX_OBJECT_DATABASE);
    3153             : 
    3154             :     /*
    3155             :      * This matches the options enforced by the grammar, where the object name
    3156             :      * is optional for DATABASE and SYSTEM.
    3157             :      */
    3158             :     Assert(objectName || objectKind != REINDEX_OBJECT_SCHEMA);
    3159             : 
    3160         172 :     if (objectKind == REINDEX_OBJECT_SYSTEM &&
    3161          34 :         (params->options & REINDEXOPT_CONCURRENTLY) != 0)
    3162          20 :         ereport(ERROR,
    3163             :                 (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
    3164             :                  errmsg("cannot reindex system catalogs concurrently")));
    3165             : 
    3166             :     /*
    3167             :      * Get OID of object to reindex, being the database currently being used
    3168             :      * by session for a database or for system catalogs, or the schema defined
    3169             :      * by caller. At the same time do permission checks that need different
    3170             :      * processing depending on the object type.
    3171             :      */
    3172         152 :     if (objectKind == REINDEX_OBJECT_SCHEMA)
    3173             :     {
    3174         108 :         objectOid = get_namespace_oid(objectName, false);
    3175             : 
    3176         102 :         if (!object_ownercheck(NamespaceRelationId, objectOid, GetUserId()) &&
    3177          24 :             !has_privs_of_role(GetUserId(), ROLE_PG_MAINTAIN))
    3178          18 :             aclcheck_error(ACLCHECK_NOT_OWNER, OBJECT_SCHEMA,
    3179             :                            objectName);
    3180             :     }
    3181             :     else
    3182             :     {
    3183          44 :         objectOid = MyDatabaseId;
    3184             : 
    3185          44 :         if (objectName && strcmp(objectName, get_database_name(objectOid)) != 0)
    3186           6 :             ereport(ERROR,
    3187             :                     (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
    3188             :                      errmsg("can only reindex the currently open database")));
    3189          38 :         if (!object_ownercheck(DatabaseRelationId, objectOid, GetUserId()) &&
    3190           0 :             !has_privs_of_role(GetUserId(), ROLE_PG_MAINTAIN))
    3191           0 :             aclcheck_error(ACLCHECK_NOT_OWNER, OBJECT_DATABASE,
    3192           0 :                            get_database_name(objectOid));
    3193             :     }
    3194             : 
    3195             :     /*
    3196             :      * Create a memory context that will survive forced transaction commits we
    3197             :      * do below.  Since it is a child of PortalContext, it will go away
    3198             :      * eventually even if we suffer an error; there's no need for special
    3199             :      * abort cleanup logic.
    3200             :      */
    3201         122 :     private_context = AllocSetContextCreate(PortalContext,
    3202             :                                             "ReindexMultipleTables",
    3203             :                                             ALLOCSET_SMALL_SIZES);
    3204             : 
    3205             :     /*
    3206             :      * Define the search keys to find the objects to reindex. For a schema, we
    3207             :      * select target relations using relnamespace, something not necessary for
    3208             :      * a database-wide operation.
    3209             :      */
    3210         122 :     if (objectKind == REINDEX_OBJECT_SCHEMA)
    3211             :     {
    3212          84 :         num_keys = 1;
    3213          84 :         ScanKeyInit(&scan_keys[0],
    3214             :                     Anum_pg_class_relnamespace,
    3215             :                     BTEqualStrategyNumber, F_OIDEQ,
    3216             :                     ObjectIdGetDatum(objectOid));
    3217             :     }
    3218             :     else
    3219          38 :         num_keys = 0;
    3220             : 
    3221             :     /*
    3222             :      * Scan pg_class to build a list of the relations we need to reindex.
    3223             :      *
    3224             :      * We only consider plain relations and materialized views here (toast
    3225             :      * rels will be processed indirectly by reindex_relation).
    3226             :      */
    3227         122 :     relationRelation = table_open(RelationRelationId, AccessShareLock);
    3228         122 :     scan = table_beginscan_catalog(relationRelation, num_keys, scan_keys);
    3229       18612 :     while ((tuple = heap_getnext(scan, ForwardScanDirection)) != NULL)
    3230             :     {
    3231       18490 :         Form_pg_class classtuple = (Form_pg_class) GETSTRUCT(tuple);
    3232       18490 :         Oid         relid = classtuple->oid;
    3233             : 
    3234             :         /*
    3235             :          * Only regular tables and matviews can have indexes, so ignore any
    3236             :          * other kind of relation.
    3237             :          *
    3238             :          * Partitioned tables/indexes are skipped but matching leaf partitions
    3239             :          * are processed.
    3240             :          */
    3241       18490 :         if (classtuple->relkind != RELKIND_RELATION &&
    3242       15208 :             classtuple->relkind != RELKIND_MATVIEW)
    3243       15190 :             continue;
    3244             : 
    3245             :         /* Skip temp tables of other backends; we can't reindex them at all */
    3246        3300 :         if (classtuple->relpersistence == RELPERSISTENCE_TEMP &&
    3247          36 :             !isTempNamespace(classtuple->relnamespace))
    3248           0 :             continue;
    3249             : 
    3250             :         /*
    3251             :          * Check user/system classification.  SYSTEM processes all the
    3252             :          * catalogs, and DATABASE processes everything that's not a catalog.
    3253             :          */
    3254        3300 :         if (objectKind == REINDEX_OBJECT_SYSTEM &&
    3255         984 :             !IsCatalogRelationOid(relid))
    3256          88 :             continue;
    3257        4856 :         else if (objectKind == REINDEX_OBJECT_DATABASE &&
    3258        1644 :                  IsCatalogRelationOid(relid))
    3259        1536 :             continue;
    3260             : 
    3261             :         /*
    3262             :          * We already checked privileges on the database or schema, but we
    3263             :          * further restrict reindexing shared catalogs to roles with the
    3264             :          * MAINTAIN privilege on the relation.
    3265             :          */
    3266        1918 :         if (classtuple->relisshared &&
    3267         242 :             pg_class_aclcheck(relid, GetUserId(), ACL_MAINTAIN) != ACLCHECK_OK)
    3268           0 :             continue;
    3269             : 
    3270             :         /*
    3271             :          * Skip system tables, since index_create() would reject indexing them
    3272             :          * concurrently (and it would likely fail if we tried).
    3273             :          */
    3274        2158 :         if ((params->options & REINDEXOPT_CONCURRENTLY) != 0 &&
    3275         482 :             IsCatalogRelationOid(relid))
    3276             :         {
    3277         384 :             if (!concurrent_warning)
    3278           6 :                 ereport(WARNING,
    3279             :                         (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
    3280             :                          errmsg("cannot reindex system catalogs concurrently, skipping all")));
    3281         384 :             concurrent_warning = true;
    3282         384 :             continue;
    3283             :         }
    3284             : 
    3285             :         /*
    3286             :          * If a new tablespace is set, check if this relation has to be
    3287             :          * skipped.
    3288             :          */
    3289        1292 :         if (OidIsValid(params->tablespaceOid))
    3290             :         {
    3291           0 :             bool        skip_rel = false;
    3292             : 
    3293             :             /*
    3294             :              * Mapped relations cannot be moved to different tablespaces (in
    3295             :              * particular this eliminates all shared catalogs.).
    3296             :              */
    3297           0 :             if (RELKIND_HAS_STORAGE(classtuple->relkind) &&
    3298           0 :                 !RelFileNumberIsValid(classtuple->relfilenode))
    3299           0 :                 skip_rel = true;
    3300             : 
    3301             :             /*
    3302             :              * A system relation is always skipped, even with
    3303             :              * allow_system_table_mods enabled.
    3304             :              */
    3305           0 :             if (IsSystemClass(relid, classtuple))
    3306           0 :                 skip_rel = true;
    3307             : 
    3308           0 :             if (skip_rel)
    3309             :             {
    3310           0 :                 if (!tablespace_warning)
    3311           0 :                     ereport(WARNING,
    3312             :                             (errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
    3313             :                              errmsg("cannot move system relations, skipping all")));
    3314           0 :                 tablespace_warning = true;
    3315           0 :                 continue;
    3316             :             }
    3317             :         }
    3318             : 
    3319             :         /* Save the list of relation OIDs in private context */
    3320        1292 :         old = MemoryContextSwitchTo(private_context);
    3321             : 
    3322             :         /*
    3323             :          * We always want to reindex pg_class first if it's selected to be
    3324             :          * reindexed.  This ensures that if there is any corruption in
    3325             :          * pg_class' indexes, they will be fixed before we process any other
    3326             :          * tables.  This is critical because reindexing itself will try to
    3327             :          * update pg_class.
    3328             :          */
    3329        1292 :         if (relid == RelationRelationId)
    3330          16 :             relids = lcons_oid(relid, relids);
    3331             :         else
    3332        1276 :             relids = lappend_oid(relids, relid);
    3333             : 
    3334        1292 :         MemoryContextSwitchTo(old);
    3335             :     }
    3336         122 :     table_endscan(scan);
    3337         122 :     table_close(relationRelation, AccessShareLock);
    3338             : 
    3339             :     /*
    3340             :      * Process each relation listed in a separate transaction.  Note that this
    3341             :      * commits and then starts a new transaction immediately.
    3342             :      */
    3343         122 :     ReindexMultipleInternal(stmt, relids, params);
    3344             : 
    3345         122 :     MemoryContextDelete(private_context);
    3346         122 : }
    3347             : 
    3348             : /*
    3349             :  * Error callback specific to ReindexPartitions().
                     :  *
                     :  * 'arg' points to a ReindexErrorInfo.  Depending on its relkind, one
                     :  * context line is added that names the partitioned table or partitioned
                     :  * index as "namespace.relname".  Any other relkind is caught by the
                     :  * Assert and otherwise adds nothing.
    3350             :  */
    3351             : static void
    3352          12 : reindex_error_callback(void *arg)
    3353             : {
    3354          12 :     ReindexErrorInfo *errinfo = (ReindexErrorInfo *) arg;
    3355             : 
    3356             :     Assert(RELKIND_HAS_PARTITIONS(errinfo->relkind));
    3357             : 
    3358          12 :     if (errinfo->relkind == RELKIND_PARTITIONED_TABLE)
    3359           6 :         errcontext("while reindexing partitioned table \"%s.%s\"",
    3360             :                    errinfo->relnamespace, errinfo->relname);
    3361           6 :     else if (errinfo->relkind == RELKIND_PARTITIONED_INDEX)
    3362           6 :         errcontext("while reindexing partitioned index \"%s.%s\"",
    3363             :                    errinfo->relnamespace, errinfo->relname);
    3364          12 : }
    3365             : 
    3366             : /*
    3367             :  * ReindexPartitions
    3368             :  *
    3369             :  * Reindex a set of partitions, per the partitioned index or table given
    3370             :  * by the caller.
                     :  *
                     :  * 'relid' must identify a relation for which RELKIND_HAS_PARTITIONS()
                     :  * holds.  'isTopLevel' is passed to PreventInTransactionBlock(), which is
                     :  * called with reindex_error_callback() installed so that a failure there
                     :  * names the partitioned relation; the callback is removed right after.
                     :  *
                     :  * The inheritance tree below 'relid' is then collected with ShareLock,
                     :  * members without storage are dropped, and the remaining OIDs are saved
                     :  * in a memory context that is a child of PortalContext and passed to
                     :  * ReindexMultipleInternal().
    3371             :  */
    3372             : static void
    3373         108 : ReindexPartitions(const ReindexStmt *stmt, Oid relid, const ReindexParams *params, bool isTopLevel)
    3374             : {
    3375         108 :     List       *partitions = NIL;
    3376         108 :     char        relkind = get_rel_relkind(relid);
    3377         108 :     char       *relname = get_rel_name(relid);
    3378         108 :     char       *relnamespace = get_namespace_name(get_rel_namespace(relid));
    3379             :     MemoryContext reindex_context;
    3380             :     List       *inhoids;
    3381             :     ListCell   *lc;
    3382             :     ErrorContextCallback errcallback;
    3383             :     ReindexErrorInfo errinfo;
    3384             : 
    3385             :     Assert(RELKIND_HAS_PARTITIONS(relkind));
    3386             : 
    3387             :     /*
    3388             :      * Check if this runs in a transaction block, with an error callback to
    3389             :      * provide more context under which a problem happens.
    3390             :      */
    3391         108 :     errinfo.relname = pstrdup(relname);
    3392         108 :     errinfo.relnamespace = pstrdup(relnamespace);
    3393         108 :     errinfo.relkind = relkind;
    3394         108 :     errcallback.callback = reindex_error_callback;
    3395         108 :     errcallback.arg = &errinfo;
    3396         108 :     errcallback.previous = error_context_stack;
    3397         108 :     error_context_stack = &errcallback;
    3398             : 
    3399         108 :     PreventInTransactionBlock(isTopLevel,
    3400             :                               relkind == RELKIND_PARTITIONED_TABLE ?
    3401             :                               "REINDEX TABLE" : "REINDEX INDEX");
    3402             : 
    3403             :     /* Pop the error context stack */
    3404          96 :     error_context_stack = errcallback.previous;
    3405             : 
    3406             :     /*
    3407             :      * Create special memory context for cross-transaction storage.
    3408             :      *
    3409             :      * Since it is a child of PortalContext, it will go away eventually even
    3410             :      * if we suffer an error so there is no need for special abort cleanup
    3411             :      * logic.
    3412             :      */
    3413          96 :     reindex_context = AllocSetContextCreate(PortalContext, "Reindex",
    3414             :                                             ALLOCSET_DEFAULT_SIZES);
    3415             : 
    3416             :     /* ShareLock is enough to prevent schema modifications */
    3417          96 :     inhoids = find_all_inheritors(relid, ShareLock, NULL);
    3418             : 
    3419             :     /*
    3420             :      * The relations to reindex are the physical partitions of the
    3421             :      * tree so discard any partitioned table or index.
    3422             :      */
    3423         374 :     foreach(lc, inhoids)
    3424             :     {
    3425         278 :         Oid         partoid = lfirst_oid(lc);
    3426         278 :         char        partkind = get_rel_relkind(partoid);
    3427             :         MemoryContext old_context;
    3428             : 
    3429             :         /*
    3430             :          * This discards partitioned tables, partitioned indexes and foreign
    3431             :          * tables.
    3432             :          */
    3433         278 :         if (!RELKIND_HAS_STORAGE(partkind))
    3434         160 :             continue;
    3435             : 
    3436             :         Assert(partkind == RELKIND_INDEX ||
    3437             :                partkind == RELKIND_RELATION);
    3438             : 
    3439             :         /* Save partition OID */
    3440         118 :         old_context = MemoryContextSwitchTo(reindex_context);
    3441         118 :         partitions = lappend_oid(partitions, partoid);
    3442         118 :         MemoryContextSwitchTo(old_context);
    3443             :     }
    3444             : 
    3445             :     /*
    3446             :      * Process each partition listed in a separate transaction.  Note that
    3447             :      * this commits and then starts a new transaction immediately.
    3448             :      */
    3449          96 :     ReindexMultipleInternal(stmt, partitions, params);
    3450             : 
    3451             :     /*
    3452             :      * Clean up working storage --- note we must do this after
    3453             :      * StartTransactionCommand, else we might be trying to delete the active
    3454             :      * context!
    3455             :      */
    3456          96 :     MemoryContextDelete(reindex_context);
    3457          96 : }
    3458             : 
    3459             : /*
    3460             :  * ReindexMultipleInternal
    3461             :  *
    3462             :  * Reindex a list of relations, each one being processed in its own
    3463             :  * transaction.  This commits the existing transaction immediately,
    3464             :  * and starts a new transaction when finished.
                     :  *
                     :  * 'relids' is an OID list; it is still read after the first commit here.
                     :  * The active snapshot is popped before that commit.  Each
                     :  * relation then gets its own transaction with a freshly pushed snapshot;
                     :  * a relation no longer found in the RELOID syscache is skipped without
                     :  * any message.  No snapshot is pushed for the transaction started at the
                     :  * end.
                     :  *
                     :  * Per relation, the work goes to ReindexRelationConcurrently() when
                     :  * REINDEXOPT_CONCURRENTLY is set and the relation is not temporary, to
                     :  * reindex_index() when it is an index, and to reindex_relation()
                     :  * otherwise.  Each call receives a copy of *params with
                     :  * REINDEXOPT_MISSING_OK added (plus REINDEXOPT_REPORT_PROGRESS on the
                     :  * non-concurrent paths); *params itself is not modified.
    3465             :  */
    3466             : static void
    3467         218 : ReindexMultipleInternal(const ReindexStmt *stmt, const List *relids, const ReindexParams *params)
    3468             : {
    3469             :     ListCell   *l;
    3470             : 
    3471         218 :     PopActiveSnapshot();
    3472         218 :     CommitTransactionCommand();
    3473             : 
    3474        1628 :     foreach(l, relids)
    3475             :     {
    3476        1410 :         Oid         relid = lfirst_oid(l);
    3477             :         char        relkind;
    3478             :         char        relpersistence;
    3479             : 
    3480        1410 :         StartTransactionCommand();
    3481             : 
    3482             :         /* functions in indexes may want a snapshot set */
    3483        1410 :         PushActiveSnapshot(GetTransactionSnapshot());
    3484             : 
    3485             :         /* check if the relation still exists */
    3486        1410 :         if (!SearchSysCacheExists1(RELOID, ObjectIdGetDatum(relid)))
    3487             :         {
    3488           4 :             PopActiveSnapshot();
    3489           4 :             CommitTransactionCommand();
    3490           4 :             continue;
    3491             :         }
    3492             : 
    3493             :         /*
    3494             :          * Check permissions except when moving to database's default if a new
    3495             :          * tablespace is chosen.  Note that this check also happens in
    3496             :          * ExecReindex(), but we do an extra check here as this runs across
    3497             :          * multiple transactions.
    3498             :          */
    3499        1406 :         if (OidIsValid(params->tablespaceOid) &&
    3500          12 :             params->tablespaceOid != MyDatabaseTableSpace)
    3501             :         {
    3502             :             AclResult   aclresult;
    3503             : 
    3504          12 :             aclresult = object_aclcheck(TableSpaceRelationId, params->tablespaceOid,
    3505             :                                         GetUserId(), ACL_CREATE);
    3506          12 :             if (aclresult != ACLCHECK_OK)
    3507           0 :                 aclcheck_error(aclresult, OBJECT_TABLESPACE,
    3508           0 :                                get_tablespace_name(params->tablespaceOid));
    3509             :         }
    3510             : 
    3511        1406 :         relkind = get_rel_relkind(relid);
    3512        1406 :         relpersistence = get_rel_persistence(relid);
    3513             : 
    3514             :         /*
    3515             :          * Partitioned tables and indexes can never be processed directly, and
    3516             :          * a list of their leaves should be built first.
    3517             :          */
    3518             :         Assert(!RELKIND_HAS_PARTITIONS(relkind));
    3519             : 
    3520        1406 :         if ((params->options & REINDEXOPT_CONCURRENTLY) != 0 &&
    3521             :             relpersistence != RELPERSISTENCE_TEMP)
    3522         128 :         {
    3523         128 :             ReindexParams newparams = *params;
    3524             : 
    3525         128 :             newparams.options |= REINDEXOPT_MISSING_OK;
    3526         128 :             (void) ReindexRelationConcurrently(stmt, relid, &newparams);
    3527         128 :             if (ActiveSnapshotSet())
    3528          26 :                 PopActiveSnapshot();
    3529             :             /* ReindexRelationConcurrently() does the verbose output */
    3530             :         }
    3531        1278 :         else if (relkind == RELKIND_INDEX)
    3532             :         {
    3533          18 :             ReindexParams newparams = *params;
    3534             : 
    3535          18 :             newparams.options |=
    3536             :                 REINDEXOPT_REPORT_PROGRESS | REINDEXOPT_MISSING_OK;
    3537          18 :             reindex_index(stmt, relid, false, relpersistence, &newparams);
    3538          18 :             PopActiveSnapshot();
    3539             :             /* reindex_index() does the verbose output */
    3540             :         }
    3541             :         else
    3542             :         {
    3543             :             bool        result;
    3544        1260 :             ReindexParams newparams = *params;
    3545             : 
    3546        1260 :             newparams.options |=
    3547             :                 REINDEXOPT_REPORT_PROGRESS | REINDEXOPT_MISSING_OK;
    3548        1260 :             result = reindex_relation(stmt, relid,
    3549             :                                       REINDEX_REL_PROCESS_TOAST |
    3550             :                                       REINDEX_REL_CHECK_CONSTRAINTS,
    3551             :                                       &newparams);
    3552             : 
    3553        1260 :             if (result && (params->options & REINDEXOPT_VERBOSE) != 0)
    3554           0 :                 ereport(INFO,
    3555             :                         (errmsg("table \"%s.%s\" was reindexed",
    3556             :                                 get_namespace_name(get_rel_namespace(relid)),
    3557             :                                 get_rel_name(relid))));
    3558             : 
    3559        1260 :             PopActiveSnapshot();
    3560             :         }
    3561             : 
    3562        1406 :         CommitTransactionCommand();
    3563             :     }
    3564             : 
    3565         218 :     StartTransactionCommand();
    3566         218 : }
    3567             : 
    3568             : 
    3569             : /*
    3570             :  * ReindexRelationConcurrently - process REINDEX CONCURRENTLY for given
    3571             :  * relation OID
    3572             :  *
    3573             :  * 'relationOid' can either belong to an index, a table or a materialized
    3574             :  * view.  For tables and materialized views, all its indexes will be rebuilt,
    3575             :  * excluding invalid indexes and any indexes used in exclusion constraints,
    3576             :  * but including its associated toast table indexes.  For indexes, the index
    3577             :  * itself will be rebuilt.
    3578             :  *
    3579             :  * The locks taken on parent tables and involved indexes are kept until the
    3580             :  * transaction is committed, at which point a session lock is taken on each
    3581             :  * relation.  Both of these protect against concurrent schema changes.
    3582             :  *
    3583             :  * Returns true if any indexes have been rebuilt (including toast table's
    3584             :  * indexes, when relevant), otherwise returns false.
    3585             :  *
    3586             :  * NOTE: This cannot be used on temporary relations.  A concurrent build would
    3587             :  * cause issues with ON COMMIT actions triggered by the transactions of the
    3588             :  * concurrent build.  Temporary relations are not subject to concurrent
    3589             :  * concerns, so there's no need for the more complicated concurrent build,
    3590             :  * anyway, and a non-concurrent reindex is more efficient.
    3591             :  */
    3592             : static bool
    3593         554 : ReindexRelationConcurrently(const ReindexStmt *stmt, Oid relationOid, const ReindexParams *params)
    3594             : {
    3595             :     typedef struct ReindexIndexInfo
    3596             :     {
    3597             :         Oid         indexId;
    3598             :         Oid         tableId;
    3599             :         Oid         amId;
    3600             :         bool        safe;       /* for set_indexsafe_procflags */
    3601             :     } ReindexIndexInfo;
    3602         554 :     List       *heapRelationIds = NIL;
    3603         554 :     List       *indexIds = NIL;
    3604         554 :     List       *newIndexIds = NIL;
    3605         554 :     List       *relationLocks = NIL;
    3606         554 :     List       *lockTags = NIL;
    3607             :     ListCell   *lc,
    3608             :                *lc2;
    3609             :     MemoryContext private_context;
    3610             :     MemoryContext oldcontext;
    3611             :     char        relkind;
    3612         554 :     char       *relationName = NULL;
    3613         554 :     char       *relationNamespace = NULL;
    3614             :     PGRUsage    ru0;
    3615         554 :     const int   progress_index[] = {
    3616             :         PROGRESS_CREATEIDX_COMMAND,
    3617             :         PROGRESS_CREATEIDX_PHASE,
    3618             :         PROGRESS_CREATEIDX_INDEX_OID,
    3619             :         PROGRESS_CREATEIDX_ACCESS_METHOD_OID
    3620             :     };
    3621             :     int64       progress_vals[4];
    3622             : 
    3623             :     /*
    3624             :      * Create a memory context that will survive forced transaction commits we
    3625             :      * do below.  Since it is a child of PortalContext, it will go away
    3626             :      * eventually even if we suffer an error; there's no need for special
    3627             :      * abort cleanup logic.
    3628             :      */
    3629         554 :     private_context = AllocSetContextCreate(PortalContext,
    3630             :                                             "ReindexConcurrent",
    3631             :                                             ALLOCSET_SMALL_SIZES);
    3632             : 
    3633         554 :     if ((params->options & REINDEXOPT_VERBOSE) != 0)
    3634             :     {
    3635             :         /* Save data needed by REINDEX VERBOSE in private context */
    3636           4 :         oldcontext = MemoryContextSwitchTo(private_context);
    3637             : 
    3638           4 :         relationName = get_rel_name(relationOid);
    3639           4 :         relationNamespace = get_namespace_name(get_rel_namespace(relationOid));
    3640             : 
    3641           4 :         pg_rusage_init(&ru0);
    3642             : 
    3643           4 :         MemoryContextSwitchTo(oldcontext);
    3644             :     }
    3645             : 
    3646         554 :     relkind = get_rel_relkind(relationOid);
    3647             : 
    3648             :     /*
    3649             :      * Extract the list of indexes that are going to be rebuilt based on the
    3650             :      * relation Oid given by caller.
    3651             :      */
    3652         554 :     switch (relkind)
    3653             :     {
    3654         332 :         case RELKIND_RELATION:
    3655             :         case RELKIND_MATVIEW:
    3656             :         case RELKIND_TOASTVALUE:
    3657             :             {
    3658             :                 /*
    3659             :                  * In the case of a relation, find all its indexes including
    3660             :                  * toast indexes.
    3661             :                  */
    3662             :                 Relation    heapRelation;
    3663             : 
    3664             :                 /* Save the list of relation OIDs in private context */
    3665         332 :                 oldcontext = MemoryContextSwitchTo(private_context);
    3666             : 
    3667             :                 /* Track this relation for session locks */
    3668         332 :                 heapRelationIds = lappend_oid(heapRelationIds, relationOid);
    3669             : 
    3670         332 :                 MemoryContextSwitchTo(oldcontext);
    3671             : 
    3672         332 :                 if (IsCatalogRelationOid(relationOid))
    3673          36 :                     ereport(ERROR,
    3674             :                             (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
    3675             :                              errmsg("cannot reindex system catalogs concurrently")));
    3676             : 
    3677             :                 /* Open relation to get its indexes */
    3678         296 :                 if ((params->options & REINDEXOPT_MISSING_OK) != 0)
    3679             :                 {
    3680         104 :                     heapRelation = try_table_open(relationOid,
    3681             :                                                   ShareUpdateExclusiveLock);
    3682             :                     /* leave if relation does not exist */
    3683         104 :                     if (!heapRelation)
    3684           0 :                         break;
    3685             :                 }
    3686             :                 else
    3687         192 :                     heapRelation = table_open(relationOid,
    3688             :                                               ShareUpdateExclusiveLock);
    3689             : 
    3690         318 :                 if (OidIsValid(params->tablespaceOid) &&
    3691          22 :                     IsSystemRelation(heapRelation))
    3692           2 :                     ereport(ERROR,
    3693             :                             (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
    3694             :                              errmsg("cannot move system relation \"%s\"",
    3695             :                                     RelationGetRelationName(heapRelation))));
    3696             : 
    3697             :                 /* Add all the valid indexes of relation to list */
    3698         568 :                 foreach(lc, RelationGetIndexList(heapRelation))
    3699             :                 {
    3700         274 :                     Oid         cellOid = lfirst_oid(lc);
    3701         274 :                     Relation    indexRelation = index_open(cellOid,
    3702             :                                                            ShareUpdateExclusiveLock);
    3703             : 
    3704         274 :                     if (!indexRelation->rd_index->indisvalid)
    3705           6 :                         ereport(WARNING,
    3706             :                                 (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
    3707             :                                  errmsg("skipping reindex of invalid index \"%s.%s\"",
    3708             :                                         get_namespace_name(get_rel_namespace(cellOid)),
    3709             :                                         get_rel_name(cellOid)),
    3710             :                                  errhint("Use DROP INDEX or REINDEX INDEX.")));
    3711         268 :                     else if (indexRelation->rd_index->indisexclusion)
    3712           6 :                         ereport(WARNING,
    3713             :                                 (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
    3714             :                                  errmsg("cannot reindex exclusion constraint index \"%s.%s\" concurrently, skipping",
    3715             :                                         get_namespace_name(get_rel_namespace(cellOid)),
    3716             :                                         get_rel_name(cellOid))));
    3717             :                     else
    3718             :                     {
    3719             :                         ReindexIndexInfo *idx;
    3720             : 
    3721             :                         /* Save the list of relation OIDs in private context */
    3722         262 :                         oldcontext = MemoryContextSwitchTo(private_context);
    3723             : 
    3724         262 :                         idx = palloc_object(ReindexIndexInfo);
    3725         262 :                         idx->indexId = cellOid;
    3726             :                         /* other fields set later */
    3727             : 
    3728         262 :                         indexIds = lappend(indexIds, idx);
    3729             : 
    3730         262 :                         MemoryContextSwitchTo(oldcontext);
    3731             :                     }
    3732             : 
    3733         274 :                     index_close(indexRelation, NoLock);
    3734             :                 }
    3735             : 
    3736             :                 /* Also add the toast indexes */
    3737         294 :                 if (OidIsValid(heapRelation->rd_rel->reltoastrelid))
    3738             :                 {
    3739          88 :                     Oid         toastOid = heapRelation->rd_rel->reltoastrelid;
    3740          88 :                     Relation    toastRelation = table_open(toastOid,
    3741             :                                                            ShareUpdateExclusiveLock);
    3742             : 
    3743             :                     /* Save the list of relation OIDs in private context */
    3744          88 :                     oldcontext = MemoryContextSwitchTo(private_context);
    3745             : 
    3746             :                     /* Track this relation for session locks */
    3747          88 :                     heapRelationIds = lappend_oid(heapRelationIds, toastOid);
    3748             : 
    3749          88 :                     MemoryContextSwitchTo(oldcontext);
    3750             : 
    3751         176 :                     foreach(lc2, RelationGetIndexList(toastRelation))
    3752             :                     {
    3753          88 :                         Oid         cellOid = lfirst_oid(lc2);
    3754          88 :                         Relation    indexRelation = index_open(cellOid,
    3755             :                                                                ShareUpdateExclusiveLock);
    3756             : 
    3757          88 :                         if (!indexRelation->rd_index->indisvalid)
    3758           0 :                             ereport(WARNING,
    3759             :                                     (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
    3760             :                                      errmsg("skipping reindex of invalid index \"%s.%s\"",
    3761             :                                             get_namespace_name(get_rel_namespace(cellOid)),
    3762             :                                             get_rel_name(cellOid)),
    3763             :                                      errhint("Use DROP INDEX or REINDEX INDEX.")));
    3764             :                         else
    3765             :                         {
    3766             :                             ReindexIndexInfo *idx;
    3767             : 
    3768             :                             /*
    3769             :                              * Save the list of relation OIDs in private
    3770             :                              * context
    3771             :                              */
    3772          88 :                             oldcontext = MemoryContextSwitchTo(private_context);
    3773             : 
    3774          88 :                             idx = palloc_object(ReindexIndexInfo);
    3775          88 :                             idx->indexId = cellOid;
    3776          88 :                             indexIds = lappend(indexIds, idx);
    3777             :                             /* other fields set later */
    3778             : 
    3779          88 :                             MemoryContextSwitchTo(oldcontext);
    3780             :                         }
    3781             : 
    3782          88 :                         index_close(indexRelation, NoLock);
    3783             :                     }
    3784             : 
    3785          88 :                     table_close(toastRelation, NoLock);
    3786             :                 }
    3787             : 
    3788         294 :                 table_close(heapRelation, NoLock);
    3789         294 :                 break;
    3790             :             }
    3791         222 :         case RELKIND_INDEX:
    3792             :             {
    3793         222 :                 Oid         heapId = IndexGetRelation(relationOid,
    3794         222 :                                                       (params->options & REINDEXOPT_MISSING_OK) != 0);
    3795             :                 Relation    heapRelation;
    3796             :                 ReindexIndexInfo *idx;
    3797             : 
    3798             :                 /* if relation is missing, leave */
    3799         222 :                 if (!OidIsValid(heapId))
    3800           0 :                     break;
    3801             : 
    3802         222 :                 if (IsCatalogRelationOid(heapId))
    3803          18 :                     ereport(ERROR,
    3804             :                             (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
    3805             :                              errmsg("cannot reindex system catalogs concurrently")));
    3806             : 
    3807             :                 /*
    3808             :                  * Don't allow reindex for an invalid index on TOAST table, as
    3809             :                  * if rebuilt it would not be possible to drop it.  Match
    3810             :                  * error message in reindex_index().
    3811             :                  */
    3812         204 :                 if (IsToastNamespace(get_rel_namespace(relationOid)) &&
    3813          56 :                     !get_index_isvalid(relationOid))
    3814           0 :                     ereport(ERROR,
    3815             :                             (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
    3816             :                              errmsg("cannot reindex invalid index on TOAST table")));
    3817             : 
    3818             :                 /*
    3819             :                  * Check if parent relation can be locked and if it exists,
    3820             :                  * this needs to be done at this stage as the list of indexes
    3821             :                  * to rebuild is not complete yet, and REINDEXOPT_MISSING_OK
    3822             :                  * should not be used once all the session locks are taken.
    3823             :                  */
    3824         204 :                 if ((params->options & REINDEXOPT_MISSING_OK) != 0)
    3825             :                 {
    3826          24 :                     heapRelation = try_table_open(heapId,
    3827             :                                                   ShareUpdateExclusiveLock);
    3828             :                     /* leave if relation does not exist */
    3829          24 :                     if (!heapRelation)
    3830           0 :                         break;
    3831             :                 }
    3832             :                 else
    3833         180 :                     heapRelation = table_open(heapId,
    3834             :                                               ShareUpdateExclusiveLock);
    3835             : 
    3836         212 :                 if (OidIsValid(params->tablespaceOid) &&
    3837           8 :                     IsSystemRelation(heapRelation))
    3838           2 :                     ereport(ERROR,
    3839             :                             (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
    3840             :                              errmsg("cannot move system relation \"%s\"",
    3841             :                                     get_rel_name(relationOid))));
    3842             : 
    3843         202 :                 table_close(heapRelation, NoLock);
    3844             : 
    3845             :                 /* Save the list of relation OIDs in private context */
    3846         202 :                 oldcontext = MemoryContextSwitchTo(private_context);
    3847             : 
    3848             :                 /* Track the heap relation of this index for session locks */
    3849         202 :                 heapRelationIds = list_make1_oid(heapId);
    3850             : 
    3851             :                 /*
    3852             :                  * Save the list of relation OIDs in private context.  Note
    3853             :                  * that invalid indexes are allowed here.
    3854             :                  */
    3855         202 :                 idx = palloc_object(ReindexIndexInfo);
    3856         202 :                 idx->indexId = relationOid;
    3857         202 :                 indexIds = lappend(indexIds, idx);
    3858             :                 /* other fields set later */
    3859             : 
    3860         202 :                 MemoryContextSwitchTo(oldcontext);
    3861         202 :                 break;
    3862             :             }
    3863             : 
    3864           0 :         case RELKIND_PARTITIONED_TABLE:
    3865             :         case RELKIND_PARTITIONED_INDEX:
    3866             :         default:
    3867             :             /* Return error if type of relation is not supported */
    3868           0 :             ereport(ERROR,
    3869             :                     (errcode(ERRCODE_WRONG_OBJECT_TYPE),
    3870             :                      errmsg("cannot reindex this type of relation concurrently")));
    3871             :             break;
    3872             :     }
    3873             : 
    3874             :     /*
    3875             :      * Definitely no indexes, so leave.  Any checks based on
    3876             :      * REINDEXOPT_MISSING_OK should be done only while the list of indexes to
    3877             :      * work on is built as the session locks taken before this transaction
    3878             :      * commits will make sure that they cannot be dropped by a concurrent
    3879             :      * session until this operation completes.
    3880             :      */
    3881         496 :     if (indexIds == NIL)
    3882          44 :         return false;
    3883             : 
    3884             :     /* It's not a shared catalog, so refuse to move it to shared tablespace */
    3885         452 :     if (params->tablespaceOid == GLOBALTABLESPACE_OID)
    3886           6 :         ereport(ERROR,
    3887             :                 (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
    3888             :                  errmsg("cannot move non-shared relation to tablespace \"%s\"",
    3889             :                         get_tablespace_name(params->tablespaceOid))));
    3890             : 
    3891             :     Assert(heapRelationIds != NIL);
    3892             : 
    3893             :     /*-----
    3894             :      * Now we have all the indexes we want to process in indexIds.
    3895             :      *
    3896             :      * The phases now are:
    3897             :      *
    3898             :      * 1. create new indexes in the catalog
    3899             :      * 2. build new indexes
    3900             :      * 3. let new indexes catch up with tuples inserted in the meantime
    3901             :      * 4. swap index names
    3902             :      * 5. mark old indexes as dead
    3903             :      * 6. drop old indexes
    3904             :      *
    3905             :      * We process each phase for all indexes before moving to the next phase,
    3906             :      * for efficiency.
    3907             :      */
    3908             : 
    3909             :     /*
    3910             :      * Phase 1 of REINDEX CONCURRENTLY
    3911             :      *
    3912             :      * Create a new index with the same properties as the old one, but it is
    3913             :      * only registered in catalogs and will be built later.  Then get session
    3914             :      * locks on all involved tables.  See analogous code in DefineIndex() for
    3915             :      * more detailed comments.
    3916             :      */
    3917             : 
    3918         986 :     foreach(lc, indexIds)
    3919             :     {
    3920             :         char       *concurrentName;
    3921         546 :         ReindexIndexInfo *idx = lfirst(lc);
    3922             :         ReindexIndexInfo *newidx;
    3923             :         Oid         newIndexId;
    3924             :         Relation    indexRel;
    3925             :         Relation    heapRel;
    3926             :         Oid         save_userid;
    3927             :         int         save_sec_context;
    3928             :         int         save_nestlevel;
    3929             :         Relation    newIndexRel;
    3930             :         LockRelId  *lockrelid;
    3931             :         Oid         tablespaceid;
    3932             : 
    3933         546 :         indexRel = index_open(idx->indexId, ShareUpdateExclusiveLock);
    3934         546 :         heapRel = table_open(indexRel->rd_index->indrelid,
    3935             :                              ShareUpdateExclusiveLock);
    3936             : 
    3937             :         /*
    3938             :          * Switch to the table owner's userid, so that any index functions are
    3939             :          * run as that user.  Also lock down security-restricted operations
    3940             :          * and arrange to make GUC variable changes local to this command.
    3941             :          */
    3942         546 :         GetUserIdAndSecContext(&save_userid, &save_sec_context);
    3943         546 :         SetUserIdAndSecContext(heapRel->rd_rel->relowner,
    3944             :                                save_sec_context | SECURITY_RESTRICTED_OPERATION);
    3945         546 :         save_nestlevel = NewGUCNestLevel();
    3946         546 :         RestrictSearchPath();
    3947             : 
    3948             :         /* determine safety of this index for set_indexsafe_procflags */
    3949        1052 :         idx->safe = (RelationGetIndexExpressions(indexRel) == NIL &&
    3950         506 :                      RelationGetIndexPredicate(indexRel) == NIL);
    3951             : 
    3952             : #ifdef USE_INJECTION_POINTS
    3953         546 :         if (idx->safe)
    3954         498 :             INJECTION_POINT("reindex-conc-index-safe", NULL);
    3955             :         else
    3956          48 :             INJECTION_POINT("reindex-conc-index-not-safe", NULL);
    3957             : #endif
    3958             : 
    3959         546 :         idx->tableId = RelationGetRelid(heapRel);
    3960         546 :         idx->amId = indexRel->rd_rel->relam;
    3961             : 
    3962             :         /* This function shouldn't be called for temporary relations. */
    3963         546 :         if (indexRel->rd_rel->relpersistence == RELPERSISTENCE_TEMP)
    3964           0 :             elog(ERROR, "cannot reindex a temporary table concurrently");
    3965             : 
    3966         546 :         pgstat_progress_start_command(PROGRESS_COMMAND_CREATE_INDEX, idx->tableId);
    3967             : 
    3968         546 :         progress_vals[0] = PROGRESS_CREATEIDX_COMMAND_REINDEX_CONCURRENTLY;
    3969         546 :         progress_vals[1] = 0;   /* initializing */
    3970         546 :         progress_vals[2] = idx->indexId;
    3971         546 :         progress_vals[3] = idx->amId;
    3972         546 :         pgstat_progress_update_multi_param(4, progress_index, progress_vals);
    3973             : 
    3974             :         /* Choose a temporary relation name for the new index */
    3975         546 :         concurrentName = ChooseRelationName(get_rel_name(idx->indexId),
    3976             :                                             NULL,
    3977             :                                             "ccnew",
    3978         546 :                                             get_rel_namespace(indexRel->rd_index->indrelid),
    3979             :                                             false);
    3980             : 
    3981             :         /* Choose the new tablespace, indexes of toast tables are not moved */
    3982         546 :         if (OidIsValid(params->tablespaceOid) &&
    3983          28 :             heapRel->rd_rel->relkind != RELKIND_TOASTVALUE)
    3984          20 :             tablespaceid = params->tablespaceOid;
    3985             :         else
    3986         526 :             tablespaceid = indexRel->rd_rel->reltablespace;
    3987             : 
    3988             :         /* Create new index definition based on given index */
    3989         546 :         newIndexId = index_concurrently_create_copy(heapRel,
    3990             :                                                     idx->indexId,
    3991             :                                                     tablespaceid,
    3992             :                                                     concurrentName);
    3993             : 
    3994             :         /*
    3995             :          * Now open the relation of the new index, a session-level lock is
    3996             :          * also needed on it.
    3997             :          */
    3998         540 :         newIndexRel = index_open(newIndexId, ShareUpdateExclusiveLock);
    3999             : 
    4000             :         /*
    4001             :          * Save the list of OIDs and locks in private context
    4002             :          */
    4003         540 :         oldcontext = MemoryContextSwitchTo(private_context);
    4004             : 
    4005         540 :         newidx = palloc_object(ReindexIndexInfo);
    4006         540 :         newidx->indexId = newIndexId;
    4007         540 :         newidx->safe = idx->safe;
    4008         540 :         newidx->tableId = idx->tableId;
    4009         540 :         newidx->amId = idx->amId;
    4010             : 
    4011         540 :         newIndexIds = lappend(newIndexIds, newidx);
    4012             : 
    4013             :         /*
    4014             :          * Save lockrelid to protect each relation from drop, then close the
    4015             :          * relations.  The lockrelid on the parent relation is not taken here
    4016             :          * to avoid taking multiple locks on the same relation; instead we
    4017             :          * rely on heapRelationIds built earlier.
    4018             :          */
    4019         540 :         lockrelid = palloc_object(LockRelId);
    4020         540 :         *lockrelid = indexRel->rd_lockInfo.lockRelId;
    4021         540 :         relationLocks = lappend(relationLocks, lockrelid);
    4022         540 :         lockrelid = palloc_object(LockRelId);
    4023         540 :         *lockrelid = newIndexRel->rd_lockInfo.lockRelId;
    4024         540 :         relationLocks = lappend(relationLocks, lockrelid);
    4025             : 
    4026         540 :         MemoryContextSwitchTo(oldcontext);
    4027             : 
    4028         540 :         index_close(indexRel, NoLock);
    4029         540 :         index_close(newIndexRel, NoLock);
    4030             : 
    4031             :         /* Roll back any GUC changes executed by index functions */
    4032         540 :         AtEOXact_GUC(false, save_nestlevel);
    4033             : 
    4034             :         /* Restore userid and security context */
    4035         540 :         SetUserIdAndSecContext(save_userid, save_sec_context);
    4036             : 
    4037         540 :         table_close(heapRel, NoLock);
    4038             : 
    4039             :         /*
    4040             :          * If a statement is available, telling that this comes from a REINDEX
    4041             :          * command, collect the new index for event triggers.
    4042             :          */
    4043         540 :         if (stmt)
    4044             :         {
    4045             :             ObjectAddress address;
    4046             : 
    4047         540 :             ObjectAddressSet(address, RelationRelationId, newIndexId);
    4048         540 :             EventTriggerCollectSimpleCommand(address,
    4049             :                                              InvalidObjectAddress,
    4050             :                                              (Node *) stmt);
    4051             :         }
    4052             :     }
    4053             : 
    4054             :     /*
    4055             :      * Save the heap locks for the following visibility checks, as other
    4056             :      * backends might conflict with this session.
    4057             :      */
    4058         968 :     foreach(lc, heapRelationIds)
    4059             :     {
    4060         528 :         Relation    heapRelation = table_open(lfirst_oid(lc), ShareUpdateExclusiveLock);
    4061             :         LockRelId  *lockrelid;
    4062             :         LOCKTAG    *heaplocktag;
    4063             : 
    4064             :         /* Save the list of locks in private context */
    4065         528 :         oldcontext = MemoryContextSwitchTo(private_context);
    4066             : 
    4067             :         /* Add lockrelid of heap relation to the list of locked relations */
    4068         528 :         lockrelid = palloc_object(LockRelId);
    4069         528 :         *lockrelid = heapRelation->rd_lockInfo.lockRelId;
    4070         528 :         relationLocks = lappend(relationLocks, lockrelid);
    4071             : 
    4072         528 :         heaplocktag = palloc_object(LOCKTAG);
    4073             : 
    4074             :         /* Save the LOCKTAG for this parent relation for the wait phase */
    4075         528 :         SET_LOCKTAG_RELATION(*heaplocktag, lockrelid->dbId, lockrelid->relId);
    4076         528 :         lockTags = lappend(lockTags, heaplocktag);
    4077             : 
    4078         528 :         MemoryContextSwitchTo(oldcontext);
    4079             : 
    4080             :         /* Close heap relation */
    4081         528 :         table_close(heapRelation, NoLock);
    4082             :     }
    4083             : 
    4084             :     /* Get a session-level lock on each relation. */
    4085        2048 :     foreach(lc, relationLocks)
    4086             :     {
    4087        1608 :         LockRelId  *lockrelid = (LockRelId *) lfirst(lc);
    4088             : 
    4089        1608 :         LockRelationIdForSession(lockrelid, ShareUpdateExclusiveLock);
    4090             :     }
    4091             : 
    4092         440 :     PopActiveSnapshot();
    4093         440 :     CommitTransactionCommand();
    4094         440 :     StartTransactionCommand();
    4095             : 
    4096             :     /*
    4097             :      * Because we don't take a snapshot in this transaction, there's no need
    4098             :      * to set the PROC_IN_SAFE_IC flag here.
    4099             :      */
    4100             : 
    4101             :     /*
    4102             :      * Phase 2 of REINDEX CONCURRENTLY
    4103             :      *
    4104             :      * Build the new indexes in a separate transaction for each index to avoid
    4105             :      * having open transactions for an unnecessarily long time.  But before
    4106             :      * doing that, wait until no running transactions could have the table of
    4107             :      * the index open with the old list of indexes.  See "phase 2" in
    4108             :      * DefineIndex() for more details.
    4109             :      */
    4110             : 
    4111         440 :     pgstat_progress_update_param(PROGRESS_CREATEIDX_PHASE,
    4112             :                                  PROGRESS_CREATEIDX_PHASE_WAIT_1);
    4113         440 :     WaitForLockersMultiple(lockTags, ShareLock, true);
    4114         440 :     CommitTransactionCommand();
    4115             : 
    4116         974 :     foreach(lc, newIndexIds)
    4117             :     {
    4118         540 :         ReindexIndexInfo *newidx = lfirst(lc);
    4119             : 
    4120             :         /* Start new transaction for this index's concurrent build */
    4121         540 :         StartTransactionCommand();
    4122             : 
    4123             :         /*
    4124             :          * Check for user-requested abort.  This is inside a transaction so
    4125             :          * that xact.c does not issue a useless WARNING, and ensures that
    4126             :          * session-level locks are cleaned up on abort.
    4127             :          */
    4128         540 :         CHECK_FOR_INTERRUPTS();
    4129             : 
    4130             :         /* Tell concurrent indexing to ignore us, if index qualifies */
    4131         540 :         if (newidx->safe)
    4132         492 :             set_indexsafe_procflags();
    4133             : 
    4134             :         /* Set ActiveSnapshot since functions in the indexes may need it */
    4135         540 :         PushActiveSnapshot(GetTransactionSnapshot());
    4136             : 
    4137             :         /*
    4138             :          * Update progress for the index to build, with the correct parent
    4139             :          * table involved.
    4140             :          */
    4141         540 :         pgstat_progress_start_command(PROGRESS_COMMAND_CREATE_INDEX, newidx->tableId);
    4142         540 :         progress_vals[0] = PROGRESS_CREATEIDX_COMMAND_REINDEX_CONCURRENTLY;
    4143         540 :         progress_vals[1] = PROGRESS_CREATEIDX_PHASE_BUILD;
    4144         540 :         progress_vals[2] = newidx->indexId;
    4145         540 :         progress_vals[3] = newidx->amId;
    4146         540 :         pgstat_progress_update_multi_param(4, progress_index, progress_vals);
    4147             : 
    4148             :         /* Perform concurrent build of new index */
    4149         540 :         index_concurrently_build(newidx->tableId, newidx->indexId);
    4150             : 
    4151         534 :         PopActiveSnapshot();
    4152         534 :         CommitTransactionCommand();
    4153             :     }
    4154             : 
    4155         434 :     StartTransactionCommand();
    4156             : 
    4157             :     /*
    4158             :      * Because we don't take a snapshot or Xid in this transaction, there's no
    4159             :      * need to set the PROC_IN_SAFE_IC flag here.
    4160             :      */
    4161             : 
    4162             :     /*
    4163             :      * Phase 3 of REINDEX CONCURRENTLY
    4164             :      *
    4165             :      * During this phase the new indexes catch up with any new tuples that
    4166             :      * were created during the previous phase.  See "phase 3" in DefineIndex()
    4167             :      * for more details.
    4168             :      */
    4169             : 
    4170         434 :     pgstat_progress_update_param(PROGRESS_CREATEIDX_PHASE,
    4171             :                                  PROGRESS_CREATEIDX_PHASE_WAIT_2);
    4172         434 :     WaitForLockersMultiple(lockTags, ShareLock, true);
    4173         434 :     CommitTransactionCommand();
    4174             : 
    4175         968 :     foreach(lc, newIndexIds)
    4176             :     {
    4177         534 :         ReindexIndexInfo *newidx = lfirst(lc);
    4178             :         TransactionId limitXmin;
    4179             :         Snapshot    snapshot;
    4180             : 
    4181         534 :         StartTransactionCommand();
    4182             : 
    4183             :         /*
    4184             :          * Check for user-requested abort.  This is inside a transaction so
    4185             :          * that xact.c does not issue a useless WARNING, and ensures that
    4186             :          * session-level locks are cleaned up on abort.
    4187             :          */
    4188         534 :         CHECK_FOR_INTERRUPTS();
    4189             : 
    4190             :         /* Tell concurrent indexing to ignore us, if index qualifies */
    4191         534 :         if (newidx->safe)
    4192         486 :             set_indexsafe_procflags();
    4193             : 
    4194             :         /*
    4195             :          * Take the "reference snapshot" that will be used by validate_index()
    4196             :          * to filter candidate tuples.
    4197             :          */
    4198         534 :         snapshot = RegisterSnapshot(GetTransactionSnapshot());
    4199         534 :         PushActiveSnapshot(snapshot);
    4200             : 
    4201             :         /*
    4202             :          * Update progress for the index to build, with the correct parent
    4203             :          * table involved.
    4204             :          */
    4205         534 :         pgstat_progress_start_command(PROGRESS_COMMAND_CREATE_INDEX, newidx->tableId);
    4206         534 :         progress_vals[0] = PROGRESS_CREATEIDX_COMMAND_REINDEX_CONCURRENTLY;
    4207         534 :         progress_vals[1] = PROGRESS_CREATEIDX_PHASE_VALIDATE_IDXSCAN;
    4208         534 :         progress_vals[2] = newidx->indexId;
    4209         534 :         progress_vals[3] = newidx->amId;
    4210         534 :         pgstat_progress_update_multi_param(4, progress_index, progress_vals);
    4211             : 
    4212         534 :         validate_index(newidx->tableId, newidx->indexId, snapshot);
    4213             : 
    4214             :         /*
    4215             :          * We can now do away with our active snapshot, but we still need to
    4216             :          * save the xmin limit to wait for older snapshots.
    4217             :          */
    4218         534 :         limitXmin = snapshot->xmin;
    4219             : 
    4220         534 :         PopActiveSnapshot();
    4221         534 :         UnregisterSnapshot(snapshot);
    4222             : 
    4223             :         /*
    4224             :          * To ensure no deadlocks, we must commit and start yet another
    4225             :          * transaction, and do our wait before any snapshot has been taken in
    4226             :          * it.
    4227             :          */
    4228         534 :         CommitTransactionCommand();
    4229         534 :         StartTransactionCommand();
    4230             : 
    4231             :         /*
    4232             :          * The index is now valid in the sense that it contains all currently
    4233             :          * interesting tuples.  But since it might not contain tuples deleted
    4234             :          * just before the reference snap was taken, we have to wait out any
    4235             :          * transactions that might have older snapshots.
    4236             :          *
    4237             :          * Because we don't take a snapshot or Xid in this transaction,
    4238             :          * there's no need to set the PROC_IN_SAFE_IC flag here.
    4239             :          */
    4240         534 :         pgstat_progress_update_param(PROGRESS_CREATEIDX_PHASE,
    4241             :                                      PROGRESS_CREATEIDX_PHASE_WAIT_3);
    4242         534 :         WaitForOlderSnapshots(limitXmin, true);
    4243             : 
    4244         534 :         CommitTransactionCommand();
    4245             :     }
    4246             : 
    4247             :     /*
    4248             :      * Phase 4 of REINDEX CONCURRENTLY
    4249             :      *
    4250             :      * Now that the new indexes have been validated, swap each new index with
    4251             :      * its corresponding old index.
    4252             :      *
    4253             :      * We mark the new indexes as valid and the old indexes as not valid at
    4254             :      * the same time to make sure we only get constraint violations from the
    4255             :      * indexes with the correct names.
    4256             :      */
    4257             : 
    4258         434 :     INJECTION_POINT("reindex-relation-concurrently-before-swap", NULL);
    4259         434 :     StartTransactionCommand();
    4260             : 
    4261             :     /*
    4262             :      * Because this transaction only does catalog manipulations and doesn't do
    4263             :      * any index operations, we can set the PROC_IN_SAFE_IC flag here
    4264             :      * unconditionally.
    4265             :      */
    4266         434 :     set_indexsafe_procflags();
    4267             : 
    4268         968 :     forboth(lc, indexIds, lc2, newIndexIds)
    4269             :     {
    4270         534 :         ReindexIndexInfo *oldidx = lfirst(lc);
    4271         534 :         ReindexIndexInfo *newidx = lfirst(lc2);
    4272             :         char       *oldName;
    4273             : 
    4274             :         /*
    4275             :          * Check for user-requested abort.  This is inside a transaction so
    4276             :          * that xact.c does not issue a useless WARNING, and ensures that
    4277             :          * session-level locks are cleaned up on abort.
    4278             :          */
    4279         534 :         CHECK_FOR_INTERRUPTS();
    4280             : 
    4281             :         /* Choose a relation name for old index */
    4282         534 :         oldName = ChooseRelationName(get_rel_name(oldidx->indexId),
    4283             :                                      NULL,
    4284             :                                      "ccold",
    4285             :                                      get_rel_namespace(oldidx->tableId),
    4286             :                                      false);
    4287             : 
    4288             :         /*
    4289             :          * Swapping the indexes might involve TOAST table access, so ensure we
    4290             :          * have a valid snapshot.
    4291             :          */
    4292         534 :         PushActiveSnapshot(GetTransactionSnapshot());
    4293             : 
    4294             :         /*
    4295             :          * Swap old index with the new one.  This also marks the new one as
    4296             :          * valid and the old one as not valid.
    4297             :          */
    4298         534 :         index_concurrently_swap(newidx->indexId, oldidx->indexId, oldName);
    4299             : 
    4300         534 :         PopActiveSnapshot();
    4301             : 
    4302             :         /*
    4303             :          * Invalidate the relcache for the table, so that after this commit
    4304             :          * all sessions will refresh any cached plans that might reference the
    4305             :          * index.
    4306             :          */
    4307         534 :         CacheInvalidateRelcacheByRelid(oldidx->tableId);
    4308             : 
    4309             :         /*
    4310             :          * CCI here so that subsequent iterations see the oldName in the
    4311             :          * catalog and can choose a nonconflicting name for their oldName.
    4312             :          * Otherwise, this could lead to conflicts if a table has two indexes
    4313             :          * whose names are equal for the first NAMEDATALEN-minus-a-few
    4314             :          * characters.
    4315             :          */
    4316         534 :         CommandCounterIncrement();
    4317             :     }
    4318             : 
    4319             :     /* Commit this transaction and make index swaps visible */
    4320         434 :     CommitTransactionCommand();
    4321         434 :     StartTransactionCommand();
    4322             : 
    4323             :     /*
    4324             :      * While we could set PROC_IN_SAFE_IC if all indexes qualified, there's no
    4325             :      * real need for that, because we only acquire an Xid after the wait is
    4326             :      * done, and that lasts for a very short period.
    4327             :      */
    4328             : 
    4329             :     /*
    4330             :      * Phase 5 of REINDEX CONCURRENTLY
    4331             :      *
    4332             :      * Mark the old indexes as dead.  First we must wait until no running
    4333             :      * transaction could be using the index for a query.  See also
    4334             :      * index_drop() for more details.
    4335             :      */
    4336             : 
    4337         434 :     INJECTION_POINT("reindex-relation-concurrently-before-set-dead", NULL);
    4338         434 :     pgstat_progress_update_param(PROGRESS_CREATEIDX_PHASE,
    4339             :                                  PROGRESS_CREATEIDX_PHASE_WAIT_4);
    4340         434 :     WaitForLockersMultiple(lockTags, AccessExclusiveLock, true);
    4341             : 
    4342         968 :     foreach(lc, indexIds)
    4343             :     {
    4344         534 :         ReindexIndexInfo *oldidx = lfirst(lc);
    4345             : 
    4346             :         /*
    4347             :          * Check for user-requested abort.  This is inside a transaction so
    4348             :          * that xact.c does not issue a useless WARNING, and ensures that
    4349             :          * session-level locks are cleaned up on abort.
    4350             :          */
    4351         534 :         CHECK_FOR_INTERRUPTS();
    4352             : 
    4353             :         /*
    4354             :          * Updating pg_index might involve TOAST table access, so ensure we
    4355             :          * have a valid snapshot.
    4356             :          */
    4357         534 :         PushActiveSnapshot(GetTransactionSnapshot());
    4358             : 
    4359         534 :         index_concurrently_set_dead(oldidx->tableId, oldidx->indexId);
    4360             : 
    4361         534 :         PopActiveSnapshot();
    4362             :     }
    4363             : 
    4364             :     /* Commit this transaction to make the updates visible. */
    4365         434 :     CommitTransactionCommand();
    4366         434 :     StartTransactionCommand();
    4367             : 
    4368             :     /*
    4369             :      * While we could set PROC_IN_SAFE_IC if all indexes qualified, there's no
    4370             :      * real need for that, because we only acquire an Xid after the wait is
    4371             :      * done, and that lasts for a very short period.
    4372             :      */
    4373             : 
    4374             :     /*
    4375             :      * Phase 6 of REINDEX CONCURRENTLY
    4376             :      *
    4377             :      * Drop the old indexes.
    4378             :      */
    4379             : 
    4380         434 :     pgstat_progress_update_param(PROGRESS_CREATEIDX_PHASE,
    4381             :                                  PROGRESS_CREATEIDX_PHASE_WAIT_5);
    4382         434 :     WaitForLockersMultiple(lockTags, AccessExclusiveLock, true);
    4383             : 
    4384         434 :     PushActiveSnapshot(GetTransactionSnapshot());
    4385             : 
    4386             :     {
    4387         434 :         ObjectAddresses *objects = new_object_addresses();
    4388             : 
    4389         968 :         foreach(lc, indexIds)
    4390             :         {
    4391         534 :             ReindexIndexInfo *idx = lfirst(lc);
    4392             :             ObjectAddress object;
    4393             : 
    4394         534 :             object.classId = RelationRelationId;
    4395         534 :             object.objectId = idx->indexId;
    4396         534 :             object.objectSubId = 0;
    4397             : 
    4398         534 :             add_exact_object_address(&object, objects);
    4399             :         }
    4400             : 
    4401             :         /*
    4402             :          * Use PERFORM_DELETION_CONCURRENT_LOCK so that index_drop() uses the
    4403             :          * right lock level.
    4404             :          */
    4405         434 :         performMultipleDeletions(objects, DROP_RESTRICT,
    4406             :                                  PERFORM_DELETION_CONCURRENT_LOCK | PERFORM_DELETION_INTERNAL);
    4407             :     }
    4408             : 
    4409         434 :     PopActiveSnapshot();
    4410         434 :     CommitTransactionCommand();
    4411             : 
    4412             :     /*
    4413             :      * Finally, release every session-level lock recorded in relationLocks.
    4414             :      */
    4415        2024 :     foreach(lc, relationLocks)
    4416             :     {
    4417        1590 :         LockRelId  *lockrelid = (LockRelId *) lfirst(lc);
    4418             : 
    4419        1590 :         UnlockRelationIdForSession(lockrelid, ShareUpdateExclusiveLock);
    4420             :     }
    4421             : 
    4422             :     /* Start a new transaction to finish process properly */
    4423         434 :     StartTransactionCommand();
    4424             : 
    4425             :     /* Log what we did */
    4426         434 :     if ((params->options & REINDEXOPT_VERBOSE) != 0)
    4427             :     {
    4428           4 :         if (relkind == RELKIND_INDEX)
    4429           0 :             ereport(INFO,
    4430             :                     (errmsg("index \"%s.%s\" was reindexed",
    4431             :                             relationNamespace, relationName),
    4432             :                      errdetail("%s.",
    4433             :                                pg_rusage_show(&ru0))));
    4434             :         else
    4435             :         {
    4436          12 :             foreach(lc, newIndexIds)
    4437             :             {
    4438           8 :                 ReindexIndexInfo *idx = lfirst(lc);
    4439           8 :                 Oid         indOid = idx->indexId;
    4440             : 
    4441           8 :                 ereport(INFO,
    4442             :                         (errmsg("index \"%s.%s\" was reindexed",
    4443             :                                 get_namespace_name(get_rel_namespace(indOid)),
    4444             :                                 get_rel_name(indOid))));
    4445             :                 /* Don't show rusage here, since it's not per index. */
    4446             :             }
    4447             : 
    4448           4 :             ereport(INFO,
    4449             :                     (errmsg("table \"%s.%s\" was reindexed",
    4450             :                             relationNamespace, relationName),
    4451             :                      errdetail("%s.",
    4452             :                                pg_rusage_show(&ru0))));
    4453             :         }
    4454             :     }
    4455             : 
    4456         434 :     MemoryContextDelete(private_context);
    4457             : 
    4458         434 :     pgstat_progress_end_command();
    4459             : 
    4460         434 :     return true;
    4461             : }
    4462             : 
    4463             : /*
    4464             :  * Insert or delete the pg_inherits tuple (inhseqno 1) that makes the given
    4465             :  * index a partition of parentOid; an invalid parentOid removes the link.
    4466             :  * Also corrects pg_depend, sets relhassubclass on the parent (if any),
    4467             :  * and updates relispartition on the partition index.
    4468             :  */
    4469             : void
    4470        1032 : IndexSetParentIndex(Relation partitionIdx, Oid parentOid)
    4471             : {
    4472             :     Relation    pg_inherits;
    4473             :     ScanKeyData key[2];
    4474             :     SysScanDesc scan;
    4475        1032 :     Oid         partRelid = RelationGetRelid(partitionIdx);
    4476             :     HeapTuple   tuple;
    4477             :     bool        fix_dependencies;
    4478             : 
    4479             :     /* Only plain and partitioned indexes are expected here */
    4480             :     Assert(partitionIdx->rd_rel->relkind == RELKIND_INDEX ||
    4481             :            partitionIdx->rd_rel->relkind == RELKIND_PARTITIONED_INDEX);
    4482             : 
    4483             :     /*
    4484             :      * Look up the pg_inherits row (inhrelid = our index, inhseqno = 1), if any.
    4485             :      */
    4486        1032 :     pg_inherits = relation_open(InheritsRelationId, RowExclusiveLock);
    4487        1032 :     ScanKeyInit(&key[0],
    4488             :                 Anum_pg_inherits_inhrelid,
    4489             :                 BTEqualStrategyNumber, F_OIDEQ,
    4490             :                 ObjectIdGetDatum(partRelid));
    4491        1032 :     ScanKeyInit(&key[1],
    4492             :                 Anum_pg_inherits_inhseqno,
    4493             :                 BTEqualStrategyNumber, F_INT4EQ,
    4494             :                 Int32GetDatum(1));
    4495        1032 :     scan = systable_beginscan(pg_inherits, InheritsRelidSeqnoIndexId, true,
    4496             :                               NULL, 2, key);
    4497        1032 :     tuple = systable_getnext(scan);
    4498             : 
    4499        1032 :     if (!HeapTupleIsValid(tuple))
    4500             :     {
    4501         604 :         if (parentOid == InvalidOid)
    4502             :         {
    4503             :             /*
    4504             :              * No pg_inherits row and no parent wanted: no dependency fixup.
    4505             :              * NOTE(review): update_relispartition() below still runs -- confirm.
    4506             :              */
    4507           0 :             fix_dependencies = false;
    4508             :         }
    4509             :         else
    4510             :         {
    4511         604 :             StoreSingleInheritance(partRelid, parentOid, 1);
    4512         604 :             fix_dependencies = true;
    4513             :         }
    4514             :     }
    4515             :     else
    4516             :     {
    4517         428 :         Form_pg_inherits inhForm = (Form_pg_inherits) GETSTRUCT(tuple);
    4518             : 
    4519         428 :         if (parentOid == InvalidOid)
    4520             :         {
    4521             :             /*
    4522             :              * A pg_inherits row exists and no parent is wanted: delete it.
    4523             :              */
    4524         428 :             CatalogTupleDelete(pg_inherits, &tuple->t_self);
    4525         428 :             fix_dependencies = true;
    4526             :         }
    4527             :         else
    4528             :         {
    4529             :             /*
    4530             :              * A pg_inherits row exists.  If it names the parent we want, then
    4531             :              * we're good; if it differs, that amounts to a corrupt catalog
    4532             :              * and should not happen.
    4533             :              */
    4534           0 :             if (inhForm->inhparent != parentOid)
    4535             :             {
    4536             :                 /* unexpected: we should not get called in this case */
    4537           0 :                 elog(ERROR, "bogus pg_inherit row: inhrelid %u inhparent %u",
    4538             :                      inhForm->inhrelid, inhForm->inhparent);
    4539             :             }
    4540             : 
    4541             :             /* already linked to parentOid, so pg_depend is left untouched */
    4542           0 :             fix_dependencies = false;
    4543             :         }
    4544             :     }
    4545             : 
    4546             :     /* end the scan and close pg_inherits */
    4547        1032 :     systable_endscan(scan);
    4548        1032 :     relation_close(pg_inherits, RowExclusiveLock);
    4549             : 
    4550             :     /* whenever a parent is given, lock it and set its relhassubclass flag */
    4551        1032 :     if (OidIsValid(parentOid))
    4552             :     {
    4553         604 :         LockRelationOid(parentOid, ShareUpdateExclusiveLock);
    4554         604 :         SetRelationHasSubclass(parentOid, true);
    4555             :     }
    4556             : 
    4557             :     /* relispartition becomes true iff a parent was given (runs unconditionally) */
    4558        1032 :     update_relispartition(partRelid, OidIsValid(parentOid));
    4559             : 
    4560        1032 :     if (fix_dependencies)
    4561             :     {
    4562             :         /*
    4563             :          * Fix pg_depend.  Attaching records a PARTITION_PRI dependency on the
    4564             :          * parent index and a PARTITION_SEC one on our table (indrelid);
    4565             :          * detaching deletes both kinds of PARTITION dependency rows.
    4566             :          */
    4567        1032 :         if (OidIsValid(parentOid))
    4568             :         {
    4569             :             ObjectAddress partIdx;
    4570             :             ObjectAddress parentIdx;
    4571             :             ObjectAddress partitionTbl;
    4572             : 
    4573         604 :             ObjectAddressSet(partIdx, RelationRelationId, partRelid);
    4574         604 :             ObjectAddressSet(parentIdx, RelationRelationId, parentOid);
    4575         604 :             ObjectAddressSet(partitionTbl, RelationRelationId,
    4576             :                              partitionIdx->rd_index->indrelid);
    4577         604 :             recordDependencyOn(&partIdx, &parentIdx,
    4578             :                                DEPENDENCY_PARTITION_PRI);
    4579         604 :             recordDependencyOn(&partIdx, &partitionTbl,
    4580             :                                DEPENDENCY_PARTITION_SEC);
    4581             :         }
    4582             :         else
    4583             :         {
    4584         428 :             deleteDependencyRecordsForClass(RelationRelationId, partRelid,
    4585             :                                             RelationRelationId,
    4586             :                                             DEPENDENCY_PARTITION_PRI);
    4587         428 :             deleteDependencyRecordsForClass(RelationRelationId, partRelid,
    4588             :                                             RelationRelationId,
    4589             :                                             DEPENDENCY_PARTITION_SEC);
    4590             :         }
    4591             : 
    4592             :         /* make the pg_depend changes visible to later commands */
    4593        1032 :         CommandCounterIncrement();
    4594             :     }
    4595        1032 : }
    4596             : 
    4597             : /*
    4598             :  * Subroutine of IndexSetParentIndex: store newval into the relispartition
    4599             :  * flag of the given relation's pg_class row (must differ; see the Assert).
    4600             :  */
    4601             : static void
    4602        1032 : update_relispartition(Oid relationId, bool newval)
    4603             : {
    4604             :     HeapTuple   tup;
    4605             :     Relation    classRel;
    4606             :     ItemPointerData otid;
    4607             : 
    4608        1032 :     classRel = table_open(RelationRelationId, RowExclusiveLock);
    4609        1032 :     tup = SearchSysCacheLockedCopy1(RELOID, ObjectIdGetDatum(relationId));
    4610        1032 :     if (!HeapTupleIsValid(tup))
    4611           0 :         elog(ERROR, "cache lookup failed for relation %u", relationId);
    4612        1032 :     otid = tup->t_self;     /* saved TID is used for both the update and the unlock */
    4613             :     Assert(((Form_pg_class) GETSTRUCT(tup))->relispartition != newval);
    4614        1032 :     ((Form_pg_class) GETSTRUCT(tup))->relispartition = newval;
    4615        1032 :     CatalogTupleUpdate(classRel, &otid, tup);
    4616        1032 :     UnlockTuple(classRel, &otid, InplaceUpdateTupleLock);   /* NOTE(review): presumably pairs with the Locked syscache fetch above -- confirm */
    4617        1032 :     heap_freetuple(tup);
    4618        1032 :     table_close(classRel, RowExclusiveLock);
    4619        1032 : }
    4620             : 
    4621             : /*
    4622             :  * Set the PROC_IN_SAFE_IC flag in MyProc->statusFlags and copy the result to
    4623             :  * ProcGlobal->statusFlags[MyProc->pgxactoff], holding ProcArrayLock.
    4624             :  *
    4625             :  * When doing concurrent index builds, we can set this flag to tell other
    4626             :  * processes concurrently running CREATE INDEX CONCURRENTLY or REINDEX
    4627             :  * CONCURRENTLY to ignore us when doing their waits for concurrent snapshots.
    4628             :  * On one hand it avoids pointlessly waiting for a process that's not
    4629             :  * interesting anyway; but more importantly it avoids deadlocks in some cases.
    4630             :  *
    4631             :  * This can be done safely only for indexes that don't execute any
    4632             :  * expressions that could access other tables, so the index must be neither
    4633             :  * expressional nor partial.  The caller is responsible for calling this
    4634             :  * routine only when that assumption holds true.
    4635             :  *
    4636             :  * (The flag is reset automatically at transaction end, so it must be
    4637             :  * set for each transaction.)
    4638             :  */
    4639             : static inline void
    4640        1778 : set_indexsafe_procflags(void)
    4641             : {
    4642             :     /*
    4643             :      * This must only be called before an xid or xmin is installed in MyProc;
    4644             :      * otherwise, concurrent processes could see an Xmin that moves backwards.
    4645             :      */
    4646             :     Assert(MyProc->xid == InvalidTransactionId &&
    4647             :            MyProc->xmin == InvalidTransactionId);
    4648             : 
    4649        1778 :     LWLockAcquire(ProcArrayLock, LW_EXCLUSIVE);
    4650        1778 :     MyProc->statusFlags |= PROC_IN_SAFE_IC;
    4651        1778 :     ProcGlobal->statusFlags[MyProc->pgxactoff] = MyProc->statusFlags;   /* keep the ProcGlobal copy in sync */
    4652        1778 :     LWLockRelease(ProcArrayLock);
    4653        1778 : }

Generated by: LCOV version 1.16