LCOV - code coverage report
Current view: top level - src/backend/commands - indexcmds.c (source / functions) Coverage Total Hit
Test: PostgreSQL 19devel Lines: 92.8 % 1346 1249
Test Date: 2026-04-07 14:16:30 Functions: 100.0 % 26 26
Legend: Lines:     hit not hit

            Line data    Source code
       1              : /*-------------------------------------------------------------------------
       2              :  *
       3              :  * indexcmds.c
       4              :  *    POSTGRES define and remove index code.
       5              :  *
       6              :  * Portions Copyright (c) 1996-2026, PostgreSQL Global Development Group
       7              :  * Portions Copyright (c) 1994, Regents of the University of California
       8              :  *
       9              :  *
      10              :  * IDENTIFICATION
      11              :  *    src/backend/commands/indexcmds.c
      12              :  *
      13              :  *-------------------------------------------------------------------------
      14              :  */
      15              : 
      16              : #include "postgres.h"
      17              : 
      18              : #include "access/amapi.h"
      19              : #include "access/attmap.h"
      20              : #include "access/gist.h"
      21              : #include "access/heapam.h"
      22              : #include "access/htup_details.h"
      23              : #include "access/reloptions.h"
      24              : #include "access/sysattr.h"
      25              : #include "access/tableam.h"
      26              : #include "access/xact.h"
      27              : #include "catalog/catalog.h"
      28              : #include "catalog/index.h"
      29              : #include "catalog/indexing.h"
      30              : #include "catalog/namespace.h"
      31              : #include "catalog/pg_am.h"
      32              : #include "catalog/pg_authid.h"
      33              : #include "catalog/pg_collation.h"
      34              : #include "catalog/pg_constraint.h"
      35              : #include "catalog/pg_database.h"
      36              : #include "catalog/pg_inherits.h"
      37              : #include "catalog/pg_namespace.h"
      38              : #include "catalog/pg_opclass.h"
      39              : #include "catalog/pg_tablespace.h"
      40              : #include "catalog/pg_type.h"
      41              : #include "commands/comment.h"
      42              : #include "commands/defrem.h"
      43              : #include "commands/event_trigger.h"
      44              : #include "commands/progress.h"
      45              : #include "commands/tablecmds.h"
      46              : #include "commands/tablespace.h"
      47              : #include "mb/pg_wchar.h"
      48              : #include "miscadmin.h"
      49              : #include "nodes/makefuncs.h"
      50              : #include "nodes/nodeFuncs.h"
      51              : #include "optimizer/optimizer.h"
      52              : #include "parser/parse_coerce.h"
      53              : #include "parser/parse_oper.h"
      54              : #include "parser/parse_utilcmd.h"
      55              : #include "partitioning/partdesc.h"
      56              : #include "pgstat.h"
      57              : #include "rewrite/rewriteManip.h"
      58              : #include "storage/lmgr.h"
      59              : #include "storage/proc.h"
      60              : #include "storage/procarray.h"
      61              : #include "utils/acl.h"
      62              : #include "utils/builtins.h"
      63              : #include "utils/fmgroids.h"
      64              : #include "utils/guc.h"
      65              : #include "utils/injection_point.h"
      66              : #include "utils/inval.h"
      67              : #include "utils/lsyscache.h"
      68              : #include "utils/memutils.h"
      69              : #include "utils/partcache.h"
      70              : #include "utils/pg_rusage.h"
      71              : #include "utils/regproc.h"
      72              : #include "utils/snapmgr.h"
      73              : #include "utils/syscache.h"
      74              : 
      75              : 
      76              : /* Forward declarations of file-local (static) helper functions */
      77              : static bool CompareOpclassOptions(const Datum *opts1, const Datum *opts2, int natts);
      78              : static void CheckPredicate(Expr *predicate);
      79              : static void ComputeIndexAttrs(ParseState *pstate,
      80              :                               IndexInfo *indexInfo,
      81              :                               Oid *typeOids,
      82              :                               Oid *collationOids,
      83              :                               Oid *opclassOids,
      84              :                               Datum *opclassOptions,
      85              :                               int16 *colOptions,
      86              :                               const List *attList,
      87              :                               const List *exclusionOpNames,
      88              :                               Oid relId,
      89              :                               const char *accessMethodName,
      90              :                               Oid accessMethodId,
      91              :                               bool amcanorder,
      92              :                               bool isconstraint,
      93              :                               bool iswithoutoverlaps,
      94              :                               Oid ddl_userid,
      95              :                               int ddl_sec_context,
      96              :                               int *ddl_save_nestlevel);
      97              : static char *ChooseIndexName(const char *tabname, Oid namespaceId,
      98              :                              const List *colnames, const List *exclusionOpNames,
      99              :                              bool primary, bool isconstraint);
     100              : static char *ChooseIndexNameAddition(const List *colnames);
     101              : static List *ChooseIndexColumnNames(const List *indexElems);
     102              : static void ReindexIndex(const ReindexStmt *stmt, const ReindexParams *params,
     103              :                          bool isTopLevel);
     104              : static void RangeVarCallbackForReindexIndex(const RangeVar *relation,
     105              :                                             Oid relId, Oid oldRelId, void *arg);
     106              : static Oid  ReindexTable(const ReindexStmt *stmt, const ReindexParams *params,
     107              :                          bool isTopLevel);
     108              : static void ReindexMultipleTables(const ReindexStmt *stmt,
     109              :                                   const ReindexParams *params);
     110              : static void reindex_error_callback(void *arg);
     111              : static void ReindexPartitions(const ReindexStmt *stmt, Oid relid,
     112              :                               const ReindexParams *params, bool isTopLevel);
     113              : static void ReindexMultipleInternal(const ReindexStmt *stmt, const List *relids,
     114              :                                     const ReindexParams *params);
     115              : static bool ReindexRelationConcurrently(const ReindexStmt *stmt,
     116              :                                         Oid relationOid,
     117              :                                         const ReindexParams *params);
     118              : static void update_relispartition(Oid relationId, bool newval);
     119              : static inline void set_indexsafe_procflags(void);
     120              : 
     121              : /*
     122              :  * Callback argument type for RangeVarCallbackForReindexIndex() (presumably handed over via its void *arg parameter)
     123              :  */
     124              : struct ReindexIndexCallbackState
     125              : {
     126              :     ReindexParams params;       /* options from the REINDEX statement */
     127              :     Oid         locked_table_oid;   /* OID of the previously locked table */
     128              : };
     129              : 
     130              : /*
     131              :  * Callback arguments for reindex_error_callback()
     132              :  */
     133              : typedef struct ReindexErrorInfo
     134              : {
     135              :     char       *relname;        /* relation name (presumably the relation being reindexed) */
     136              :     char       *relnamespace;   /* presumably the name of that relation's namespace */
     137              :     char        relkind;        /* presumably the relation's relkind code; confirm where it is filled */
     138              : } ReindexErrorInfo;
     139              : 
     140              : /*
     141              :  * CheckIndexCompatible
     142              :  *      Determine whether an existing index definition is compatible with a
     143              :  *      prospective index definition, such that the existing index storage
     144              :  *      could become the storage of the new index, avoiding a rebuild.  Returns true if so, false otherwise.
     145              :  *
     146              :  * 'oldId': the OID of the existing index
     147              :  * 'accessMethodName': name of the AM to use.
     148              :  * 'attributeList': a list of IndexElem specifying columns and expressions
     149              :  *      to index on.
     150              :  * 'exclusionOpNames': list of names of exclusion-constraint operators,
     151              :  *      or NIL if not an exclusion constraint.
     152              :  * 'isWithoutOverlaps': true iff this index has a WITHOUT OVERLAPS clause.
     153              :  *
     154              :  * This is tailored to the needs of ALTER TABLE ALTER TYPE, which recreates
     155              :  * any indexes that depended on a changing column from their pg_get_indexdef
     156              :  * or pg_get_constraintdef definitions.  We omit some of the sanity checks of
     157              :  * DefineIndex.  We assume that the old and new indexes have the same number
     158              :  * of columns and that if one has an expression column or predicate, both do.
     159              :  * Errors arising from the attribute list still apply.
     160              :  *
     161              :  * Most column type changes that can skip a table rewrite do not invalidate
     162              :  * indexes.  We acknowledge this when all operator classes, collations and
     163              :  * exclusion operators match.  Though we could further permit intra-opfamily
     164              :  * changes for btree and hash indexes, that adds subtle complexity with no
     165              :  * concrete benefit for core types.  Note that INCLUDE columns aren't
     166              :  * checked by this function; for them it's enough that the table rewrite
     167              :  * is skipped.
     168              :  *
     169              :  * When a comparison or exclusion operator has a polymorphic input type, the
     170              :  * actual input types must also match.  This defends against the possibility
     171              :  * that operators could vary behavior in response to get_fn_expr_argtype().
     172              :  * At present, this hazard is theoretical: check_exclusion_constraint() and
     173              :  * all core index access methods decline to set fn_expr for such calls.
     174              :  *
     175              :  * We do not yet implement a test to verify compatibility of expression
     176              :  * columns or predicates, so assume any such index is incompatible.
     177              :  */
     178              : bool
     179           73 : CheckIndexCompatible(Oid oldId,
     180              :                      const char *accessMethodName,
     181              :                      const List *attributeList,
     182              :                      const List *exclusionOpNames,
     183              :                      bool isWithoutOverlaps)
     184              : {
     185              :     bool        isconstraint;
     186              :     Oid        *typeIds;
     187              :     Oid        *collationIds;
     188              :     Oid        *opclassIds;
     189              :     Datum      *opclassOptions;
     190              :     Oid         accessMethodId;
     191              :     Oid         relationId;
     192              :     HeapTuple   tuple;
     193              :     Form_pg_index indexForm;
     194              :     Form_pg_am  accessMethodForm;
     195              :     const IndexAmRoutine *amRoutine;
     196              :     bool        amcanorder;
     197              :     bool        amsummarizing;
     198              :     int16      *coloptions;
     199              :     IndexInfo  *indexInfo;
     200              :     int         numberOfAttributes;
     201              :     int         old_natts;
     202           73 :     bool        ret = true;
     203              :     oidvector  *old_indclass;
     204              :     oidvector  *old_indcollation;
     205              :     Relation    irel;
     206              :     int         i;
     207              :     Datum       d;
     208              : 
     209              :     /* Caller should already have the relation locked in some way. */
     210           73 :     relationId = IndexGetRelation(oldId, false);
     211              : 
     212              :     /*
     213              :      * We can pretend isconstraint = false unconditionally.  It only serves to
     214              :      * decide the text of an error message that should never happen for us.
     215              :      */
     216           73 :     isconstraint = false;
     217              : 
     218           73 :     numberOfAttributes = list_length(attributeList);
     219              :     Assert(numberOfAttributes > 0);
     220              :     Assert(numberOfAttributes <= INDEX_MAX_KEYS);
     221              : 
     222              :     /* Look up the access method by name; report an error if it does not exist. */
     223           73 :     tuple = SearchSysCache1(AMNAME, PointerGetDatum(accessMethodName));
     224           73 :     if (!HeapTupleIsValid(tuple))
     225            0 :         ereport(ERROR,
     226              :                 (errcode(ERRCODE_UNDEFINED_OBJECT),
     227              :                  errmsg("access method \"%s\" does not exist",
     228              :                         accessMethodName)));
     229           73 :     accessMethodForm = (Form_pg_am) GETSTRUCT(tuple);
     230           73 :     accessMethodId = accessMethodForm->oid;
     231           73 :     amRoutine = GetIndexAmRoutine(accessMethodForm->amhandler);
     232           73 :     ReleaseSysCache(tuple);     /* accessMethodForm is not referenced past this point */
     233              : 
     234           73 :     amcanorder = amRoutine->amcanorder;
     235           73 :     amsummarizing = amRoutine->amsummarizing;
     236              : 
     237              :     /*
     238              :      * Compute the operator classes, collations, and exclusion operators for
     239              :      * the new index, so we can test whether it's compatible with the existing
     240              :      * one.  Note that ComputeIndexAttrs might fail here, but that's OK:
     241              :      * DefineIndex would have failed later.  Our attributeList contains only
     242              :      * key attributes, thus we're filling ii_NumIndexAttrs and
     243              :      * ii_NumIndexKeyAttrs with same value.
     244              :      */
     245           73 :     indexInfo = makeIndexInfo(numberOfAttributes, numberOfAttributes,
     246              :                               accessMethodId, NIL, NIL, false, false,
     247              :                               false, false, amsummarizing, isWithoutOverlaps);
     248           73 :     typeIds = palloc_array(Oid, numberOfAttributes);
     249           73 :     collationIds = palloc_array(Oid, numberOfAttributes);
     250           73 :     opclassIds = palloc_array(Oid, numberOfAttributes);
     251           73 :     opclassOptions = palloc_array(Datum, numberOfAttributes);
     252           73 :     coloptions = palloc_array(int16, numberOfAttributes);
     253           73 :     ComputeIndexAttrs(NULL, indexInfo,  /* NULL: no ParseState (pstate) is passed */
     254              :                       typeIds, collationIds, opclassIds, opclassOptions,
     255              :                       coloptions, attributeList,
     256              :                       exclusionOpNames, relationId,
     257              :                       accessMethodName, accessMethodId,
     258              :                       amcanorder, isconstraint, isWithoutOverlaps, InvalidOid,
     259              :                       0, NULL); /* InvalidOid above is ddl_userid; these are ddl_sec_context, ddl_save_nestlevel */
     260              : 
     261              :     /* Get the soon-obsolete pg_index tuple. */
     262           73 :     tuple = SearchSysCache1(INDEXRELID, ObjectIdGetDatum(oldId));
     263           73 :     if (!HeapTupleIsValid(tuple))
     264            0 :         elog(ERROR, "cache lookup failed for index %u", oldId);
     265           73 :     indexForm = (Form_pg_index) GETSTRUCT(tuple);
     266              : 
     267              :     /*
     268              :      * We don't assess expressions or predicates; assume incompatibility.
     269              :      * Also, if the index is invalid for any reason, treat it as incompatible.
     270              :      */
     271          146 :     if (!(heap_attisnull(tuple, Anum_pg_index_indpred, NULL) &&
     272           73 :           heap_attisnull(tuple, Anum_pg_index_indexprs, NULL) &&
     273           69 :           indexForm->indisvalid))
     274              :     {
     275            4 :         ReleaseSysCache(tuple);
     276            4 :         return false;           /* has a predicate or expressions, or is not valid */
     277              :     }
     278              : 
     279              :     /* Any change in operator class or collation breaks compatibility. */
     280           69 :     old_natts = indexForm->indnkeyatts;
     281              :     Assert(old_natts == numberOfAttributes);
     282              : 
     283           69 :     d = SysCacheGetAttrNotNull(INDEXRELID, tuple, Anum_pg_index_indcollation);
     284           69 :     old_indcollation = (oidvector *) DatumGetPointer(d);
     285              : 
     286           69 :     d = SysCacheGetAttrNotNull(INDEXRELID, tuple, Anum_pg_index_indclass);
     287           69 :     old_indclass = (oidvector *) DatumGetPointer(d);
     288              : 
     289          138 :     ret = (memcmp(old_indclass->values, opclassIds, old_natts * sizeof(Oid)) == 0 &&
     290           69 :            memcmp(old_indcollation->values, collationIds, old_natts * sizeof(Oid)) == 0);
     291              : 
     292           69 :     ReleaseSysCache(tuple);     /* old_indclass and old_indcollation are not referenced past this point */
     293              : 
     294           69 :     if (!ret)
     295            0 :         return false;           /* an operator class or collation differs */
     296              : 
     297              :     /* For polymorphic opcintype, column type changes break compatibility. */
     298           69 :     irel = index_open(oldId, AccessShareLock);  /* caller probably has a lock */
     299          142 :     for (i = 0; i < old_natts; i++)
     300              :     {
     301           73 :         if (IsPolymorphicType(get_opclass_input_type(opclassIds[i])) &&
     302            0 :             TupleDescAttr(irel->rd_att, i)->atttypid != typeIds[i])
     303              :         {
     304            0 :             ret = false;
     305            0 :             break;
     306              :         }
     307              :     }
     308              : 
     309              :     /* Any change in opclass options breaks compatibility. */
     310           69 :     if (ret)
     311              :     {
     312           69 :         Datum      *oldOpclassOptions = palloc_array(Datum, old_natts);
     313              : 
     314          142 :         for (i = 0; i < old_natts; i++)
     315           73 :             oldOpclassOptions[i] = get_attoptions(oldId, i + 1);    /* i + 1: get_attoptions is given a 1-based column number */
     316              : 
     317           69 :         ret = CompareOpclassOptions(oldOpclassOptions, opclassOptions, old_natts);
     318              : 
     319           69 :         pfree(oldOpclassOptions);
     320              :     }
     321              : 
     322              :     /* Any change in exclusion operator selections breaks compatibility. */
     323           69 :     if (ret && indexInfo->ii_ExclusionOps != NULL)
     324              :     {
     325              :         Oid        *old_operators,
     326              :                    *old_procs;
     327              :         uint16     *old_strats;
     328              : 
     329            0 :         RelationGetExclusionInfo(irel, &old_operators, &old_procs, &old_strats);
     330            0 :         ret = memcmp(old_operators, indexInfo->ii_ExclusionOps,
     331              :                      old_natts * sizeof(Oid)) == 0;
     332              : 
     333              :         /* Require an exact input type match for polymorphic operators. */
     334            0 :         if (ret)
     335              :         {
     336            0 :             for (i = 0; i < old_natts && ret; i++)
     337              :             {
     338              :                 Oid         left,
     339              :                             right;
     340              : 
     341            0 :                 op_input_types(indexInfo->ii_ExclusionOps[i], &left, &right);
     342            0 :                 if ((IsPolymorphicType(left) || IsPolymorphicType(right)) &&
     343            0 :                     TupleDescAttr(irel->rd_att, i)->atttypid != typeIds[i])
     344              :                 {
     345            0 :                     ret = false;
     346            0 :                     break;
     347              :                 }
     348              :             }
     349              :         }
     350              :     }
     351              : 
     352           69 :     index_close(irel, NoLock);  /* NoLock is passed, so presumably the lock from index_open is kept -- confirm */
     353           69 :     return ret;                 /* true only if every check above passed */
     354              : }
     355              : 
     356              : /*
     357              :  * CompareOpclassOptions
     358              :  *
     359              :  * Compare per-column opclass options, which are represented by arrays of text[]
     360              :  * datums.  The arrays themselves, as well as their elements, can be NULL.  Returns true iff all natts entries match.
     361              :  */
     362              : static bool
     363           69 : CompareOpclassOptions(const Datum *opts1, const Datum *opts2, int natts)
     364              : {
     365              :     int         i;
     366              :     FmgrInfo    fm;
     367              : 
     368           69 :     if (!opts1 && !opts2)       /* both arrays absent: nothing to compare */
     369            0 :         return true;
     370              : 
     371           69 :     fmgr_info(F_ARRAY_EQ, &fm); /* fm is called below for each pair of non-NULL datums */
     372          142 :     for (i = 0; i < natts; i++)
     373              :     {
     374           73 :         Datum       opt1 = opts1 ? opts1[i] : (Datum) 0;    /* an absent array reads as all-NULL entries */
     375           73 :         Datum       opt2 = opts2 ? opts2[i] : (Datum) 0;
     376              : 
     377           73 :         if (opt1 == (Datum) 0)  /* (Datum) 0 is treated as a NULL entry */
     378              :         {
     379           72 :             if (opt2 == (Datum) 0)
     380           72 :                 continue;       /* both NULL: this column matches */
     381              :             else
     382            0 :                 return false;   /* only one side is NULL */
     383              :         }
     384            1 :         else if (opt2 == (Datum) 0)
     385            0 :             return false;       /* only one side is NULL */
     386              : 
     387              :         /*
     388              :          * Compare non-NULL text[] datums.  Use C collation to enforce binary
     389              :          * equivalence of texts, because we don't know anything about the
     390              :          * semantics of opclass options.
     391              :          */
     392            1 :         if (!DatumGetBool(FunctionCall2Coll(&fm, C_COLLATION_OID, opt1, opt2)))
     393            0 :             return false;
     394              :     }
     395              : 
     396           69 :     return true;                /* no column's options differed */
     397              : }
     398              : 
     399              : /*
     400              :  * WaitForOlderSnapshots
     401              :  *
     402              :  * Wait for transactions that might have an older snapshot than the given xmin
     403              :  * limit, because it might not contain tuples deleted just before it has
     404              :  * been taken. Obtain a list of VXIDs of such transactions, and wait for them
     405              :  * individually. This is used when building an index concurrently.
     406              :  *
     407              :  * We can exclude any running transactions that have xmin > the xmin given;
     408              :  * their oldest snapshot must be newer than our xmin limit.
     409              :  * We can also exclude any transactions that have xmin = zero, since they
     410              :  * evidently have no live snapshot at all (and any one they might be in
     411              :  * process of taking is certainly newer than ours).  Transactions in other
     412              :  * DBs can be ignored too, since they'll never even be able to see the
     413              :  * index being worked on.
     414              :  *
     415              :  * We can also exclude autovacuum processes and processes running manual
     416              :  * lazy VACUUMs, because they won't be fazed by missing index entries
     417              :  * either.  (Manual ANALYZEs, however, can't be excluded because they
     418              :  * might be within transactions that are going to do arbitrary operations
     419              :  * later.)  Processes running CREATE INDEX CONCURRENTLY or REINDEX CONCURRENTLY
     420              :  * on indexes that are neither expressional nor partial are also safe to
     421              :  * ignore, since we know that those processes won't examine any data
     422              :  * outside the table they're indexing.
     423              :  *
     424              :  * Also, GetCurrentVirtualXIDs never reports our own vxid, so we need not
     425              :  * check for that.
     426              :  *
     427              :  * If a process goes idle-in-transaction with xmin zero, we do not need to
     428              :  * wait for it anymore, per the above argument.  We do not have the
     429              :  * infrastructure right now to stop waiting if that happens, but we can at
     430              :  * least avoid the folly of waiting when it is idle at the time we would
     431              :  * begin to wait.  We do this by repeatedly rechecking the output of
     432              :  * GetCurrentVirtualXIDs.  If, during any iteration, a particular vxid
     433              :  * doesn't show up in the output, we know we can forget about it.
     434              :  */
     435              : void
     436          415 : WaitForOlderSnapshots(TransactionId limitXmin, bool progress)
     437              : {
     438              :     int         n_old_snapshots;
     439              :     int         i;
     440              :     VirtualTransactionId *old_snapshots;
     441              : 
     442          415 :     old_snapshots = GetCurrentVirtualXIDs(limitXmin, true, false,
     443              :                                           PROC_IS_AUTOVACUUM | PROC_IN_VACUUM
     444              :                                           | PROC_IN_SAFE_IC,
     445              :                                           &n_old_snapshots);    /* receives the entry count used as the loop bound below */
     446          415 :     if (progress)
     447          408 :         pgstat_progress_update_param(PROGRESS_WAITFOR_TOTAL, n_old_snapshots);
     448              : 
     449          564 :     for (i = 0; i < n_old_snapshots; i++)
     450              :     {
     451          149 :         if (!VirtualTransactionIdIsValid(old_snapshots[i]))
     452           32 :             continue;           /* found uninteresting in previous cycle */
     453              : 
     454          117 :         if (i > 0)              /* entry 0 needs no recheck, presumably because the list was fetched just above */
     455              :         {
     456              :             /* see if anything's changed ... */
     457              :             VirtualTransactionId *newer_snapshots;
     458              :             int         n_newer_snapshots;
     459              :             int         j;
     460              :             int         k;
     461              : 
     462           44 :             newer_snapshots = GetCurrentVirtualXIDs(limitXmin,
     463              :                                                     true, false,
     464              :                                                     PROC_IS_AUTOVACUUM | PROC_IN_VACUUM
     465              :                                                     | PROC_IN_SAFE_IC,
     466              :                                                     &n_newer_snapshots);
     467          152 :             for (j = i; j < n_old_snapshots; j++)   /* only entries from i on; earlier ones are already handled */
     468              :             {
     469          108 :                 if (!VirtualTransactionIdIsValid(old_snapshots[j]))
     470           13 :                     continue;   /* found uninteresting in previous cycle */
     471          260 :                 for (k = 0; k < n_newer_snapshots; k++)
     472              :                 {
     473          208 :                     if (VirtualTransactionIdEquals(old_snapshots[j],
     474              :                                                    newer_snapshots[k]))
     475           43 :                         break;
     476              :                 }
     477           95 :                 if (k >= n_newer_snapshots) /* not there anymore */
     478           52 :                     SetInvalidVirtualTransactionId(old_snapshots[j]);
     479              :             }
     480           44 :             pfree(newer_snapshots);
     481              :         }
     482              : 
     483          117 :         if (VirtualTransactionIdIsValid(old_snapshots[i]))  /* entry i may just have been invalidated by the recheck */
     484              :         {
     485              :             /* If requested, publish who we're going to wait for. */
     486           97 :             if (progress)
     487              :             {
     488           97 :                 PGPROC     *holder = ProcNumberGetProc(old_snapshots[i].procNumber);
     489              : 
     490           97 :                 if (holder)     /* skip the PID report when no PGPROC was found */
     491           97 :                     pgstat_progress_update_param(PROGRESS_WAITFOR_CURRENT_PID,
     492           97 :                                                  holder->pid);
     493              :             }
     494           97 :             VirtualXactLock(old_snapshots[i], true);    /* second argument true: presumably blocks until that vxid ends -- confirm */
     495              :         }
     496              : 
     497          117 :         if (progress)
     498          117 :             pgstat_progress_update_param(PROGRESS_WAITFOR_DONE, i + 1); /* i + 1 also covers entries skipped via continue above */
     499              :     }
     500          415 : }
     501              : 
     502              : 
     503              : /*
     504              :  * DefineIndex
     505              :  *      Creates a new index.
     506              :  *
     507              :  * This function manages the current userid according to the needs of pg_dump.
     508              :  * Recreating old-database catalog entries in new-database is fine, regardless
     509              :  * of which users would have permission to recreate those entries now.  That's
     510              :  * just preservation of state.  Running opaque expressions, like calling a
     511              :  * function named in a catalog entry or evaluating a pg_node_tree in a catalog
     512              :  * entry, as anyone other than the object owner, is not fine.  To adhere to
     513              :  * those principles and to remain fail-safe, use the table owner userid for
     514              :  * most ACL checks.  Use the original userid for ACL checks reached without
     515              :  * traversing opaque expressions.  (pg_dump can predict such ACL checks from
     516              :  * catalogs.)  Overall, this is a mess.  Future DDL development should
     517              :  * consider offering one DDL command for catalog setup and a separate DDL
     518              :  * command for steps that run opaque expressions.
     519              :  *
     520              :  * 'pstate': ParseState struct (used only for error reports; pass NULL if
     521              :  *      not available)
     522              :  * 'tableId': the OID of the table relation on which the index is to be
     523              :  *      created
     524              :  * 'stmt': IndexStmt describing the properties of the new index.
     525              :  * 'indexRelationId': normally InvalidOid, but during bootstrap can be
     526              :  *      nonzero to specify a preselected OID for the index.
     527              :  * 'parentIndexId': the OID of the parent index; InvalidOid if not the child
     528              :  *      of a partitioned index.
     529              :  * 'parentConstraintId': the OID of the parent constraint; InvalidOid if not
     530              :  *      the child of a constraint (only used when recursing)
     531              :  * 'total_parts': total number of direct and indirect partitions of relation;
     532              :  *      pass -1 if not known or rel is not partitioned.
     533              :  * 'is_alter_table': this is due to an ALTER rather than a CREATE operation.
     534              :  * 'check_rights': check for CREATE rights in namespace and tablespace.  (This
     535              :  *      should be true except when ALTER is deleting/recreating an index.)
     536              :  * 'check_not_in_use': check for table not already in use in current session.
     537              :  *      This should be true unless caller is holding the table open, in which
     538              :  *      case the caller had better have checked it earlier.
     539              :  * 'skip_build': make the catalog entries but don't create the index files
     540              :  * 'quiet': suppress the NOTICE chatter ordinarily provided for constraints.
     541              :  *
     542              :  * Returns the object address of the created index.
     543              :  */
     544              : ObjectAddress
     545        19892 : DefineIndex(ParseState *pstate,
     546              :             Oid tableId,
     547              :             const IndexStmt *stmt,
     548              :             Oid indexRelationId,
     549              :             Oid parentIndexId,
     550              :             Oid parentConstraintId,
     551              :             int total_parts,
     552              :             bool is_alter_table,
     553              :             bool check_rights,
     554              :             bool check_not_in_use,
     555              :             bool skip_build,
     556              :             bool quiet)
     557              : {
     558              :     bool        concurrent;
     559              :     char       *indexRelationName;
     560              :     char       *accessMethodName;
     561              :     Oid        *typeIds;
     562              :     Oid        *collationIds;
     563              :     Oid        *opclassIds;
     564              :     Datum      *opclassOptions;
     565              :     Oid         accessMethodId;
     566              :     Oid         namespaceId;
     567              :     Oid         tablespaceId;
     568        19892 :     Oid         createdConstraintId = InvalidOid;
     569              :     List       *indexColNames;
     570              :     List       *allIndexParams;
     571              :     Relation    rel;
     572              :     HeapTuple   tuple;
     573              :     Form_pg_am  accessMethodForm;
     574              :     const IndexAmRoutine *amRoutine;
     575              :     bool        amcanorder;
     576              :     bool        amissummarizing;
     577              :     amoptions_function amoptions;
     578              :     bool        exclusion;
     579              :     bool        partitioned;
     580              :     bool        safe_index;
     581              :     Datum       reloptions;
     582              :     int16      *coloptions;
     583              :     IndexInfo  *indexInfo;
     584              :     uint16      flags;
     585              :     uint16      constr_flags;
     586              :     int         numberOfAttributes;
     587              :     int         numberOfKeyAttributes;
     588              :     TransactionId limitXmin;
     589              :     ObjectAddress address;
     590              :     LockRelId   heaprelid;
     591              :     LOCKTAG     heaplocktag;
     592              :     LOCKMODE    lockmode;
     593              :     Snapshot    snapshot;
     594              :     Oid         root_save_userid;
     595              :     int         root_save_sec_context;
     596              :     int         root_save_nestlevel;
     597              : 
     598        19892 :     root_save_nestlevel = NewGUCNestLevel();
     599              : 
     600        19892 :     RestrictSearchPath();
     601              : 
     602              :     /*
     603              :      * Some callers need us to run with an empty default_tablespace; this is a
     604              :      * necessary hack to be able to reproduce catalog state accurately when
     605              :      * recreating indexes after table-rewriting ALTER TABLE.
     606              :      */
     607        19892 :     if (stmt->reset_default_tblspc)
     608          306 :         (void) set_config_option("default_tablespace", "",
     609              :                                  PGC_USERSET, PGC_S_SESSION,
     610              :                                  GUC_ACTION_SAVE, true, 0, false);
     611              : 
     612              :     /*
     613              :      * Force non-concurrent build on temporary relations, even if CONCURRENTLY
     614              :      * was requested.  Other backends can't access a temporary relation, so
     615              :      * there's no harm in grabbing a stronger lock, and a non-concurrent build
     616              :      * is more efficient.  Do this before any use of the concurrent option is
     617              :      * done.
     618              :      */
     619        19892 :     if (stmt->concurrent && get_rel_persistence(tableId) != RELPERSISTENCE_TEMP)
     620          119 :         concurrent = true;
     621              :     else
     622        19773 :         concurrent = false;
     623              : 
     624              :     /*
     625              :      * Start progress report.  If we're building a partition, this was already
     626              :      * done.
     627              :      */
     628        19892 :     if (!OidIsValid(parentIndexId))
     629              :     {
     630        17828 :         pgstat_progress_start_command(PROGRESS_COMMAND_CREATE_INDEX, tableId);
     631        17828 :         pgstat_progress_update_param(PROGRESS_CREATEIDX_COMMAND,
     632              :                                      concurrent ?
     633              :                                      PROGRESS_CREATEIDX_COMMAND_CREATE_CONCURRENTLY :
     634              :                                      PROGRESS_CREATEIDX_COMMAND_CREATE);
     635              :     }
     636              : 
     637              :     /*
     638              :      * No index OID to report yet
     639              :      */
     640        19892 :     pgstat_progress_update_param(PROGRESS_CREATEIDX_INDEX_OID,
     641              :                                  InvalidOid);
     642              : 
     643              :     /*
     644              :      * count key attributes in index
     645              :      */
     646        19892 :     numberOfKeyAttributes = list_length(stmt->indexParams);
     647              : 
     648              :     /*
     649              :      * Calculate the new list of index columns including both key columns and
     650              :      * INCLUDE columns.  Later we can determine which of these are key
     651              :      * columns, and which are just part of the INCLUDE list by checking the
     652              :      * list position.  A list item in a position less than ii_NumIndexKeyAttrs
     653              :      * is part of the key columns, and anything equal to and over is part of
     654              :      * the INCLUDE columns.
     655              :      */
     656        19892 :     allIndexParams = list_concat_copy(stmt->indexParams,
     657        19892 :                                       stmt->indexIncludingParams);
     658        19892 :     numberOfAttributes = list_length(allIndexParams);
     659              : 
     660        19892 :     if (numberOfKeyAttributes <= 0)
     661            0 :         ereport(ERROR,
     662              :                 (errcode(ERRCODE_INVALID_OBJECT_DEFINITION),
     663              :                  errmsg("must specify at least one column")));
     664        19892 :     if (numberOfAttributes > INDEX_MAX_KEYS)
     665            0 :         ereport(ERROR,
     666              :                 (errcode(ERRCODE_TOO_MANY_COLUMNS),
     667              :                  errmsg("cannot use more than %d columns in an index",
     668              :                         INDEX_MAX_KEYS)));
     669              : 
     670              :     /*
     671              :      * Only SELECT ... FOR UPDATE/SHARE are allowed while doing a standard
     672              :      * index build; but for concurrent builds we allow INSERT/UPDATE/DELETE
     673              :      * (but not VACUUM).
     674              :      *
     675              :      * NB: Caller is responsible for making sure that tableId refers to the
     676              :      * relation on which the index should be built; except in bootstrap mode,
     677              :      * this will typically require the caller to have already locked the
     678              :      * relation.  To avoid lock upgrade hazards, that lock should be at least
     679              :      * as strong as the one we take here.
     680              :      *
     681              :      * NB: If the lock strength here ever changes, code that is run by
     682              :      * parallel workers under the control of certain particular ambuild
     683              :      * functions will need to be updated, too.
     684              :      */
     685        19892 :     lockmode = concurrent ? ShareUpdateExclusiveLock : ShareLock;
     686        19892 :     rel = table_open(tableId, lockmode);
     687              : 
     688              :     /*
     689              :      * Switch to the table owner's userid, so that any index functions are run
     690              :      * as that user.  Also lock down security-restricted operations.  We
     691              :      * already arranged to make GUC variable changes local to this command.
     692              :      */
     693        19892 :     GetUserIdAndSecContext(&root_save_userid, &root_save_sec_context);
     694        19892 :     SetUserIdAndSecContext(rel->rd_rel->relowner,
     695              :                            root_save_sec_context | SECURITY_RESTRICTED_OPERATION);
     696              : 
     697        19892 :     namespaceId = RelationGetNamespace(rel);
     698              : 
     699              :     /*
     700              :      * It has exclusion constraint behavior if it's an EXCLUDE constraint or a
     701              :      * temporal PRIMARY KEY/UNIQUE constraint
     702              :      */
     703        19892 :     exclusion = stmt->excludeOpNames || stmt->iswithoutoverlaps;
     704              : 
     705              :     /* Ensure that it makes sense to index this kind of relation */
     706        19892 :     switch (rel->rd_rel->relkind)
     707              :     {
     708        19888 :         case RELKIND_RELATION:
     709              :         case RELKIND_MATVIEW:
     710              :         case RELKIND_PARTITIONED_TABLE:
     711              :             /* OK */
     712        19888 :             break;
     713            4 :         default:
     714            4 :             ereport(ERROR,
     715              :                     (errcode(ERRCODE_WRONG_OBJECT_TYPE),
     716              :                      errmsg("cannot create index on relation \"%s\"",
     717              :                             RelationGetRelationName(rel)),
     718              :                      errdetail_relkind_not_supported(rel->rd_rel->relkind)));
     719              :             break;
     720              :     }
     721              : 
     722              :     /*
     723              :      * Establish behavior for partitioned tables, and verify sanity of
     724              :      * parameters.
     725              :      *
     726              :      * We do not build an actual index in this case; we only create a few
     727              :      * catalog entries.  The actual indexes are built by recursing for each
     728              :      * partition.
     729              :      */
     730        19888 :     partitioned = rel->rd_rel->relkind == RELKIND_PARTITIONED_TABLE;
     731        19888 :     if (partitioned)
     732              :     {
     733              :         /*
     734              :          * Note: we check 'stmt->concurrent' rather than 'concurrent', so that
     735              :          * the error is thrown also for temporary tables.  Seems better to be
     736              :          * consistent, even though we could do it on a temporary table because
     737              :          * we're not actually doing it concurrently.
     738              :          */
     739         1463 :         if (stmt->concurrent)
     740            4 :             ereport(ERROR,
     741              :                     (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
     742              :                      errmsg("cannot create index on partitioned table \"%s\" concurrently",
     743              :                             RelationGetRelationName(rel))));
     744              :     }
     745              : 
     746              :     /*
     747              :      * Don't try to CREATE INDEX on temp tables of other backends.
     748              :      */
     749        19884 :     if (RELATION_IS_OTHER_TEMP(rel))
     750            0 :         ereport(ERROR,
     751              :                 (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
     752              :                  errmsg("cannot create indexes on temporary tables of other sessions")));
     753              : 
     754              :     /*
     755              :      * Unless our caller vouches for having checked this already, insist that
     756              :      * the table not be in use by our own session, either.  Otherwise we might
     757              :      * fail to make entries in the new index (for instance, if an INSERT or
     758              :      * UPDATE is in progress and has already made its list of target indexes).
     759              :      */
     760        19884 :     if (check_not_in_use)
     761         9599 :         CheckTableNotInUse(rel, "CREATE INDEX");
     762              : 
     763              :     /*
     764              :      * Verify we (still) have CREATE rights in the rel's namespace.
     765              :      * (Presumably we did when the rel was created, but maybe not anymore.)
     766              :      * Skip check if caller doesn't want it.  Also skip check if
     767              :      * bootstrapping, since permissions machinery may not be working yet.
     768              :      */
     769        19880 :     if (check_rights && !IsBootstrapProcessingMode())
     770              :     {
     771              :         AclResult   aclresult;
     772              : 
     773        10385 :         aclresult = object_aclcheck(NamespaceRelationId, namespaceId, root_save_userid,
     774              :                                     ACL_CREATE);
     775        10385 :         if (aclresult != ACLCHECK_OK)
     776            0 :             aclcheck_error(aclresult, OBJECT_SCHEMA,
     777            0 :                            get_namespace_name(namespaceId));
     778              :     }
     779              : 
     780              :     /*
     781              :      * Select tablespace to use.  If not specified, use default tablespace
     782              :      * (which may in turn default to database's default).
     783              :      */
     784        19880 :     if (stmt->tableSpace)
     785              :     {
     786          161 :         tablespaceId = get_tablespace_oid(stmt->tableSpace, false);
     787          161 :         if (partitioned && tablespaceId == MyDatabaseTableSpace)
     788            4 :             ereport(ERROR,
     789              :                     (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
     790              :                      errmsg("cannot specify default tablespace for partitioned relations")));
     791              :     }
     792              :     else
     793              :     {
     794        19719 :         tablespaceId = GetDefaultTablespace(rel->rd_rel->relpersistence,
     795              :                                             partitioned);
     796              :         /* note InvalidOid is OK in this case */
     797              :     }
     798              : 
     799              :     /* Check tablespace permissions */
     800        19872 :     if (check_rights &&
     801           74 :         OidIsValid(tablespaceId) && tablespaceId != MyDatabaseTableSpace)
     802              :     {
     803              :         AclResult   aclresult;
     804              : 
     805           74 :         aclresult = object_aclcheck(TableSpaceRelationId, tablespaceId, root_save_userid,
     806              :                                     ACL_CREATE);
     807           74 :         if (aclresult != ACLCHECK_OK)
     808            0 :             aclcheck_error(aclresult, OBJECT_TABLESPACE,
     809            0 :                            get_tablespace_name(tablespaceId));
     810              :     }
     811              : 
     812              :     /*
     813              :      * Force shared indexes into the pg_global tablespace.  This is a bit of a
     814              :      * hack but seems simpler than marking them in the BKI commands.  On the
     815              :      * other hand, if it's not shared, don't allow it to be placed there.
     816              :      */
     817        19872 :     if (rel->rd_rel->relisshared)
     818         1197 :         tablespaceId = GLOBALTABLESPACE_OID;
     819        18675 :     else if (tablespaceId == GLOBALTABLESPACE_OID)
     820            0 :         ereport(ERROR,
     821              :                 (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
     822              :                  errmsg("only shared relations can be placed in pg_global tablespace")));
     823              : 
     824              :     /*
     825              :      * Choose the index column names.
     826              :      */
     827        19872 :     indexColNames = ChooseIndexColumnNames(allIndexParams);
     828              : 
     829              :     /*
     830              :      * Select name for index if caller didn't specify
     831              :      */
     832        19872 :     indexRelationName = stmt->idxname;
     833        19872 :     if (indexRelationName == NULL)
     834         7990 :         indexRelationName = ChooseIndexName(RelationGetRelationName(rel),
     835              :                                             namespaceId,
     836              :                                             indexColNames,
     837         7990 :                                             stmt->excludeOpNames,
     838         7990 :                                             stmt->primary,
     839         7990 :                                             stmt->isconstraint);
     840              : 
     841              :     /*
     842              :      * look up the access method, verify it can handle the requested features
     843              :      */
     844        19872 :     accessMethodName = stmt->accessMethod;
     845        19872 :     tuple = SearchSysCache1(AMNAME, PointerGetDatum(accessMethodName));
     846        19872 :     if (!HeapTupleIsValid(tuple))
     847              :     {
     848              :         /*
     849              :          * Hack to provide more-or-less-transparent updating of old RTREE
     850              :          * indexes to GiST: if RTREE is requested and not found, use GIST.
     851              :          */
     852            4 :         if (strcmp(accessMethodName, "rtree") == 0)
     853              :         {
     854            4 :             ereport(NOTICE,
     855              :                     (errmsg("substituting access method \"gist\" for obsolete method \"rtree\"")));
     856            4 :             accessMethodName = "gist";
     857            4 :             tuple = SearchSysCache1(AMNAME, PointerGetDatum(accessMethodName));
     858              :         }
     859              : 
     860            4 :         if (!HeapTupleIsValid(tuple))
     861            0 :             ereport(ERROR,
     862              :                     (errcode(ERRCODE_UNDEFINED_OBJECT),
     863              :                      errmsg("access method \"%s\" does not exist",
     864              :                             accessMethodName)));
     865              :     }
     866        19872 :     accessMethodForm = (Form_pg_am) GETSTRUCT(tuple);
     867        19872 :     accessMethodId = accessMethodForm->oid;
     868        19872 :     amRoutine = GetIndexAmRoutine(accessMethodForm->amhandler);
     869              : 
     870        19872 :     pgstat_progress_update_param(PROGRESS_CREATEIDX_ACCESS_METHOD_OID,
     871              :                                  accessMethodId);
     872              : 
     873        19872 :     if (stmt->unique && !stmt->iswithoutoverlaps && !amRoutine->amcanunique)
     874            0 :         ereport(ERROR,
     875              :                 (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
     876              :                  errmsg("access method \"%s\" does not support unique indexes",
     877              :                         accessMethodName)));
     878        19872 :     if (stmt->indexIncludingParams != NIL && !amRoutine->amcaninclude)
     879           12 :         ereport(ERROR,
     880              :                 (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
     881              :                  errmsg("access method \"%s\" does not support included columns",
     882              :                         accessMethodName)));
     883        19860 :     if (numberOfKeyAttributes > 1 && !amRoutine->amcanmulticol)
     884            0 :         ereport(ERROR,
     885              :                 (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
     886              :                  errmsg("access method \"%s\" does not support multicolumn indexes",
     887              :                         accessMethodName)));
     888        19860 :     if (exclusion && amRoutine->amgettuple == NULL)
     889            0 :         ereport(ERROR,
     890              :                 (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
     891              :                  errmsg("access method \"%s\" does not support exclusion constraints",
     892              :                         accessMethodName)));
     893        19860 :     if (stmt->iswithoutoverlaps && strcmp(accessMethodName, "gist") != 0)
     894            0 :         ereport(ERROR,
     895              :                 (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
     896              :                  errmsg("access method \"%s\" does not support WITHOUT OVERLAPS constraints",
     897              :                         accessMethodName)));
     898              : 
     899        19860 :     amcanorder = amRoutine->amcanorder;
     900        19860 :     amoptions = amRoutine->amoptions;
     901        19860 :     amissummarizing = amRoutine->amsummarizing;
     902              : 
     903        19860 :     ReleaseSysCache(tuple);
     904              : 
     905              :     /*
     906              :      * Validate predicate, if given
     907              :      */
     908        19860 :     if (stmt->whereClause)
     909          281 :         CheckPredicate((Expr *) stmt->whereClause);
     910              : 
     911              :     /*
     912              :      * Parse AM-specific options, convert to text array form, validate.
     913              :      */
     914        19860 :     reloptions = transformRelOptions((Datum) 0, stmt->options,
     915              :                                      NULL, NULL, false, false);
     916              : 
     917        19856 :     (void) index_reloptions(amoptions, reloptions, true);
     918              : 
     919              :     /*
     920              :      * Prepare arguments for index_create, primarily an IndexInfo structure.
     921              :      * Note that predicates must be in implicit-AND format.  In a concurrent
     922              :      * build, mark it not-ready-for-inserts.
     923              :      */
     924        19814 :     indexInfo = makeIndexInfo(numberOfAttributes,
     925              :                               numberOfKeyAttributes,
     926              :                               accessMethodId,
     927              :                               NIL,  /* expressions, NIL for now */
     928        19814 :                               make_ands_implicit((Expr *) stmt->whereClause),
     929        19814 :                               stmt->unique,
     930        19814 :                               stmt->nulls_not_distinct,
     931              :                               !concurrent,
     932              :                               concurrent,
     933              :                               amissummarizing,
     934        19814 :                               stmt->iswithoutoverlaps);
     935              : 
     936        19814 :     typeIds = palloc_array(Oid, numberOfAttributes);
     937        19814 :     collationIds = palloc_array(Oid, numberOfAttributes);
     938        19814 :     opclassIds = palloc_array(Oid, numberOfAttributes);
     939        19814 :     opclassOptions = palloc_array(Datum, numberOfAttributes);
     940        19814 :     coloptions = palloc_array(int16, numberOfAttributes);
     941        19814 :     ComputeIndexAttrs(pstate,
     942              :                       indexInfo,
     943              :                       typeIds, collationIds, opclassIds, opclassOptions,
     944              :                       coloptions, allIndexParams,
     945        19814 :                       stmt->excludeOpNames, tableId,
     946              :                       accessMethodName, accessMethodId,
     947        19814 :                       amcanorder, stmt->isconstraint, stmt->iswithoutoverlaps,
     948              :                       root_save_userid, root_save_sec_context,
     949              :                       &root_save_nestlevel);
     950              : 
     951              :     /*
     952              :      * Extra checks when creating a PRIMARY KEY index.
     953              :      */
     954        19597 :     if (stmt->primary)
     955         5906 :         index_check_primary_key(rel, indexInfo, is_alter_table, stmt);
     956              : 
     957              :     /*
     958              :      * If this table is partitioned and we're creating a unique index, primary
     959              :      * key, or exclusion constraint, make sure that the partition key is a
     960              :      * subset of the index's columns.  Otherwise it would be possible to
     961              :      * violate uniqueness by putting values that ought to be unique in
     962              :      * different partitions.
     963              :      *
     964              :      * We could lift this limitation if we had global indexes, but those have
     965              :      * their own problems, so this is still a useful feature combination.
     966              :      */
     967        19573 :     if (partitioned && (stmt->unique || exclusion))
     968              :     {
     969          855 :         PartitionKey key = RelationGetPartitionKey(rel);
     970              :         const char *constraint_type;
     971              :         int         i;
     972              : 
     973          855 :         if (stmt->primary)
     974          624 :             constraint_type = "PRIMARY KEY";
     975          231 :         else if (stmt->unique)
     976          165 :             constraint_type = "UNIQUE";
     977           66 :         else if (stmt->excludeOpNames)
     978           66 :             constraint_type = "EXCLUDE";
     979              :         else
     980              :         {
     981            0 :             elog(ERROR, "unknown constraint type");
     982              :             constraint_type = NULL; /* keep compiler quiet */
     983              :         }
     984              : 
     985              :         /*
     986              :          * Verify that all the columns in the partition key appear in the
     987              :          * unique key definition, with the same notion of equality.
     988              :          */
     989         1721 :         for (i = 0; i < key->partnatts; i++)
     990              :         {
     991          935 :             bool        found = false;
     992              :             int         eq_strategy;
     993              :             Oid         ptkey_eqop;
     994              :             int         j;
     995              : 
     996              :             /*
     997              :              * Identify the equality operator associated with this partkey
     998              :              * column.  For list and range partitioning, partkeys use btree
     999              :              * operator classes; hash partitioning uses hash operator classes.
    1000              :              * (Keep this in sync with ComputePartitionAttrs!)
    1001              :              */
    1002          935 :             if (key->strategy == PARTITION_STRATEGY_HASH)
    1003           42 :                 eq_strategy = HTEqualStrategyNumber;
    1004              :             else
    1005          893 :                 eq_strategy = BTEqualStrategyNumber;
    1006              : 
    1007          935 :             ptkey_eqop = get_opfamily_member(key->partopfamily[i],
    1008          935 :                                              key->partopcintype[i],
    1009          935 :                                              key->partopcintype[i],
    1010              :                                              eq_strategy);
    1011          935 :             if (!OidIsValid(ptkey_eqop))
    1012            0 :                 elog(ERROR, "missing operator %d(%u,%u) in partition opfamily %u",
    1013              :                      eq_strategy, key->partopcintype[i], key->partopcintype[i],
    1014              :                      key->partopfamily[i]);
    1015              : 
    1016              :             /*
    1017              :              * It may be possible to support UNIQUE constraints when partition
    1018              :              * keys are expressions, but is it worth it?  Give up for now.
    1019              :              */
    1020          935 :             if (key->partattrs[i] == 0)
    1021            8 :                 ereport(ERROR,
    1022              :                         (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
    1023              :                          errmsg("unsupported %s constraint with partition key definition",
    1024              :                                 constraint_type),
    1025              :                          errdetail("%s constraints cannot be used when partition keys include expressions.",
    1026              :                                    constraint_type)));
    1027              : 
    1028              :             /* Search the index column(s) for a match */
    1029         1060 :             for (j = 0; j < indexInfo->ii_NumIndexKeyAttrs; j++)
    1030              :             {
    1031         1008 :                 if (key->partattrs[i] == indexInfo->ii_IndexAttrNumbers[j])
    1032              :                 {
    1033              :                     /*
    1034              :                      * Matched the column, now what about the collation and
    1035              :                      * equality op?
    1036              :                      */
    1037              :                     Oid         idx_opfamily;
    1038              :                     Oid         idx_opcintype;
    1039              : 
    1040          875 :                     if (key->partcollation[i] != collationIds[j])
    1041            0 :                         continue;
    1042              : 
    1043          875 :                     if (get_opclass_opfamily_and_input_type(opclassIds[j],
    1044              :                                                             &idx_opfamily,
    1045              :                                                             &idx_opcintype))
    1046              :                     {
    1047          875 :                         Oid         idx_eqop = InvalidOid;
    1048              : 
    1049          875 :                         if (stmt->unique && !stmt->iswithoutoverlaps)
    1050          773 :                             idx_eqop = get_opfamily_member_for_cmptype(idx_opfamily,
    1051              :                                                                        idx_opcintype,
    1052              :                                                                        idx_opcintype,
    1053              :                                                                        COMPARE_EQ);
    1054          102 :                         else if (exclusion)
    1055          102 :                             idx_eqop = indexInfo->ii_ExclusionOps[j];
    1056              : 
    1057          875 :                         if (!idx_eqop)
    1058            0 :                             ereport(ERROR,
    1059              :                                     errcode(ERRCODE_UNDEFINED_OBJECT),
    1060              :                                     errmsg("could not identify an equality operator for type %s", format_type_be(idx_opcintype)),
    1061              :                                     errdetail("There is no suitable operator in operator family \"%s\" for access method \"%s\".",
    1062              :                                               get_opfamily_name(idx_opfamily, false), get_am_name(get_opfamily_method(idx_opfamily))));
    1063              : 
    1064          875 :                         if (ptkey_eqop == idx_eqop)
    1065              :                         {
    1066          866 :                             found = true;
    1067          866 :                             break;
    1068              :                         }
    1069            9 :                         else if (exclusion)
    1070              :                         {
    1071              :                             /*
    1072              :                              * We found a match, but it's not an equality
    1073              :                              * operator. Instead of failing below with an
    1074              :                              * error message about a missing column, fail now
    1075              :                              * and explain that the operator is wrong.
    1076              :                              */
    1077            9 :                             Form_pg_attribute att = TupleDescAttr(RelationGetDescr(rel), key->partattrs[i] - 1);
    1078              : 
    1079            9 :                             ereport(ERROR,
    1080              :                                     (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
    1081              :                                      errmsg("cannot match partition key to index on column \"%s\" using non-equal operator \"%s\"",
    1082              :                                             NameStr(att->attname),
    1083              :                                             get_opname(indexInfo->ii_ExclusionOps[j]))));
    1084              :                         }
    1085              :                     }
    1086              :                 }
    1087              :             }
    1088              : 
    1089          918 :             if (!found)
    1090              :             {
    1091              :                 Form_pg_attribute att;
    1092              : 
    1093           52 :                 att = TupleDescAttr(RelationGetDescr(rel),
    1094           52 :                                     key->partattrs[i] - 1);
    1095           52 :                 ereport(ERROR,
    1096              :                         (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
    1097              :                 /* translator: %s is UNIQUE, PRIMARY KEY, etc */
    1098              :                          errmsg("%s constraint on partitioned table must include all partitioning columns",
    1099              :                                 constraint_type),
    1100              :                 /* translator: first %s is UNIQUE, PRIMARY KEY, etc */
    1101              :                          errdetail("%s constraint on table \"%s\" lacks column \"%s\" which is part of the partition key.",
    1102              :                                    constraint_type, RelationGetRelationName(rel),
    1103              :                                    NameStr(att->attname))));
    1104              :             }
    1105              :         }
    1106              :     }
    1107              : 
    1108              : 
    1109              :     /*
    1110              :      * We disallow indexes on system columns.  They would not necessarily get
    1111              :      * updated correctly, and they don't seem useful anyway.
    1112              :      *
    1113              :      * Also disallow virtual generated columns in indexes (use expression
    1114              :      * index instead).
    1115              :      */
    1116        46921 :     for (int i = 0; i < indexInfo->ii_NumIndexAttrs; i++)
    1117              :     {
    1118        27433 :         AttrNumber  attno = indexInfo->ii_IndexAttrNumbers[i];
    1119              : 
    1120        27433 :         if (attno < 0)
    1121            4 :             ereport(ERROR,
    1122              :                     (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
    1123              :                      errmsg("index creation on system columns is not supported")));
    1124              : 
    1125              : 
    1126        27429 :         if (attno > 0 &&
    1127        26742 :             TupleDescAttr(RelationGetDescr(rel), attno - 1)->attgenerated == ATTRIBUTE_GENERATED_VIRTUAL)
    1128           12 :             ereport(ERROR,
    1129              :                     errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
    1130              :                     stmt->primary ?
    1131              :                     errmsg("primary keys on virtual generated columns are not supported") :
    1132              :                     stmt->isconstraint ?
    1133              :                     errmsg("unique constraints on virtual generated columns are not supported") :
    1134              :                     errmsg("indexes on virtual generated columns are not supported"));
    1135              :     }
    1136              : 
    1137              :     /*
    1138              :      * Also check for system and generated columns used in expressions or
    1139              :      * predicates.
    1140              :      */
    1141        19488 :     if (indexInfo->ii_Expressions || indexInfo->ii_Predicate)
    1142              :     {
    1143          886 :         Bitmapset  *indexattrs = NULL;
    1144              :         int         j;
    1145              : 
    1146          886 :         pull_varattnos((Node *) indexInfo->ii_Expressions, 1, &indexattrs);
    1147          886 :         pull_varattnos((Node *) indexInfo->ii_Predicate, 1, &indexattrs);
    1148              : 
    1149         6194 :         for (int i = FirstLowInvalidHeapAttributeNumber + 1; i < 0; i++)
    1150              :         {
    1151         5316 :             if (bms_is_member(i - FirstLowInvalidHeapAttributeNumber,
    1152              :                               indexattrs))
    1153            8 :                 ereport(ERROR,
    1154              :                         (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
    1155              :                          errmsg("index creation on system columns is not supported")));
    1156              :         }
    1157              : 
    1158              :         /*
    1159              :          * XXX Virtual generated columns in index expressions or predicates
    1160              :          * could be supported, but it needs support in
    1161              :          * RelationGetIndexExpressions() and RelationGetIndexPredicate().
    1162              :          */
    1163          878 :         j = -1;
    1164         1896 :         while ((j = bms_next_member(indexattrs, j)) >= 0)
    1165              :         {
    1166         1018 :             AttrNumber  attno = j + FirstLowInvalidHeapAttributeNumber;
    1167              : 
    1168         1018 :             if (attno > 0 &&
    1169         1014 :                 TupleDescAttr(RelationGetDescr(rel), attno - 1)->attgenerated == ATTRIBUTE_GENERATED_VIRTUAL)
    1170            0 :                 ereport(ERROR,
    1171              :                         (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
    1172              :                          stmt->isconstraint ?
    1173              :                          errmsg("unique constraints on virtual generated columns are not supported") :
    1174              :                          errmsg("indexes on virtual generated columns are not supported")));
    1175              :         }
    1176              :     }
    1177              : 
    1178              :     /* Is index safe for others to ignore?  See set_indexsafe_procflags() */
    1179        38293 :     safe_index = indexInfo->ii_Expressions == NIL &&
    1180        18813 :         indexInfo->ii_Predicate == NIL;
    1181              : 
    1182              :     /*
    1183              :      * Report index creation if appropriate (delay this till after most of the
    1184              :      * error checks)
    1185              :      */
    1186        19480 :     if (stmt->isconstraint && !quiet)
    1187              :     {
    1188              :         const char *constraint_type;
    1189              : 
    1190         6521 :         if (stmt->primary)
    1191         5770 :             constraint_type = "PRIMARY KEY";
    1192          751 :         else if (stmt->unique)
    1193          637 :             constraint_type = "UNIQUE";
    1194          114 :         else if (stmt->excludeOpNames)
    1195          114 :             constraint_type = "EXCLUDE";
    1196              :         else
    1197              :         {
    1198            0 :             elog(ERROR, "unknown constraint type");
    1199              :             constraint_type = NULL; /* keep compiler quiet */
    1200              :         }
    1201              : 
    1202         6521 :         ereport(DEBUG1,
    1203              :                 (errmsg_internal("%s %s will create implicit index \"%s\" for table \"%s\"",
    1204              :                                  is_alter_table ? "ALTER TABLE / ADD" : "CREATE TABLE /",
    1205              :                                  constraint_type,
    1206              :                                  indexRelationName, RelationGetRelationName(rel))));
    1207              :     }
    1208              : 
    1209              :     /*
    1210              :      * A valid stmt->oldNumber implies that we already have a built form of
    1211              :      * the index.  The caller should also decline any index build.
    1212              :      */
    1213              :     Assert(!RelFileNumberIsValid(stmt->oldNumber) || (skip_build && !concurrent));
    1214              : 
    1215              :     /*
    1216              :      * Make the catalog entries for the index, including constraints. This
    1217              :      * step also actually builds the index, except if caller requested not to
    1218              :      * or in concurrent mode, in which case it'll be done later, or doing a
    1219              :      * partitioned index (because those don't have storage).
    1220              :      */
    1221        19480 :     flags = constr_flags = 0;
    1222        19480 :     if (stmt->isconstraint)
    1223         6681 :         flags |= INDEX_CREATE_ADD_CONSTRAINT;
    1224        19480 :     if (skip_build || concurrent || partitioned)
    1225         9481 :         flags |= INDEX_CREATE_SKIP_BUILD;
    1226        19480 :     if (stmt->if_not_exists)
    1227           12 :         flags |= INDEX_CREATE_IF_NOT_EXISTS;
    1228        19480 :     if (concurrent)
    1229          115 :         flags |= INDEX_CREATE_CONCURRENT;
    1230        19480 :     if (partitioned)
    1231         1382 :         flags |= INDEX_CREATE_PARTITIONED;
    1232        19480 :     if (stmt->primary)
    1233         5854 :         flags |= INDEX_CREATE_IS_PRIMARY;
    1234              : 
    1235              :     /*
    1236              :      * If the table is partitioned, and recursion was declined but partitions
    1237              :      * exist, mark the index as invalid.
    1238              :      */
    1239        19480 :     if (partitioned && stmt->relation && !stmt->relation->inh)
    1240              :     {
    1241          159 :         PartitionDesc pd = RelationGetPartitionDesc(rel, true);
    1242              : 
    1243          159 :         if (pd->nparts != 0)
    1244          146 :             flags |= INDEX_CREATE_INVALID;
    1245              :     }
    1246              : 
    1247        19480 :     if (stmt->deferrable)
    1248           86 :         constr_flags |= INDEX_CONSTR_CREATE_DEFERRABLE;
    1249        19480 :     if (stmt->initdeferred)
    1250           24 :         constr_flags |= INDEX_CONSTR_CREATE_INIT_DEFERRED;
    1251        19480 :     if (stmt->iswithoutoverlaps)
    1252          579 :         constr_flags |= INDEX_CONSTR_CREATE_WITHOUT_OVERLAPS;
    1253              : 
    1254              :     indexRelationId =
    1255        19480 :         index_create(rel, indexRelationName, indexRelationId, parentIndexId,
    1256              :                      parentConstraintId,
    1257        19480 :                      stmt->oldNumber, indexInfo, indexColNames,
    1258              :                      accessMethodId, tablespaceId,
    1259              :                      collationIds, opclassIds, opclassOptions,
    1260              :                      coloptions, NULL, reloptions,
    1261              :                      flags, constr_flags,
    1262              :                      allowSystemTableMods, !check_rights,
    1263        19480 :                      &createdConstraintId);
    1264              : 
    1265        19330 :     ObjectAddressSet(address, RelationRelationId, indexRelationId);
    1266              : 
    1267        19330 :     if (!OidIsValid(indexRelationId))
    1268              :     {
    1269              :         /*
    1270              :          * Roll back any GUC changes executed by index functions.  Also revert
    1271              :          * to original default_tablespace if we changed it above.
    1272              :          */
    1273           12 :         AtEOXact_GUC(false, root_save_nestlevel);
    1274              : 
    1275              :         /* Restore userid and security context */
    1276           12 :         SetUserIdAndSecContext(root_save_userid, root_save_sec_context);
    1277              : 
    1278           12 :         table_close(rel, NoLock);
    1279              : 
    1280              :         /* If this is the top-level index, we're done */
    1281           12 :         if (!OidIsValid(parentIndexId))
    1282           12 :             pgstat_progress_end_command();
    1283              : 
    1284           12 :         return address;
    1285              :     }
    1286              : 
    1287              :     /*
    1288              :      * Roll back any GUC changes executed by index functions, and keep
    1289              :      * subsequent changes local to this command.  This is essential if some
    1290              :      * index function changed a behavior-affecting GUC, e.g. search_path.
    1291              :      */
    1292        19318 :     AtEOXact_GUC(false, root_save_nestlevel);
    1293        19318 :     root_save_nestlevel = NewGUCNestLevel();
    1294        19318 :     RestrictSearchPath();
    1295              : 
    1296              :     /* Add any requested comment */
    1297        19318 :     if (stmt->idxcomment != NULL)
    1298           52 :         CreateComments(indexRelationId, RelationRelationId, 0,
    1299           52 :                        stmt->idxcomment);
    1300              : 
    1301        19318 :     if (partitioned)
    1302              :     {
    1303              :         PartitionDesc partdesc;
    1304              : 
    1305              :         /*
    1306              :          * Unless caller specified to skip this step (via ONLY), process each
    1307              :          * partition to make sure they all contain a corresponding index.
    1308              :          *
    1309              :          * If we're called internally (no stmt->relation), recurse always.
    1310              :          */
    1311         1382 :         partdesc = RelationGetPartitionDesc(rel, true);
    1312         1382 :         if ((!stmt->relation || stmt->relation->inh) && partdesc->nparts > 0)
    1313              :         {
    1314          409 :             int         nparts = partdesc->nparts;
    1315          409 :             Oid        *part_oids = palloc_array(Oid, nparts);
    1316          409 :             bool        invalidate_parent = false;
    1317              :             Relation    parentIndex;
    1318              :             TupleDesc   parentDesc;
    1319              : 
    1320              :             /*
    1321              :              * Report the total number of partitions at the start of the
    1322              :              * command; don't update it when being called recursively.
    1323              :              */
    1324          409 :             if (!OidIsValid(parentIndexId))
    1325              :             {
    1326              :                 /*
    1327              :                  * When called by ProcessUtilitySlow, the number of partitions
    1328              :                  * is passed in as an optimization; but other callers pass -1
    1329              :                  * since they don't have the value handy.  This should count
    1330              :                  * partitions the same way, i.e. one less than the number of
    1331              :                  * relations find_all_inheritors reports.
    1332              :                  *
    1333              :                  * We assume we needn't ask find_all_inheritors to take locks,
    1334              :                  * because that should have happened already for all callers.
    1335              :                  * Even if it did not, this is safe as long as we don't try to
    1336              :                  * touch the partitions here; the worst consequence would be a
    1337              :                  * bogus progress-reporting total.
    1338              :                  */
    1339          335 :                 if (total_parts < 0)
    1340              :                 {
    1341           84 :                     List       *children = find_all_inheritors(tableId, NoLock, NULL);
    1342              : 
    1343           84 :                     total_parts = list_length(children) - 1;
    1344           84 :                     list_free(children);
    1345              :                 }
    1346              : 
    1347          335 :                 pgstat_progress_update_param(PROGRESS_CREATEIDX_PARTITIONS_TOTAL,
    1348              :                                              total_parts);
    1349              :             }
    1350              : 
    1351              :             /* Make a local copy of partdesc->oids[], just for safety */
    1352          409 :             memcpy(part_oids, partdesc->oids, sizeof(Oid) * nparts);
    1353              : 
    1354              :             /*
    1355              :              * We'll need an IndexInfo describing the parent index.  The one
    1356              :              * built above is almost good enough, but not quite, because (for
    1357              :              * example) its predicate expression if any hasn't been through
    1358              :              * expression preprocessing.  The most reliable way to get an
    1359              :              * IndexInfo that will match those for child indexes is to build
    1360              :              * it the same way, using BuildIndexInfo().
    1361              :              */
    1362          409 :             parentIndex = index_open(indexRelationId, lockmode);
    1363          409 :             indexInfo = BuildIndexInfo(parentIndex);
    1364              : 
    1365          409 :             parentDesc = RelationGetDescr(rel);
    1366              : 
    1367              :             /*
    1368              :              * For each partition, scan all existing indexes; if one matches
    1369              :              * our index definition and is not already attached to some other
    1370              :              * parent index, attach it to the one we just created.
    1371              :              *
    1372              :              * If none matches, build a new index by calling ourselves
    1373              :              * recursively with the same options (except for the index name).
    1374              :              */
    1375         1123 :             for (int i = 0; i < nparts; i++)
    1376              :             {
    1377          730 :                 Oid         childRelid = part_oids[i];
    1378              :                 Relation    childrel;
    1379              :                 Oid         child_save_userid;
    1380              :                 int         child_save_sec_context;
    1381              :                 int         child_save_nestlevel;
    1382              :                 List       *childidxs;
    1383              :                 ListCell   *cell;
    1384              :                 AttrMap    *attmap;
    1385          730 :                 bool        found = false;
    1386              : 
    1387          730 :                 childrel = table_open(childRelid, lockmode);
    1388              : 
    1389          730 :                 GetUserIdAndSecContext(&child_save_userid,
    1390              :                                        &child_save_sec_context);
    1391          730 :                 SetUserIdAndSecContext(childrel->rd_rel->relowner,
    1392              :                                        child_save_sec_context | SECURITY_RESTRICTED_OPERATION);
    1393          730 :                 child_save_nestlevel = NewGUCNestLevel();
    1394          730 :                 RestrictSearchPath();
    1395              : 
    1396              :                 /*
    1397              :                  * Don't try to create indexes on foreign tables, though. Skip
    1398              :                  * those if a regular index, or fail if trying to create a
    1399              :                  * constraint index.
    1400              :                  */
    1401          730 :                 if (childrel->rd_rel->relkind == RELKIND_FOREIGN_TABLE)
    1402              :                 {
    1403           12 :                     if (stmt->unique || stmt->primary)
    1404            8 :                         ereport(ERROR,
    1405              :                                 (errcode(ERRCODE_WRONG_OBJECT_TYPE),
    1406              :                                  errmsg("cannot create unique index on partitioned table \"%s\"",
    1407              :                                         RelationGetRelationName(rel)),
    1408              :                                  errdetail("Table \"%s\" contains partitions that are foreign tables.",
    1409              :                                            RelationGetRelationName(rel))));
    1410              : 
    1411            4 :                     AtEOXact_GUC(false, child_save_nestlevel);
    1412            4 :                     SetUserIdAndSecContext(child_save_userid,
    1413              :                                            child_save_sec_context);
    1414            4 :                     table_close(childrel, lockmode);
    1415            4 :                     continue;
    1416              :                 }
    1417              : 
    1418          718 :                 childidxs = RelationGetIndexList(childrel);
    1419              :                 attmap =
    1420          718 :                     build_attrmap_by_name(RelationGetDescr(childrel),
    1421              :                                           parentDesc,
    1422              :                                           false);
    1423              : 
    1424          951 :                 foreach(cell, childidxs)
    1425              :                 {
    1426          285 :                     Oid         cldidxid = lfirst_oid(cell);
    1427              :                     Relation    cldidx;
    1428              :                     IndexInfo  *cldIdxInfo;
    1429              : 
    1430              :                     /* this index is already a partition of another one */
    1431          285 :                     if (has_superclass(cldidxid))
    1432          217 :                         continue;
    1433              : 
    1434           68 :                     cldidx = index_open(cldidxid, lockmode);
    1435           68 :                     cldIdxInfo = BuildIndexInfo(cldidx);
    1436           68 :                     if (CompareIndexInfo(cldIdxInfo, indexInfo,
    1437           68 :                                          cldidx->rd_indcollation,
    1438           68 :                                          parentIndex->rd_indcollation,
    1439           68 :                                          cldidx->rd_opfamily,
    1440           68 :                                          parentIndex->rd_opfamily,
    1441              :                                          attmap))
    1442              :                     {
    1443           52 :                         Oid         cldConstrOid = InvalidOid;
    1444              : 
    1445              :                         /*
    1446              :                          * Found a match.
    1447              :                          *
    1448              :                          * If this index is being created in the parent
    1449              :                          * because of a constraint, then the child needs to
    1450              :                          * have a constraint also, so look for one.  If there
    1451              :                          * is no such constraint, this index is no good, so
    1452              :                          * keep looking.
    1453              :                          */
    1454           52 :                         if (createdConstraintId != InvalidOid)
    1455              :                         {
    1456              :                             cldConstrOid =
    1457            8 :                                 get_relation_idx_constraint_oid(childRelid,
    1458              :                                                                 cldidxid);
    1459            8 :                             if (cldConstrOid == InvalidOid)
    1460              :                             {
    1461            0 :                                 index_close(cldidx, lockmode);
    1462            0 :                                 continue;
    1463              :                             }
    1464              :                         }
    1465              : 
    1466              :                         /* Attach index to parent and we're done. */
    1467           52 :                         IndexSetParentIndex(cldidx, indexRelationId);
    1468           52 :                         if (createdConstraintId != InvalidOid)
    1469            8 :                             ConstraintSetParentConstraint(cldConstrOid,
    1470              :                                                           createdConstraintId,
    1471              :                                                           childRelid);
    1472              : 
    1473           52 :                         if (!cldidx->rd_index->indisvalid)
    1474           12 :                             invalidate_parent = true;
    1475              : 
    1476           52 :                         found = true;
    1477              : 
    1478              :                         /*
    1479              :                          * Report this partition as processed.  Note that if
    1480              :                          * the partition has children itself, we'd ideally
    1481              :                          * count the children and update the progress report
    1482              :                          * for all of them; but that seems unduly expensive.
    1483              :                          * Instead, the progress report will act like all such
    1484              :                          * indirect children were processed in zero time at
    1485              :                          * the end of the command.
    1486              :                          */
    1487           52 :                         pgstat_progress_incr_param(PROGRESS_CREATEIDX_PARTITIONS_DONE, 1);
    1488              : 
    1489              :                         /* keep lock till commit */
    1490           52 :                         index_close(cldidx, NoLock);
    1491           52 :                         break;
    1492              :                     }
    1493              : 
    1494           16 :                     index_close(cldidx, lockmode);
    1495              :                 }
    1496              : 
    1497          718 :                 list_free(childidxs);
    1498          718 :                 AtEOXact_GUC(false, child_save_nestlevel);
    1499          718 :                 SetUserIdAndSecContext(child_save_userid,
    1500              :                                        child_save_sec_context);
    1501          718 :                 table_close(childrel, NoLock);
    1502              : 
    1503              :                 /*
    1504              :                  * If no matching index was found, create our own.
    1505              :                  */
    1506          718 :                 if (!found)
    1507              :                 {
    1508              :                     IndexStmt  *childStmt;
    1509              :                     ObjectAddress childAddr;
    1510              : 
    1511              :                     /*
    1512              :                      * Build an IndexStmt describing the desired child index
    1513              :                      * in the same way that we do during ATTACH PARTITION.
    1514              :                      * Notably, we rely on generateClonedIndexStmt to produce
    1515              :                      * a search-path-independent representation, which the
    1516              :                      * original IndexStmt might not be.
    1517              :                      */
    1518          666 :                     childStmt = generateClonedIndexStmt(NULL,
    1519              :                                                         parentIndex,
    1520              :                                                         attmap,
    1521              :                                                         NULL);
    1522              : 
    1523              :                     /*
    1524              :                      * Recurse as the starting user ID.  Callee will use that
    1525              :                      * for permission checks, then switch again.
    1526              :                      */
    1527              :                     Assert(GetUserId() == child_save_userid);
    1528          666 :                     SetUserIdAndSecContext(root_save_userid,
    1529              :                                            root_save_sec_context);
    1530              :                     childAddr =
    1531          666 :                         DefineIndex(NULL,   /* original pstate not applicable */
    1532              :                                     childRelid, childStmt,
    1533              :                                     InvalidOid, /* no predefined OID */
    1534              :                                     indexRelationId,    /* this is our child */
    1535              :                                     createdConstraintId,
    1536              :                                     -1,
    1537              :                                     is_alter_table, check_rights,
    1538              :                                     check_not_in_use,
    1539              :                                     skip_build, quiet);
    1540          658 :                     SetUserIdAndSecContext(child_save_userid,
    1541              :                                            child_save_sec_context);
    1542              : 
    1543              :                     /*
    1544              :                      * Check if the index just created is valid or not, as it
    1545              :                      * could be possible that it has been switched as invalid
    1546              :                      * when recursing across multiple partition levels.
    1547              :                      */
    1548          658 :                     if (!get_index_isvalid(childAddr.objectId))
    1549            4 :                         invalidate_parent = true;
    1550              :                 }
    1551              : 
    1552          710 :                 free_attrmap(attmap);
    1553              :             }
    1554              : 
    1555          393 :             index_close(parentIndex, lockmode);
    1556              : 
    1557              :             /*
    1558              :              * The pg_index row we inserted for this index was marked
    1559              :              * indisvalid=true.  But if we attached an existing index that is
    1560              :              * invalid, this is incorrect, so update our row to invalid too.
    1561              :              */
    1562          393 :             if (invalidate_parent)
    1563              :             {
    1564           16 :                 Relation    pg_index = table_open(IndexRelationId, RowExclusiveLock);
    1565              :                 HeapTuple   tup,
    1566              :                             newtup;
    1567              : 
    1568           16 :                 tup = SearchSysCache1(INDEXRELID,
    1569              :                                       ObjectIdGetDatum(indexRelationId));
    1570           16 :                 if (!HeapTupleIsValid(tup))
    1571            0 :                     elog(ERROR, "cache lookup failed for index %u",
    1572              :                          indexRelationId);
    1573           16 :                 newtup = heap_copytuple(tup);
    1574           16 :                 ((Form_pg_index) GETSTRUCT(newtup))->indisvalid = false;
    1575           16 :                 CatalogTupleUpdate(pg_index, &tup->t_self, newtup);
    1576           16 :                 ReleaseSysCache(tup);
    1577           16 :                 table_close(pg_index, RowExclusiveLock);
    1578           16 :                 heap_freetuple(newtup);
    1579              : 
    1580              :                 /*
    1581              :                  * CCI here to make this update visible, in case this recurses
    1582              :                  * across multiple partition levels.
    1583              :                  */
    1584           16 :                 CommandCounterIncrement();
    1585              :             }
    1586              :         }
    1587              : 
    1588              :         /*
    1589              :          * Indexes on partitioned tables are not themselves built, so we're
    1590              :          * done here.
    1591              :          */
    1592         1366 :         AtEOXact_GUC(false, root_save_nestlevel);
    1593         1366 :         SetUserIdAndSecContext(root_save_userid, root_save_sec_context);
    1594         1366 :         table_close(rel, NoLock);
    1595         1366 :         if (!OidIsValid(parentIndexId))
    1596         1167 :             pgstat_progress_end_command();
    1597              :         else
    1598              :         {
    1599              :             /* Update progress for an intermediate partitioned index itself */
    1600          199 :             pgstat_progress_incr_param(PROGRESS_CREATEIDX_PARTITIONS_DONE, 1);
    1601              :         }
    1602              : 
    1603         1366 :         return address;
    1604              :     }
    1605              : 
    1606        17936 :     AtEOXact_GUC(false, root_save_nestlevel);
    1607        17936 :     SetUserIdAndSecContext(root_save_userid, root_save_sec_context);
    1608              : 
    1609        17936 :     if (!concurrent)
    1610              :     {
    1611              :         /* Close the heap and we're done, in the non-concurrent case */
    1612        17829 :         table_close(rel, NoLock);
    1613              : 
    1614              :         /*
    1615              :          * If this is the top-level index, the command is done overall;
    1616              :          * otherwise, increment progress to report one child index is done.
    1617              :          */
    1618        17829 :         if (!OidIsValid(parentIndexId))
    1619        15988 :             pgstat_progress_end_command();
    1620              :         else
    1621         1841 :             pgstat_progress_incr_param(PROGRESS_CREATEIDX_PARTITIONS_DONE, 1);
    1622              : 
    1623        17829 :         return address;
    1624              :     }
    1625              : 
    1626              :     /* save lockrelid and locktag for below, then close rel */
    1627          107 :     heaprelid = rel->rd_lockInfo.lockRelId;
    1628          107 :     SET_LOCKTAG_RELATION(heaplocktag, heaprelid.dbId, heaprelid.relId);
    1629          107 :     table_close(rel, NoLock);
    1630              : 
    1631              :     /*
    1632              :      * For a concurrent build, it's important to make the catalog entries
    1633              :      * visible to other transactions before we start to build the index. That
    1634              :      * will prevent them from making incompatible HOT updates.  The new index
    1635              :      * will be marked not indisready and not indisvalid, so that no one else
    1636              :      * tries to either insert into it or use it for queries.
    1637              :      *
    1638              :      * We must commit our current transaction so that the index becomes
    1639              :      * visible; then start another.  Note that all the data structures we just
    1640              :      * built are lost in the commit.  The only data we keep past here are the
    1641              :      * relation IDs.
    1642              :      *
    1643              :      * Before committing, get a session-level lock on the table, to ensure
    1644              :      * that neither it nor the index can be dropped before we finish. This
    1645              :      * cannot block, even if someone else is waiting for access, because we
    1646              :      * already have the same lock within our transaction.
    1647              :      *
    1648              :      * Note: we don't currently bother with a session lock on the index,
    1649              :      * because there are no operations that could change its state while we
    1650              :      * hold lock on the parent table.  This might need to change later.
    1651              :      */
    1652          107 :     LockRelationIdForSession(&heaprelid, ShareUpdateExclusiveLock);
    1653              : 
    1654          107 :     PopActiveSnapshot();
    1655          107 :     CommitTransactionCommand();
    1656          107 :     StartTransactionCommand();
    1657              : 
    1658              :     /* Tell concurrent index builds to ignore us, if index qualifies */
    1659          107 :     if (safe_index)
    1660           77 :         set_indexsafe_procflags();
    1661              : 
    1662              :     /*
    1663              :      * The index is now visible, so we can report the OID.  While on it,
    1664              :      * include the report for the beginning of phase 2.
    1665              :      */
    1666              :     {
    1667          107 :         const int   progress_cols[] = {
    1668              :             PROGRESS_CREATEIDX_INDEX_OID,
    1669              :             PROGRESS_CREATEIDX_PHASE
    1670              :         };
    1671          107 :         const int64 progress_vals[] = {
    1672              :             indexRelationId,
    1673              :             PROGRESS_CREATEIDX_PHASE_WAIT_1
    1674              :         };
    1675              : 
    1676          107 :         pgstat_progress_update_multi_param(2, progress_cols, progress_vals);
    1677              :     }
    1678              : 
    1679              :     /*
    1680              :      * Phase 2 of concurrent index build (see comments for validate_index()
    1681              :      * for an overview of how this works)
    1682              :      *
    1683              :      * Now we must wait until no running transaction could have the table open
    1684              :      * with the old list of indexes.  Use ShareLock to consider running
    1685              :      * transactions that hold locks that permit writing to the table.  Note we
    1686              :      * do not need to worry about xacts that open the table for writing after
    1687              :      * this point; they will see the new index when they open it.
    1688              :      *
    1689              :      * Note: the reason we use actual lock acquisition here, rather than just
    1690              :      * checking the ProcArray and sleeping, is that deadlock is possible if
    1691              :      * one of the transactions in question is blocked trying to acquire an
    1692              :      * exclusive lock on our table.  The lock code will detect deadlock and
    1693              :      * error out properly.
    1694              :      */
    1695          107 :     WaitForLockers(heaplocktag, ShareLock, true);
    1696              : 
    1697              :     /*
    1698              :      * At this moment we are sure that there are no transactions with the
    1699              :      * table open for write that don't have this new index in their list of
    1700              :      * indexes.  We have waited out all the existing transactions and any new
    1701              :      * transaction will have the new index in its list, but the index is still
    1702              :      * marked as "not-ready-for-inserts".  The index is consulted while
    1703              :      * deciding HOT-safety though.  This arrangement ensures that no new HOT
    1704              :      * chains can be created where the new tuple and the old tuple in the
    1705              :      * chain have different index keys.
    1706              :      *
    1707              :      * We now take a new snapshot, and build the index using all tuples that
    1708              :      * are visible in this snapshot.  We can be sure that any HOT updates to
    1709              :      * these tuples will be compatible with the index, since any updates made
    1710              :      * by transactions that didn't know about the index are now committed or
    1711              :      * rolled back.  Thus, each visible tuple is either the end of its
    1712              :      * HOT-chain or the extension of the chain is HOT-safe for this index.
    1713              :      */
    1714              : 
    1715              :     /* Set ActiveSnapshot since functions in the indexes may need it */
    1716          107 :     PushActiveSnapshot(GetTransactionSnapshot());
    1717              : 
    1718              :     /* Perform concurrent build of index */
    1719          107 :     index_concurrently_build(tableId, indexRelationId);
    1720              : 
    1721              :     /* we can do away with our snapshot */
    1722           95 :     PopActiveSnapshot();
    1723              : 
    1724              :     /*
    1725              :      * Commit this transaction to make the indisready update visible.
    1726              :      */
    1727           95 :     CommitTransactionCommand();
    1728           95 :     StartTransactionCommand();
    1729              : 
    1730              :     /* Tell concurrent index builds to ignore us, if index qualifies */
    1731           95 :     if (safe_index)
    1732           69 :         set_indexsafe_procflags();
    1733              : 
    1734              :     /*
    1735              :      * Phase 3 of concurrent index build
    1736              :      *
    1737              :      * We once again wait until no transaction can have the table open with
    1738              :      * the index marked as read-only for updates.
    1739              :      */
    1740           95 :     pgstat_progress_update_param(PROGRESS_CREATEIDX_PHASE,
    1741              :                                  PROGRESS_CREATEIDX_PHASE_WAIT_2);
    1742           95 :     WaitForLockers(heaplocktag, ShareLock, true);
    1743              : 
    1744              :     /*
    1745              :      * Now take the "reference snapshot" that will be used by validate_index()
    1746              :      * to filter candidate tuples.  Beware!  There might still be snapshots in
    1747              :      * use that treat some transaction as in-progress that our reference
    1748              :      * snapshot treats as committed.  If such a recently-committed transaction
    1749              :      * deleted tuples in the table, we will not include them in the index; yet
    1750              :      * those transactions which see the deleting one as still-in-progress will
    1751              :      * expect such tuples to be there once we mark the index as valid.
    1752              :      *
    1753              :      * We solve this by waiting for all endangered transactions to exit before
    1754              :      * we mark the index as valid.
    1755              :      *
    1756              :      * We also set ActiveSnapshot to this snap, since functions in indexes may
    1757              :      * need a snapshot.
    1758              :      */
    1759           95 :     snapshot = RegisterSnapshot(GetTransactionSnapshot());
    1760           95 :     PushActiveSnapshot(snapshot);
    1761              : 
    1762              :     /*
    1763              :      * Scan the index and the heap, insert any missing index entries.
    1764              :      */
    1765           95 :     validate_index(tableId, indexRelationId, snapshot);
    1766              : 
    1767              :     /*
    1768              :      * Drop the reference snapshot.  We must do this before waiting out other
    1769              :      * snapshot holders, else we will deadlock against other processes also
    1770              :      * doing CREATE INDEX CONCURRENTLY, which would see our snapshot as one
    1771              :      * they must wait for.  But first, save the snapshot's xmin to use as
    1772              :      * limitXmin for GetCurrentVirtualXIDs().
    1773              :      */
    1774           95 :     limitXmin = snapshot->xmin;
    1775              : 
    1776           95 :     PopActiveSnapshot();
    1777           95 :     UnregisterSnapshot(snapshot);
    1778              : 
    1779              :     /*
    1780              :      * The snapshot subsystem could still contain registered snapshots that
    1781              :      * are holding back our process's advertised xmin; in particular, if
    1782              :      * default_transaction_isolation = serializable, there is a transaction
    1783              :      * snapshot that is still active.  The CatalogSnapshot is likewise a
    1784              :      * hazard.  To ensure no deadlocks, we must commit and start yet another
    1785              :      * transaction, and do our wait before any snapshot has been taken in it.
    1786              :      */
    1787           95 :     CommitTransactionCommand();
    1788           95 :     StartTransactionCommand();
    1789              : 
    1790              :     /* Tell concurrent index builds to ignore us, if index qualifies */
    1791           95 :     if (safe_index)
    1792           69 :         set_indexsafe_procflags();
    1793              : 
    1794              :     /* We should now definitely not be advertising any xmin. */
    1795              :     Assert(MyProc->xmin == InvalidTransactionId);
    1796              : 
    1797              :     /*
    1798              :      * The index is now valid in the sense that it contains all currently
    1799              :      * interesting tuples.  But since it might not contain tuples deleted just
    1800              :      * before the reference snap was taken, we have to wait out any
    1801              :      * transactions that might have older snapshots.
    1802              :      */
    1803           95 :     INJECTION_POINT("define-index-before-set-valid", NULL);
    1804           95 :     pgstat_progress_update_param(PROGRESS_CREATEIDX_PHASE,
    1805              :                                  PROGRESS_CREATEIDX_PHASE_WAIT_3);
    1806           95 :     WaitForOlderSnapshots(limitXmin, true);
    1807              : 
    1808              :     /*
    1809              :      * Updating pg_index might involve TOAST table access, so ensure we have a
    1810              :      * valid snapshot.
    1811              :      */
    1812           95 :     PushActiveSnapshot(GetTransactionSnapshot());
    1813              : 
    1814              :     /*
    1815              :      * Index can now be marked valid -- update its pg_index entry
    1816              :      */
    1817           95 :     index_set_state_flags(indexRelationId, INDEX_CREATE_SET_VALID);
    1818              : 
    1819           95 :     PopActiveSnapshot();
    1820              : 
    1821              :     /*
    1822              :      * The pg_index update will cause backends (including this one) to update
    1823              :      * relcache entries for the index itself, but we should also send a
    1824              :      * relcache inval on the parent table to force replanning of cached plans.
    1825              :      * Otherwise existing sessions might fail to use the new index where it
    1826              :      * would be useful.  (Note that our earlier commits did not create reasons
    1827              :      * to replan; so relcache flush on the index itself was sufficient.)
    1828              :      */
    1829           95 :     CacheInvalidateRelcacheByRelid(heaprelid.relId);
    1830              : 
    1831              :     /*
    1832              :      * Last thing to do is release the session-level lock on the parent table.
    1833              :      */
    1834           95 :     UnlockRelationIdForSession(&heaprelid, ShareUpdateExclusiveLock);
    1835              : 
    1836           95 :     pgstat_progress_end_command();
    1837              : 
    1838           95 :     return address;
    1839              : }
    1840              : 
    1841              : 
    1842              : /*
    1843              :  * CheckPredicate
    1844              :  *      Checks that the given partial-index predicate is valid.
    1845              :  *
    1846              :  * This used to also constrain the form of the predicate to forms that
    1847              :  * indxpath.c could do something with.  However, that seems overly
    1848              :  * restrictive.  One useful application of partial indexes is to apply
    1849              :  * a UNIQUE constraint across a subset of a table, and in that scenario
    1850              :  * any evaluable predicate will work.  So accept any predicate here
    1851              :  * (except ones requiring a plan), and let indxpath.c fend for itself.
                      :  *
                      :  * predicate: the partial-index predicate expression to check.
                      :  *
                      :  * Returns nothing.  The only check made here is the call to
                      :  * contain_mutable_functions_after_planning(); if that returns true for
                      :  * the predicate, an ERROR with ERRCODE_INVALID_OBJECT_DEFINITION is
                      :  * reported, so control does not come back to the caller through the
                      :  * normal return path.
    1852              :  */
    1853              : static void
    1854          281 : CheckPredicate(Expr *predicate)
    1855              : {
    1856              :     /*
    1857              :      * transformExpr() should have already rejected subqueries, aggregates,
    1858              :      * and window functions, based on the EXPR_KIND_ for a predicate.
                      :      * Hence nothing in this function re-checks for those constructs.
    1859              :      */
    1860              : 
    1861              :     /*
    1862              :      * A predicate using mutable functions is probably wrong, for the same
    1863              :      * reasons that we don't allow an index expression to use one.
                      :      * Going by the message below, "mutable" here means any function that
                      :      * is not marked IMMUTABLE.
    1864              :      */
    1865          281 :     if (contain_mutable_functions_after_planning(predicate))
    1866            0 :         ereport(ERROR,
    1867              :                 (errcode(ERRCODE_INVALID_OBJECT_DEFINITION),
    1868              :                  errmsg("functions in index predicate must be marked IMMUTABLE")));
    1869          281 : }
    1870              : 
    1871              : /*
    1872              :  * Compute per-index-column information, including indexed column numbers
    1873              :  * or index expressions, opclasses and their options. Note, all output vectors
    1874              :  * should be allocated for all columns, including "including" ones.
    1875              :  *
    1876              :  * If the caller switched to the table owner, ddl_userid is the role for ACL
    1877              :  * checks reached without traversing opaque expressions.  Otherwise, it's
    1878              :  * InvalidOid, and other ddl_* arguments are undefined.
    1879              :  */
    1880              : static void
    1881        19887 : ComputeIndexAttrs(ParseState *pstate,
    1882              :                   IndexInfo *indexInfo,
    1883              :                   Oid *typeOids,
    1884              :                   Oid *collationOids,
    1885              :                   Oid *opclassOids,
    1886              :                   Datum *opclassOptions,
    1887              :                   int16 *colOptions,
    1888              :                   const List *attList,  /* list of IndexElem's */
    1889              :                   const List *exclusionOpNames,
    1890              :                   Oid relId,
    1891              :                   const char *accessMethodName,
    1892              :                   Oid accessMethodId,
    1893              :                   bool amcanorder,
    1894              :                   bool isconstraint,
    1895              :                   bool iswithoutoverlaps,
    1896              :                   Oid ddl_userid,
    1897              :                   int ddl_sec_context,
    1898              :                   int *ddl_save_nestlevel)
    1899              : {
    1900              :     ListCell   *nextExclOp;
    1901              :     ListCell   *lc;
    1902              :     int         attn;
    1903        19887 :     int         nkeycols = indexInfo->ii_NumIndexKeyAttrs;
    1904              :     Oid         save_userid;
    1905              :     int         save_sec_context;
    1906              : 
    1907              :     /* Allocate space for exclusion operator info, if needed */
    1908        19887 :     if (exclusionOpNames)
    1909              :     {
    1910              :         Assert(list_length(exclusionOpNames) == nkeycols);
    1911          228 :         indexInfo->ii_ExclusionOps = palloc_array(Oid, nkeycols);
    1912          228 :         indexInfo->ii_ExclusionProcs = palloc_array(Oid, nkeycols);
    1913          228 :         indexInfo->ii_ExclusionStrats = palloc_array(uint16, nkeycols);
    1914          228 :         nextExclOp = list_head(exclusionOpNames);
    1915              :     }
    1916              :     else
    1917        19659 :         nextExclOp = NULL;
    1918              : 
    1919              :     /*
    1920              :      * If this is a WITHOUT OVERLAPS constraint, we need space for exclusion
    1921              :      * ops, but we don't need to parse anything, so we can let nextExclOp be
    1922              :      * NULL. Note that for partitions/inheriting/LIKE, exclusionOpNames will
    1923              :      * be set, so we already allocated above.
    1924              :      */
    1925        19887 :     if (iswithoutoverlaps)
    1926              :     {
    1927          579 :         if (exclusionOpNames == NIL)
    1928              :         {
    1929          507 :             indexInfo->ii_ExclusionOps = palloc_array(Oid, nkeycols);
    1930          507 :             indexInfo->ii_ExclusionProcs = palloc_array(Oid, nkeycols);
    1931          507 :             indexInfo->ii_ExclusionStrats = palloc_array(uint16, nkeycols);
    1932              :         }
    1933          579 :         nextExclOp = NULL;
    1934              :     }
    1935              : 
    1936        19887 :     if (OidIsValid(ddl_userid))
    1937        19814 :         GetUserIdAndSecContext(&save_userid, &save_sec_context);
    1938              : 
    1939              :     /*
    1940              :      * process attributeList
    1941              :      */
    1942        19887 :     attn = 0;
    1943        47503 :     foreach(lc, attList)
    1944              :     {
    1945        27833 :         IndexElem  *attribute = (IndexElem *) lfirst(lc);
    1946              :         Oid         atttype;
    1947              :         Oid         attcollation;
    1948              : 
    1949              :         /*
    1950              :          * Process the column-or-expression to be indexed.
    1951              :          */
    1952        27833 :         if (attribute->name != NULL)
    1953              :         {
    1954              :             /* Simple index attribute */
    1955              :             HeapTuple   atttuple;
    1956              :             Form_pg_attribute attform;
    1957              : 
    1958              :             Assert(attribute->expr == NULL);
    1959        26949 :             atttuple = SearchSysCacheAttName(relId, attribute->name);
    1960        26949 :             if (!HeapTupleIsValid(atttuple))
    1961              :             {
    1962              :                 /* difference in error message spellings is historical */
    1963           20 :                 if (isconstraint)
    1964           12 :                     ereport(ERROR,
    1965              :                             (errcode(ERRCODE_UNDEFINED_COLUMN),
    1966              :                              errmsg("column \"%s\" named in key does not exist",
    1967              :                                     attribute->name),
    1968              :                              parser_errposition(pstate, attribute->location)));
    1969              :                 else
    1970            8 :                     ereport(ERROR,
    1971              :                             (errcode(ERRCODE_UNDEFINED_COLUMN),
    1972              :                              errmsg("column \"%s\" does not exist",
    1973              :                                     attribute->name),
    1974              :                              parser_errposition(pstate, attribute->location)));
    1975              :             }
    1976        26929 :             attform = (Form_pg_attribute) GETSTRUCT(atttuple);
    1977        26929 :             indexInfo->ii_IndexAttrNumbers[attn] = attform->attnum;
    1978        26929 :             atttype = attform->atttypid;
    1979        26929 :             attcollation = attform->attcollation;
    1980        26929 :             ReleaseSysCache(atttuple);
    1981              :         }
    1982              :         else
    1983              :         {
    1984              :             /* Index expression */
    1985          884 :             Node       *expr = attribute->expr;
    1986              : 
    1987              :             Assert(expr != NULL);
    1988              : 
    1989          884 :             if (attn >= nkeycols)
    1990            0 :                 ereport(ERROR,
    1991              :                         (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
    1992              :                          errmsg("expressions are not supported in included columns"),
    1993              :                          parser_errposition(pstate, attribute->location)));
    1994          884 :             atttype = exprType(expr);
    1995          884 :             attcollation = exprCollation(expr);
    1996              : 
    1997              :             /*
    1998              :              * Strip any top-level COLLATE clause.  This ensures that we treat
    1999              :              * "x COLLATE y" and "(x COLLATE y)" alike.
    2000              :              */
    2001          904 :             while (IsA(expr, CollateExpr))
    2002           20 :                 expr = (Node *) ((CollateExpr *) expr)->arg;
    2003              : 
    2004          884 :             if (IsA(expr, Var) &&
    2005            8 :                 ((Var *) expr)->varattno != InvalidAttrNumber)
    2006              :             {
    2007              :                 /*
    2008              :                  * User wrote "(column)" or "(column COLLATE something)".
    2009              :                  * Treat it like simple attribute anyway.
    2010              :                  */
    2011            8 :                 indexInfo->ii_IndexAttrNumbers[attn] = ((Var *) expr)->varattno;
    2012              :             }
    2013              :             else
    2014              :             {
    2015          876 :                 indexInfo->ii_IndexAttrNumbers[attn] = 0;    /* marks expression */
    2016          876 :                 indexInfo->ii_Expressions = lappend(indexInfo->ii_Expressions,
    2017              :                                                     expr);
    2018              : 
    2019              :                 /*
    2020              :                  * transformExpr() should have already rejected subqueries,
    2021              :                  * aggregates, and window functions, based on the EXPR_KIND_
    2022              :                  * for an index expression.
    2023              :                  */
    2024              : 
    2025              :                 /*
    2026              :                  * An expression using mutable functions is probably wrong,
    2027              :                  * since if you aren't going to get the same result for the
    2028              :                  * same data every time, it's not clear what the index entries
    2029              :                  * mean at all.
    2030              :                  */
    2031          876 :                 if (contain_mutable_functions_after_planning((Expr *) expr))
    2032          184 :                     ereport(ERROR,
    2033              :                             (errcode(ERRCODE_INVALID_OBJECT_DEFINITION),
    2034              :                              errmsg("functions in index expression must be marked IMMUTABLE"),
    2035              :                              parser_errposition(pstate, attribute->location)));
    2036              :             }
    2037              :         }
    2038              : 
    2039        27629 :         typeOids[attn] = atttype;
    2040              : 
    2041              :         /*
    2042              :          * Included columns have no collation, no opclass and no ordering
    2043              :          * options.
    2044              :          */
    2045        27629 :         if (attn >= nkeycols)
    2046              :         {
    2047          410 :             if (attribute->collation)
    2048            0 :                 ereport(ERROR,
    2049              :                         (errcode(ERRCODE_INVALID_OBJECT_DEFINITION),
    2050              :                          errmsg("including column does not support a collation"),
    2051              :                          parser_errposition(pstate, attribute->location)));
    2052          410 :             if (attribute->opclass)
    2053            0 :                 ereport(ERROR,
    2054              :                         (errcode(ERRCODE_INVALID_OBJECT_DEFINITION),
    2055              :                          errmsg("including column does not support an operator class"),
    2056              :                          parser_errposition(pstate, attribute->location)));
    2057          410 :             if (attribute->ordering != SORTBY_DEFAULT)
    2058            0 :                 ereport(ERROR,
    2059              :                         (errcode(ERRCODE_INVALID_OBJECT_DEFINITION),
    2060              :                          errmsg("including column does not support ASC/DESC options"),
    2061              :                          parser_errposition(pstate, attribute->location)));
    2062          410 :             if (attribute->nulls_ordering != SORTBY_NULLS_DEFAULT)
    2063            0 :                 ereport(ERROR,
    2064              :                         (errcode(ERRCODE_INVALID_OBJECT_DEFINITION),
    2065              :                          errmsg("including column does not support NULLS FIRST/LAST options"),
    2066              :                          parser_errposition(pstate, attribute->location)));
    2067              : 
    2068          410 :             opclassOids[attn] = InvalidOid;
    2069          410 :             opclassOptions[attn] = (Datum) 0;
    2070          410 :             colOptions[attn] = 0;
    2071          410 :             collationOids[attn] = InvalidOid;
    2072          410 :             attn++;
    2073              : 
    2074          410 :             continue;
    2075              :         }
    2076              : 
    2077              :         /*
    2078              :          * Apply collation override if any.  Use of ddl_userid is necessary
    2079              :          * due to ACL checks therein, and it's safe because collations don't
    2080              :          * contain opaque expressions (or non-opaque expressions).
    2081              :          */
    2082        27219 :         if (attribute->collation)
    2083              :         {
    2084           76 :             if (OidIsValid(ddl_userid))
    2085              :             {
    2086           76 :                 AtEOXact_GUC(false, *ddl_save_nestlevel);
    2087           76 :                 SetUserIdAndSecContext(ddl_userid, ddl_sec_context);
    2088              :             }
    2089           76 :             attcollation = get_collation_oid(attribute->collation, false);
    2090           75 :             if (OidIsValid(ddl_userid))
    2091              :             {
    2092           75 :                 SetUserIdAndSecContext(save_userid, save_sec_context);
    2093           75 :                 *ddl_save_nestlevel = NewGUCNestLevel();
    2094           75 :                 RestrictSearchPath();
    2095              :             }
    2096              :         }
    2097              : 
    2098              :         /*
    2099              :          * Check we have a collation iff it's a collatable type.  The only
    2100              :          * expected failures here are (1) COLLATE applied to a noncollatable
    2101              :          * type, or (2) index expression had an unresolved collation.  But we
    2102              :          * might as well code this to be a complete consistency check.
    2103              :          */
    2104        27218 :         if (type_is_collatable(atttype))
    2105              :         {
    2106         4068 :             if (!OidIsValid(attcollation))
    2107            0 :                 ereport(ERROR,
    2108              :                         (errcode(ERRCODE_INDETERMINATE_COLLATION),
    2109              :                          errmsg("could not determine which collation to use for index expression"),
    2110              :                          errhint("Use the COLLATE clause to set the collation explicitly."),
    2111              :                          parser_errposition(pstate, attribute->location)));
    2112              :         }
    2113              :         else
    2114              :         {
    2115        23150 :             if (OidIsValid(attcollation))
    2116            8 :                 ereport(ERROR,
    2117              :                         (errcode(ERRCODE_DATATYPE_MISMATCH),
    2118              :                          errmsg("collations are not supported by type %s",
    2119              :                                 format_type_be(atttype)),
    2120              :                          parser_errposition(pstate, attribute->location)));
    2121              :         }
    2122              : 
    2123        27210 :         collationOids[attn] = attcollation;
    2124              : 
    2125              :         /*
    2126              :          * Identify the opclass to use.  Use of ddl_userid is necessary due to
    2127              :          * ACL checks therein.  This is safe despite opclasses containing
    2128              :          * opaque expressions (specifically, functions), because only
    2129              :          * superusers can define opclasses.
    2130              :          */
    2131        27210 :         if (OidIsValid(ddl_userid))
    2132              :         {
    2133        27133 :             AtEOXact_GUC(false, *ddl_save_nestlevel);
    2134        27133 :             SetUserIdAndSecContext(ddl_userid, ddl_sec_context);
    2135              :         }
    2136        27210 :         opclassOids[attn] = ResolveOpClass(attribute->opclass,
    2137              :                                            atttype,
    2138              :                                            accessMethodName,
    2139              :                                            accessMethodId);
    2140        27206 :         if (OidIsValid(ddl_userid))
    2141              :         {
    2142        27129 :             SetUserIdAndSecContext(save_userid, save_sec_context);
    2143        27129 :             *ddl_save_nestlevel = NewGUCNestLevel();
    2144        27129 :             RestrictSearchPath();
    2145              :         }
    2146              : 
    2147              :         /*
    2148              :          * Identify the exclusion operator, if any.
    2149              :          */
    2150        27206 :         if (nextExclOp)
    2151              :         {
    2152          226 :             List       *opname = (List *) lfirst(nextExclOp);
    2153              :             Oid         opid;
    2154              :             Oid         opfamily;
    2155              :             int         strat;
    2156              : 
    2157              :             /*
    2158              :              * Find the operator --- it must accept the column datatype
    2159              :              * without runtime coercion (but binary compatibility is OK).
    2160              :              * Operators contain opaque expressions (specifically, functions).
    2161              :              * compatible_oper_opid() boils down to oper() and
    2162              :              * IsBinaryCoercible().  PostgreSQL would have security problems
    2163              :              * elsewhere if oper() started calling opaque expressions.
    2164              :              */
    2165          226 :             if (OidIsValid(ddl_userid))
    2166              :             {
    2167          226 :                 AtEOXact_GUC(false, *ddl_save_nestlevel);
    2168          226 :                 SetUserIdAndSecContext(ddl_userid, ddl_sec_context);
    2169              :             }
    2170          226 :             opid = compatible_oper_opid(opname, atttype, atttype, false);
    2171          226 :             if (OidIsValid(ddl_userid))
    2172              :             {
    2173          226 :                 SetUserIdAndSecContext(save_userid, save_sec_context);
    2174          226 :                 *ddl_save_nestlevel = NewGUCNestLevel();
    2175          226 :                 RestrictSearchPath();
    2176              :             }
    2177              : 
    2178              :             /*
    2179              :              * Only allow commutative operators to be used in exclusion
    2180              :              * constraints. If X conflicts with Y, but Y does not conflict
    2181              :              * with X, bad things will happen.
    2182              :              */
    2183          226 :             if (get_commutator(opid) != opid)
    2184            0 :                 ereport(ERROR,
    2185              :                         (errcode(ERRCODE_WRONG_OBJECT_TYPE),
    2186              :                          errmsg("operator %s is not commutative",
    2187              :                                 format_operator(opid)),
    2188              :                          errdetail("Only commutative operators can be used in exclusion constraints."),
    2189              :                          parser_errposition(pstate, attribute->location)));
    2190              : 
    2191              :             /*
    2192              :              * Operator must be a member of the right opfamily, too
    2193              :              */
    2194          226 :             opfamily = get_opclass_family(opclassOids[attn]);
    2195          226 :             strat = get_op_opfamily_strategy(opid, opfamily);
    2196          226 :             if (strat == 0)
    2197            0 :                 ereport(ERROR,
    2198              :                         (errcode(ERRCODE_WRONG_OBJECT_TYPE),
    2199              :                          errmsg("operator %s is not a member of operator family \"%s\"",
    2200              :                                 format_operator(opid),
    2201              :                                 get_opfamily_name(opfamily, false)),
    2202              :                          errdetail("The exclusion operator must be related to the index operator class for the constraint."),
    2203              :                          parser_errposition(pstate, attribute->location)));
    2204              : 
    2205          226 :             indexInfo->ii_ExclusionOps[attn] = opid;
    2206          226 :             indexInfo->ii_ExclusionProcs[attn] = get_opcode(opid);
    2207          226 :             indexInfo->ii_ExclusionStrats[attn] = strat;
    2208          226 :             nextExclOp = lnext(exclusionOpNames, nextExclOp);
    2209              :         }
    2210        26980 :         else if (iswithoutoverlaps)
    2211              :         {
    2212              :             CompareType cmptype;
    2213              :             StrategyNumber strat;
    2214              :             Oid         opid;
    2215              : 
    2216         1180 :             if (attn == nkeycols - 1)
    2217          579 :                 cmptype = COMPARE_OVERLAP;
    2218              :             else
    2219          601 :                 cmptype = COMPARE_EQ;
    2220         1180 :             GetOperatorFromCompareType(opclassOids[attn], InvalidOid, cmptype, &opid, &strat);
    2221         1180 :             indexInfo->ii_ExclusionOps[attn] = opid;
    2222         1180 :             indexInfo->ii_ExclusionProcs[attn] = get_opcode(opid);
    2223         1180 :             indexInfo->ii_ExclusionStrats[attn] = strat;
    2224              :         }
    2225              : 
    2226              :         /*
    2227              :          * Set up the per-column options (indoption field).  For now, this is
    2228              :          * zero for any un-ordered index, while ordered indexes have DESC and
    2229              :          * NULLS FIRST/LAST options.
    2230              :          */
    2231        27206 :         colOptions[attn] = 0;
    2232        27206 :         if (amcanorder)
    2233              :         {
    2234              :             /* default ordering is ASC */
    2235        24278 :             if (attribute->ordering == SORTBY_DESC)
    2236           28 :                 colOptions[attn] |= INDOPTION_DESC;
    2237              :             /* default null ordering is LAST for ASC, FIRST for DESC */
    2238        24278 :             if (attribute->nulls_ordering == SORTBY_NULLS_DEFAULT)
    2239              :             {
    2240        24258 :                 if (attribute->ordering == SORTBY_DESC)
    2241           20 :                     colOptions[attn] |= INDOPTION_NULLS_FIRST;
    2242              :             }
    2243           20 :             else if (attribute->nulls_ordering == SORTBY_NULLS_FIRST)
    2244            8 :                 colOptions[attn] |= INDOPTION_NULLS_FIRST;
    2245              :         }
    2246              :         else
    2247              :         {
    2248              :             /* index AM does not support ordering */
    2249         2928 :             if (attribute->ordering != SORTBY_DEFAULT)
    2250            0 :                 ereport(ERROR,
    2251              :                         (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
    2252              :                          errmsg("access method \"%s\" does not support ASC/DESC options",
    2253              :                                 accessMethodName),
    2254              :                          parser_errposition(pstate, attribute->location)));
    2255         2928 :             if (attribute->nulls_ordering != SORTBY_NULLS_DEFAULT)
    2256            0 :                 ereport(ERROR,
    2257              :                         (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
    2258              :                          errmsg("access method \"%s\" does not support NULLS FIRST/LAST options",
    2259              :                                 accessMethodName),
    2260              :                          parser_errposition(pstate, attribute->location)));
    2261              :         }
    2262              : 
    2263              :         /* Set up the per-column opclass options (attoptions field). */
    2264        27206 :         if (attribute->opclassopts)
    2265              :         {
    2266              :             Assert(attn < nkeycols);
    2267              : 
    2268           88 :             opclassOptions[attn] =
    2269           88 :                 transformRelOptions((Datum) 0, attribute->opclassopts,
    2270              :                                     NULL, NULL, false, false);
    2271              :         }
    2272              :         else
    2273        27118 :             opclassOptions[attn] = (Datum) 0;
    2274              : 
    2275        27206 :         attn++;
    2276              :     }
    2277        19670 : }
    2278              : 
    2279              : /*
    2280              :  * Resolve possibly-defaulted operator class specification
    2281              :  *
    2282              :  * Note: This is used to resolve operator class specifications in index and
    2283              :  * partition key definitions.
    2284              :  */
    2285              : Oid
    2286        27302 : ResolveOpClass(const List *opclass, Oid attrType,
    2287              :                const char *accessMethodName, Oid accessMethodId)
    2288              : {
    2289              :     char       *schemaname;
    2290              :     char       *opcname;
    2291              :     HeapTuple   tuple;
    2292              :     Form_pg_opclass opform;
    2293              :     Oid         opClassId,
    2294              :                 opInputType;
    2295              : 
    2296        27302 :     if (opclass == NIL)
    2297              :     {
    2298              :         /* no operator class specified, so find the default */
    2299        13885 :         opClassId = GetDefaultOpClass(attrType, accessMethodId);
    2300        13885 :         if (!OidIsValid(opClassId))
    2301            4 :             ereport(ERROR,
    2302              :                     (errcode(ERRCODE_UNDEFINED_OBJECT),
    2303              :                      errmsg("data type %s has no default operator class for access method \"%s\"",
    2304              :                             format_type_be(attrType), accessMethodName),
    2305              :                      errhint("You must specify an operator class for the index or define a default operator class for the data type.")));
    2306        13881 :         return opClassId;
    2307              :     }
    2308              : 
    2309              :     /*
    2310              :      * Specific opclass name given, so look up the opclass.
    2311              :      */
    2312              : 
    2313              :     /* deconstruct the name list */
    2314        13417 :     DeconstructQualifiedName(opclass, &schemaname, &opcname);
    2315              : 
    2316        13417 :     if (schemaname)
    2317              :     {
    2318              :         /* Look in specific schema only */
    2319              :         Oid         namespaceId;
    2320              : 
    2321           18 :         namespaceId = LookupExplicitNamespace(schemaname, false);
    2322           18 :         tuple = SearchSysCache3(CLAAMNAMENSP,
    2323              :                                 ObjectIdGetDatum(accessMethodId),
    2324              :                                 PointerGetDatum(opcname),
    2325              :                                 ObjectIdGetDatum(namespaceId));
    2326              :     }
    2327              :     else
    2328              :     {
    2329              :         /* Unqualified opclass name, so search the search path */
    2330        13399 :         opClassId = OpclassnameGetOpcid(accessMethodId, opcname);
    2331        13399 :         if (!OidIsValid(opClassId))
    2332            8 :             ereport(ERROR,
    2333              :                     (errcode(ERRCODE_UNDEFINED_OBJECT),
    2334              :                      errmsg("operator class \"%s\" does not exist for access method \"%s\"",
    2335              :                             opcname, accessMethodName)));
    2336        13391 :         tuple = SearchSysCache1(CLAOID, ObjectIdGetDatum(opClassId));
    2337              :     }
    2338              : 
    2339        13409 :     if (!HeapTupleIsValid(tuple))
    2340            0 :         ereport(ERROR,
    2341              :                 (errcode(ERRCODE_UNDEFINED_OBJECT),
    2342              :                  errmsg("operator class \"%s\" does not exist for access method \"%s\"",
    2343              :                         NameListToString(opclass), accessMethodName)));
    2344              : 
    2345              :     /*
    2346              :      * Verify that the index operator class accepts this datatype.  Note we
    2347              :      * will accept binary compatibility.
    2348              :      */
    2349        13409 :     opform = (Form_pg_opclass) GETSTRUCT(tuple);
    2350        13409 :     opClassId = opform->oid;
    2351        13409 :     opInputType = opform->opcintype;
    2352              : 
    2353        13409 :     if (!IsBinaryCoercible(attrType, opInputType))
    2354            0 :         ereport(ERROR,
    2355              :                 (errcode(ERRCODE_DATATYPE_MISMATCH),
    2356              :                  errmsg("operator class \"%s\" does not accept data type %s",
    2357              :                         NameListToString(opclass), format_type_be(attrType))));
    2358              : 
    2359        13409 :     ReleaseSysCache(tuple);
    2360              : 
    2361        13409 :     return opClassId;
    2362              : }
    2363              : 
    2364              : /*
    2365              :  * GetDefaultOpClass
    2366              :  *
    2367              :  * Given the OIDs of a datatype and an access method, find the default
    2368              :  * operator class, if any.  Returns InvalidOid if there is none.
    2369              :  */
    2370              : Oid
    2371        70686 : GetDefaultOpClass(Oid type_id, Oid am_id)
    2372              : {
    2373        70686 :     Oid         result = InvalidOid;
    2374        70686 :     int         nexact = 0;
    2375        70686 :     int         ncompatible = 0;
    2376        70686 :     int         ncompatiblepreferred = 0;
    2377              :     Relation    rel;
    2378              :     ScanKeyData skey[1];
    2379              :     SysScanDesc scan;
    2380              :     HeapTuple   tup;
    2381              :     TYPCATEGORY tcategory;
    2382              : 
    2383              :     /* If it's a domain, look at the base type instead */
    2384        70686 :     type_id = getBaseType(type_id);
    2385              : 
    2386        70686 :     tcategory = TypeCategory(type_id);
    2387              : 
    2388              :     /*
    2389              :      * We scan through all the opclasses available for the access method,
    2390              :      * looking for one that is marked default and matches the target type
    2391              :      * (either exactly or binary-compatibly, but prefer an exact match).
    2392              :      *
    2393              :      * We could find more than one binary-compatible match.  If just one is
    2394              :      * for a preferred type, use that one; otherwise we fail, forcing the user
    2395              :      * to specify which one he wants.  (The preferred-type special case is a
    2396              :      * kluge for varchar: it's binary-compatible to both text and bpchar, so
    2397              :      * we need a tiebreaker.)  If we find more than one exact match, then
    2398              :      * someone put bogus entries in pg_opclass.
    2399              :      */
    2400        70686 :     rel = table_open(OperatorClassRelationId, AccessShareLock);
    2401              : 
    2402        70686 :     ScanKeyInit(&skey[0],
    2403              :                 Anum_pg_opclass_opcmethod,
    2404              :                 BTEqualStrategyNumber, F_OIDEQ,
    2405              :                 ObjectIdGetDatum(am_id));
    2406              : 
    2407        70686 :     scan = systable_beginscan(rel, OpclassAmNameNspIndexId, true,
    2408              :                               NULL, 1, skey);
    2409              : 
    2410      3120457 :     while (HeapTupleIsValid(tup = systable_getnext(scan)))
    2411              :     {
    2412      3049771 :         Form_pg_opclass opclass = (Form_pg_opclass) GETSTRUCT(tup);
    2413              : 
    2414              :         /* ignore altogether if not a default opclass */
    2415      3049771 :         if (!opclass->opcdefault)
    2416       436683 :             continue;
    2417      2613088 :         if (opclass->opcintype == type_id)
    2418              :         {
    2419        61313 :             nexact++;
    2420        61313 :             result = opclass->oid;
    2421              :         }
    2422      3827631 :         else if (nexact == 0 &&
    2423      1275856 :                  IsBinaryCoercible(type_id, opclass->opcintype))
    2424              :         {
    2425        16235 :             if (IsPreferredType(tcategory, opclass->opcintype))
    2426              :             {
    2427         1300 :                 ncompatiblepreferred++;
    2428         1300 :                 result = opclass->oid;
    2429              :             }
    2430        14935 :             else if (ncompatiblepreferred == 0)
    2431              :             {
    2432        14935 :                 ncompatible++;
    2433        14935 :                 result = opclass->oid;
    2434              :             }
    2435              :         }
    2436              :     }
    2437              : 
    2438        70686 :     systable_endscan(scan);
    2439              : 
    2440        70686 :     table_close(rel, AccessShareLock);
    2441              : 
    2442              :     /* raise error if pg_opclass contains inconsistent data */
    2443        70686 :     if (nexact > 1)
    2444            0 :         ereport(ERROR,
    2445              :                 (errcode(ERRCODE_DUPLICATE_OBJECT),
    2446              :                  errmsg("there are multiple default operator classes for data type %s",
    2447              :                         format_type_be(type_id))));
    2448              : 
    2449        70686 :     if (nexact == 1 ||
    2450         8074 :         ncompatiblepreferred == 1 ||
    2451         8074 :         (ncompatiblepreferred == 0 && ncompatible == 1))
    2452        69906 :         return result;
    2453              : 
    2454          780 :     return InvalidOid;
    2455              : }
    2456              : 
    2457              : /*
    2458              :  * GetOperatorFromCompareType
    2459              :  *
    2460              :  * opclass - the opclass to use
    2461              :  * rhstype - the type for the right-hand side, or InvalidOid to use the type of the given opclass.
    2462              :  * cmptype - kind of operator to find
    2463              :  * opid - holds the operator we found
    2464              :  * strat - holds the output strategy number
    2465              :  *
    2466              :  * Finds an operator from a CompareType.  This is used for temporal index
    2467              :  * constraints (and other temporal features) to look up equality and overlaps
    2468              :  * operators.  We ask an opclass support function to translate from the
    2469              :  * compare type to the internal strategy numbers.  Raises ERROR on search
    2470              :  * failure.
    2471              :  */
    2472              : void
    2473         2525 : GetOperatorFromCompareType(Oid opclass, Oid rhstype, CompareType cmptype,
    2474              :                            Oid *opid, StrategyNumber *strat)
    2475              : {
    2476              :     Oid         amid;
    2477              :     Oid         opfamily;
    2478              :     Oid         opcintype;
    2479              : 
    2480              :     Assert(cmptype == COMPARE_EQ || cmptype == COMPARE_OVERLAP || cmptype == COMPARE_CONTAINED_BY);
    2481              : 
    2482              :     /*
    2483              :      * Use the opclass to get the opfamily, opcintype, and access method. If
    2484              :      * any of this fails, quit early.
    2485              :      */
    2486         2525 :     if (!get_opclass_opfamily_and_input_type(opclass, &opfamily, &opcintype))
    2487            0 :         elog(ERROR, "cache lookup failed for opclass %u", opclass);
    2488              : 
    2489         2525 :     amid = get_opclass_method(opclass);
    2490              : 
    2491              :     /*
    2492              :      * Ask the index AM to translate to its internal stratnum
    2493              :      */
    2494         2525 :     *strat = IndexAmTranslateCompareType(cmptype, amid, opfamily, true);
    2495         2525 :     if (*strat == InvalidStrategy)
    2496            0 :         ereport(ERROR,
    2497              :                 errcode(ERRCODE_UNDEFINED_OBJECT),
    2498              :                 cmptype == COMPARE_EQ ? errmsg("could not identify an equality operator for type %s", format_type_be(opcintype)) :
    2499              :                 cmptype == COMPARE_OVERLAP ? errmsg("could not identify an overlaps operator for type %s", format_type_be(opcintype)) :
    2500              :                 cmptype == COMPARE_CONTAINED_BY ? errmsg("could not identify a contained-by operator for type %s", format_type_be(opcintype)) : 0,
    2501              :                 errdetail("Could not translate compare type %d for operator family \"%s\" of access method \"%s\".",
    2502              :                           cmptype, get_opfamily_name(opfamily, false), get_am_name(amid)));
    2503              : 
    2504              :     /*
    2505              :      * We parameterize rhstype so foreign keys can ask for a <@ operator whose
    2506              :      * rhs matches the aggregate function. For example range_agg returns
    2507              :      * anymultirange.
    2508              :      */
    2509         2525 :     if (!OidIsValid(rhstype))
    2510         2266 :         rhstype = opcintype;
    2511         2525 :     *opid = get_opfamily_member(opfamily, opcintype, rhstype, *strat);
    2512              : 
    2513         2525 :     if (!OidIsValid(*opid))
    2514            0 :         ereport(ERROR,
    2515              :                 errcode(ERRCODE_UNDEFINED_OBJECT),
    2516              :                 cmptype == COMPARE_EQ ? errmsg("could not identify an equality operator for type %s", format_type_be(opcintype)) :
    2517              :                 cmptype == COMPARE_OVERLAP ? errmsg("could not identify an overlaps operator for type %s", format_type_be(opcintype)) :
    2518              :                 cmptype == COMPARE_CONTAINED_BY ? errmsg("could not identify a contained-by operator for type %s", format_type_be(opcintype)) : 0,
    2519              :                 errdetail("There is no suitable operator in operator family \"%s\" for access method \"%s\".",
    2520              :                           get_opfamily_name(opfamily, false), get_am_name(amid)));
    2521         2525 : }
    2522              : 
    2523              : /*
    2524              :  *  makeObjectName()
    2525              :  *
    2526              :  *  Create a name for an implicitly created index, sequence, constraint,
    2527              :  *  extended statistics, etc.
    2528              :  *
    2529              :  *  The parameters are typically: the original table name, the original field
    2530              :  *  name, and a "type" string (such as "seq" or "pkey").    The field name
    2531              :  *  and/or type can be NULL if not relevant.
    2532              :  *
    2533              :  *  The result is a palloc'd string.
    2534              :  *
    2535              :  *  The basic result we want is "name1_name2_label", omitting "_name2" or
    2536              :  *  "_label" when those parameters are NULL.  However, we must generate
    2537              :  *  a name with less than NAMEDATALEN characters!  So, we truncate one or
    2538              :  *  both names if necessary to make a short-enough string.  The label part
    2539              :  *  is never truncated (so it had better be reasonably short).
    2540              :  *
    2541              :  *  The caller is responsible for checking uniqueness of the generated
    2542              :  *  name and retrying as needed; retrying will be done by altering the
    2543              :  *  "label" string (which is why we never truncate that part).
    2544              :  */
    2545              : char *
    2546        76885 : makeObjectName(const char *name1, const char *name2, const char *label)
    2547              : {
    2548              :     char       *name;
    2549        76885 :     int         overhead = 0;   /* chars needed for label and underscores */
    2550              :     int         availchars;     /* chars available for name(s) */
    2551              :     int         name1chars;     /* chars allocated to name1 */
    2552              :     int         name2chars;     /* chars allocated to name2 */
    2553              :     int         ndx;
    2554              : 
    2555        76885 :     name1chars = strlen(name1);
    2556        76885 :     if (name2)
    2557              :     {
    2558        67672 :         name2chars = strlen(name2);
    2559        67672 :         overhead++;             /* allow for separating underscore */
    2560              :     }
    2561              :     else
    2562         9213 :         name2chars = 0;
    2563        76885 :     if (label)
    2564        29457 :         overhead += strlen(label) + 1;
    2565              : 
    2566        76885 :     availchars = NAMEDATALEN - 1 - overhead;
    2567              :     Assert(availchars > 0);      /* else caller chose a bad label */
    2568              : 
    2569              :     /*
    2570              :      * If we must truncate, preferentially truncate the longer name. This
    2571              :      * logic could be expressed without a loop, but it's simple and obvious as
    2572              :      * a loop.
    2573              :      */
    2574        76929 :     while (name1chars + name2chars > availchars)
    2575              :     {
    2576           44 :         if (name1chars > name2chars)
    2577            0 :             name1chars--;
    2578              :         else
    2579           44 :             name2chars--;
    2580              :     }
    2581              : 
    2582        76885 :     name1chars = pg_mbcliplen(name1, name1chars, name1chars);
    2583        76885 :     if (name2)
    2584        67672 :         name2chars = pg_mbcliplen(name2, name2chars, name2chars);
    2585              : 
    2586              :     /* Now construct the string using the chosen lengths */
    2587        76885 :     name = palloc(name1chars + name2chars + overhead + 1);
    2588        76885 :     memcpy(name, name1, name1chars);
    2589        76885 :     ndx = name1chars;
    2590        76885 :     if (name2)
    2591              :     {
    2592        67672 :         name[ndx++] = '_';
    2593        67672 :         memcpy(name + ndx, name2, name2chars);
    2594        67672 :         ndx += name2chars;
    2595              :     }
    2596        76885 :     if (label)
    2597              :     {
    2598        29457 :         name[ndx++] = '_';
    2599        29457 :         strcpy(name + ndx, label);
    2600              :     }
    2601              :     else
    2602        47428 :         name[ndx] = '\0';
    2603              : 
    2604        76885 :     return name;
    2605              : }
    2606              : 
    2607              : /*
    2608              :  * Select a nonconflicting name for a new relation.  This is ordinarily
    2609              :  * used to choose index names (which is why it's here) but it can also
    2610              :  * be used for sequences, or any autogenerated relation kind.
    2611              :  *
    2612              :  * name1, name2, and label are used the same way as for makeObjectName(),
    2613              :  * except that the label can't be NULL; digits will be appended to the label
    2614              :  * if needed to create a name that is unique within the specified namespace.
    2615              :  *
    2616              :  * If isconstraint is true, we also avoid choosing a name matching any
    2617              :  * existing constraint in the same namespace.  (This is stricter than what
    2618              :  * Postgres itself requires, but the SQL standard says that constraint names
    2619              :  * should be unique within schemas, so we follow that for autogenerated
    2620              :  * constraint names.)
    2621              :  *
    2622              :  * Note: it is theoretically possible to get a collision anyway, if someone
    2623              :  * else chooses the same name concurrently.  We shorten the race condition
    2624              :  * window by checking for conflicting relations using SnapshotDirty, but
    2625              :  * that doesn't close the window entirely.  This is fairly unlikely to be
    2626              :  * a problem in practice, especially if one is holding an exclusive lock on
    2627              :  * the relation identified by name1.  However, if choosing multiple names
    2628              :  * within a single command, you'd better create the new object and do
    2629              :  * CommandCounterIncrement before choosing the next one!
    2630              :  *
    2631              :  * Returns a palloc'd string.
    2632              :  */
    2633              : char *
    2634         9486 : ChooseRelationName(const char *name1, const char *name2,
    2635              :                    const char *label, Oid namespaceid,
    2636              :                    bool isconstraint)
    2637              : {
    2638         9486 :     int         pass = 0;
    2639         9486 :     char       *relname = NULL;
    2640              :     char        modlabel[NAMEDATALEN];
    2641              :     SnapshotData SnapshotDirty;
    2642              :     Relation    pgclassrel;
    2643              : 
    2644              :     /* prepare to search pg_class with a dirty snapshot */
    2645         9486 :     InitDirtySnapshot(SnapshotDirty);
    2646         9486 :     pgclassrel = table_open(RelationRelationId, AccessShareLock);
    2647              : 
    2648              :     /* try the unmodified label first */
    2649         9486 :     strlcpy(modlabel, label, sizeof(modlabel));
    2650              : 
    2651              :     for (;;)
    2652         1275 :     {
    2653              :         ScanKeyData key[2];
    2654              :         SysScanDesc scan;
    2655              :         bool        collides;
    2656              : 
    2657        10761 :         relname = makeObjectName(name1, name2, modlabel);
    2658              : 
    2659              :         /* is there any conflicting relation name? */
    2660        10761 :         ScanKeyInit(&key[0],
    2661              :                     Anum_pg_class_relname,
    2662              :                     BTEqualStrategyNumber, F_NAMEEQ,
    2663              :                     CStringGetDatum(relname));
    2664        10761 :         ScanKeyInit(&key[1],
    2665              :                     Anum_pg_class_relnamespace,
    2666              :                     BTEqualStrategyNumber, F_OIDEQ,
    2667              :                     ObjectIdGetDatum(namespaceid));
    2668              : 
    2669        10761 :         scan = systable_beginscan(pgclassrel, ClassNameNspIndexId,
    2670              :                                   true /* indexOK */ ,
    2671              :                                   &SnapshotDirty,
    2672              :                                   2, key);
    2673              : 
    2674        10761 :         collides = HeapTupleIsValid(systable_getnext(scan));
    2675              : 
    2676        10761 :         systable_endscan(scan);
    2677              : 
    2678              :         /* break out of loop if no conflict */
    2679        10761 :         if (!collides)
    2680              :         {
    2681         9490 :             if (!isconstraint ||
    2682         5884 :                 !ConstraintNameExists(relname, namespaceid))
    2683              :                 break;
    2684              :         }
    2685              : 
    2686              :         /* found a conflict, so try a new name component */
    2687         1275 :         pfree(relname);
    2688         1275 :         snprintf(modlabel, sizeof(modlabel), "%s%d", label, ++pass);
    2689              :     }
    2690              : 
    2691         9486 :     table_close(pgclassrel, AccessShareLock);
    2692              : 
    2693         9486 :     return relname;
    2694              : }
    2695              : 
    2696              : /*
    2697              :  * Select the name to be used for an index.
    2698              :  *
    2699              :  * The argument list is pretty ad-hoc :-(
    2700              :  */
    2701              : static char *
    2702         7990 : ChooseIndexName(const char *tabname, Oid namespaceId,
    2703              :                 const List *colnames, const List *exclusionOpNames,
    2704              :                 bool primary, bool isconstraint)
    2705              : {
    2706              :     char       *indexname;
    2707              : 
    2708         7990 :     if (primary)
    2709              :     {
    2710              :         /* the primary key's name does not depend on the specific column(s) */
    2711         5229 :         indexname = ChooseRelationName(tabname,
    2712              :                                        NULL,
    2713              :                                        "pkey",
    2714              :                                        namespaceId,
    2715              :                                        true);
    2716              :     }
    2717         2761 :     else if (exclusionOpNames != NIL)
    2718              :     {
    2719          155 :         indexname = ChooseRelationName(tabname,
    2720          155 :                                        ChooseIndexNameAddition(colnames),
    2721              :                                        "excl",
    2722              :                                        namespaceId,
    2723              :                                        true);
    2724              :     }
    2725         2606 :     else if (isconstraint)
    2726              :     {
    2727          496 :         indexname = ChooseRelationName(tabname,
    2728          496 :                                        ChooseIndexNameAddition(colnames),
    2729              :                                        "key",
    2730              :                                        namespaceId,
    2731              :                                        true);
    2732              :     }
    2733              :     else
    2734              :     {
    2735         2110 :         indexname = ChooseRelationName(tabname,
    2736         2110 :                                        ChooseIndexNameAddition(colnames),
    2737              :                                        "idx",
    2738              :                                        namespaceId,
    2739              :                                        false);
    2740              :     }
    2741              : 
    2742         7990 :     return indexname;
    2743              : }
    2744              : 
    2745              : /*
    2746              :  * Generate "name2" for a new index given the list of column names for it
    2747              :  * (as produced by ChooseIndexColumnNames).  This will be passed to
    2748              :  * ChooseRelationName along with the parent table name and a suitable label.
    2749              :  *
    2750              :  * We know that less than NAMEDATALEN characters will actually be used,
    2751              :  * so we can truncate the result once we've generated that many.
    2752              :  *
    2753              :  * XXX See also ChooseForeignKeyConstraintNameAddition and
    2754              :  * ChooseExtendedStatisticNameAddition.
    2755              :  */
    2756              : static char *
    2757         2761 : ChooseIndexNameAddition(const List *colnames)
    2758              : {
    2759              :     char        buf[NAMEDATALEN * 2];
    2760         2761 :     int         buflen = 0;
    2761              :     ListCell   *lc;
    2762              : 
    2763         2761 :     buf[0] = '\0';
    2764         6240 :     foreach(lc, colnames)
    2765              :     {
    2766         3479 :         const char *name = (const char *) lfirst(lc);
    2767              : 
    2768         3479 :         if (buflen > 0)
    2769          718 :             buf[buflen++] = '_';    /* insert _ between names */
    2770              : 
    2771              :         /*
    2772              :          * At this point we have buflen <= NAMEDATALEN.  name should be less
    2773              :          * than NAMEDATALEN already, but use strlcpy for paranoia.
    2774              :          */
    2775         3479 :         strlcpy(buf + buflen, name, NAMEDATALEN);
    2776         3479 :         buflen += strlen(buf + buflen);
    2777         3479 :         if (buflen >= NAMEDATALEN)
    2778            0 :             break;
    2779              :     }
    2780         2761 :     return pstrdup(buf);
    2781              : }
    2782              : 
    2783              : /*
    2784              :  * Select the actual names to be used for the columns of an index, given the
    2785              :  * list of IndexElems for the columns.  This is mostly about ensuring the
    2786              :  * names are unique so we don't get a conflicting-attribute-names error.
    2787              :  *
    2788              :  * Returns a List of plain strings (char *, not String nodes).
    2789              :  */
    2790              : static List *
    2791        19872 : ChooseIndexColumnNames(const List *indexElems)
    2792              : {
    2793        19872 :     List       *result = NIL;
    2794              :     ListCell   *lc;
    2795              : 
    2796        47724 :     foreach(lc, indexElems)
    2797              :     {
    2798        27852 :         IndexElem  *ielem = (IndexElem *) lfirst(lc);
    2799              :         const char *origname;
    2800              :         const char *curname;
    2801              :         int         i;
    2802              :         char        buf[NAMEDATALEN];
    2803              : 
    2804              :         /* Get the preliminary name from the IndexElem */
    2805        27852 :         if (ielem->indexcolname)
    2806         3030 :             origname = ielem->indexcolname; /* caller-specified name */
    2807        24822 :         else if (ielem->name)
    2808        24557 :             origname = ielem->name; /* simple column reference */
    2809              :         else
    2810          265 :             origname = "expr";    /* default name for expression */
    2811              : 
    2812              :         /* If it conflicts with any previous column, tweak it */
    2813        27852 :         curname = origname;
    2814        27852 :         for (i = 1;; i++)
    2815           37 :         {
    2816              :             ListCell   *lc2;
    2817              :             char        nbuf[32];
    2818              :             int         nlen;
    2819              : 
    2820        43354 :             foreach(lc2, result)
    2821              :             {
    2822        15502 :                 if (strcmp(curname, (char *) lfirst(lc2)) == 0)
    2823           37 :                     break;
    2824              :             }
    2825        27889 :             if (lc2 == NULL)
    2826        27852 :                 break;          /* found nonconflicting name */
    2827              : 
    2828           37 :             sprintf(nbuf, "%d", i);
    2829              : 
    2830              :             /* Ensure generated names are shorter than NAMEDATALEN */
    2831           37 :             nlen = pg_mbcliplen(origname, strlen(origname),
    2832           37 :                                 NAMEDATALEN - 1 - strlen(nbuf));
    2833           37 :             memcpy(buf, origname, nlen);
    2834           37 :             strcpy(buf + nlen, nbuf);
    2835           37 :             curname = buf;
    2836              :         }
    2837              : 
    2838              :         /* And attach to the result list */
    2839        27852 :         result = lappend(result, pstrdup(curname));
    2840              :     }
    2841        19872 :     return result;
    2842              : }
    2843              : 
    2844              : /*
    2845              :  * ExecReindex
    2846              :  *
    2847              :  * Primary entry point for manual REINDEX commands.  This is mainly a
    2848              :  * preparation wrapper for the real operations that will happen in
    2849              :  * each subroutine of REINDEX.
    2850              :  */
    2851              : void
    2852          694 : ExecReindex(ParseState *pstate, const ReindexStmt *stmt, bool isTopLevel)
    2853              : {
    2854          694 :     ReindexParams params = {0};
    2855              :     ListCell   *lc;
    2856          694 :     bool        concurrently = false;
    2857          694 :     bool        verbose = false;
    2858          694 :     char       *tablespacename = NULL;
    2859              : 
    2860              :     /* Parse option list */
    2861         1165 :     foreach(lc, stmt->params)
    2862              :     {
    2863          471 :         DefElem    *opt = (DefElem *) lfirst(lc);
    2864              : 
    2865          471 :         if (strcmp(opt->defname, "verbose") == 0)
    2866            8 :             verbose = defGetBoolean(opt);
    2867          463 :         else if (strcmp(opt->defname, "concurrently") == 0)
    2868          382 :             concurrently = defGetBoolean(opt);
    2869           81 :         else if (strcmp(opt->defname, "tablespace") == 0)
    2870           81 :             tablespacename = defGetString(opt);
    2871              :         else
    2872            0 :             ereport(ERROR,
    2873              :                     (errcode(ERRCODE_SYNTAX_ERROR),
    2874              :                      errmsg("unrecognized %s option \"%s\"",
    2875              :                             "REINDEX", opt->defname),
    2876              :                      parser_errposition(pstate, opt->location)));
    2877              :     }
    2878              : 
    2879          694 :     if (concurrently)
    2880          382 :         PreventInTransactionBlock(isTopLevel,
    2881              :                                   "REINDEX CONCURRENTLY");
    2882              : 
    2883          677 :     params.options =
    2884         1354 :         (verbose ? REINDEXOPT_VERBOSE : 0) |
    2885          677 :         (concurrently ? REINDEXOPT_CONCURRENTLY : 0);
    2886              : 
    2887              :     /*
    2888              :      * Assign the tablespace OID to move indexes to, with InvalidOid to do
    2889              :      * nothing.
    2890              :      */
    2891          677 :     if (tablespacename != NULL)
    2892              :     {
    2893           81 :         params.tablespaceOid = get_tablespace_oid(tablespacename, false);
    2894              : 
    2895              :         /* Check permissions except when moving to database's default */
    2896           81 :         if (OidIsValid(params.tablespaceOid) &&
    2897           81 :             params.tablespaceOid != MyDatabaseTableSpace)
    2898              :         {
    2899              :             AclResult   aclresult;
    2900              : 
    2901           81 :             aclresult = object_aclcheck(TableSpaceRelationId, params.tablespaceOid,
    2902              :                                         GetUserId(), ACL_CREATE);
    2903           81 :             if (aclresult != ACLCHECK_OK)
    2904            6 :                 aclcheck_error(aclresult, OBJECT_TABLESPACE,
    2905            6 :                                get_tablespace_name(params.tablespaceOid));
    2906              :         }
    2907              :     }
    2908              :     else
    2909          596 :         params.tablespaceOid = InvalidOid;
    2910              : 
    2911          671 :     switch (stmt->kind)
    2912              :     {
    2913          250 :         case REINDEX_OBJECT_INDEX:
    2914          250 :             ReindexIndex(stmt, &params, isTopLevel);
    2915          181 :             break;
    2916          310 :         case REINDEX_OBJECT_TABLE:
    2917          310 :             ReindexTable(stmt, &params, isTopLevel);
    2918          231 :             break;
    2919          111 :         case REINDEX_OBJECT_SCHEMA:
    2920              :         case REINDEX_OBJECT_SYSTEM:
    2921              :         case REINDEX_OBJECT_DATABASE:
    2922              : 
    2923              :             /*
    2924              :              * This cannot run inside a user transaction block; if we were
    2925              :              * inside a transaction, then its commit- and
    2926              :              * start-transaction-command calls would not have the intended
    2927              :              * effect!
    2928              :              */
    2929          111 :             PreventInTransactionBlock(isTopLevel,
    2930          148 :                                       (stmt->kind == REINDEX_OBJECT_SCHEMA) ? "REINDEX SCHEMA" :
    2931           37 :                                       (stmt->kind == REINDEX_OBJECT_SYSTEM) ? "REINDEX SYSTEM" :
    2932              :                                       "REINDEX DATABASE");
    2933          107 :             ReindexMultipleTables(stmt, &params);
    2934           74 :             break;
    2935            0 :         default:
    2936            0 :             elog(ERROR, "unrecognized object type: %d",
    2937              :                  (int) stmt->kind);
    2938              :             break;
    2939              :     }
    2940          486 : }
    2941              : 
    2942              : /*
    2943              :  * ReindexIndex
    2944              :  *      Recreate a specific index.
    2945              :  */
    2946              : static void
    2947          250 : ReindexIndex(const ReindexStmt *stmt, const ReindexParams *params, bool isTopLevel)
    2948              : {
    2949          250 :     const RangeVar *indexRelation = stmt->relation;
    2950              :     struct ReindexIndexCallbackState state;
    2951              :     Oid         indOid;
    2952              :     char        persistence;
    2953              :     char        relkind;
    2954              : 
    2955              :     /*
    2956              :      * Find and lock index, and check permissions on table; use callback to
    2957              :      * obtain lock on table first, to avoid deadlock hazard.  The lock level
    2958              :      * used here must match the index lock obtained in reindex_index().
    2959              :      *
    2960              :      * If it's a temporary index, we will perform a non-concurrent reindex,
    2961              :      * even if CONCURRENTLY was requested.  In that case, reindex_index() will
    2962              :      * upgrade the lock, but that's OK, because other sessions can't hold
    2963              :      * locks on our temporary table.
    2964              :      */
    2965          250 :     state.params = *params;
    2966          250 :     state.locked_table_oid = InvalidOid;
    2967          250 :     indOid = RangeVarGetRelidExtended(indexRelation,
    2968          250 :                                       (params->options & REINDEXOPT_CONCURRENTLY) != 0 ?
    2969              :                                       ShareUpdateExclusiveLock : AccessExclusiveLock,
    2970              :                                       0,
    2971              :                                       RangeVarCallbackForReindexIndex,
    2972              :                                       &state);
    2973              : 
    2974              :     /*
    2975              :      * Obtain the current persistence and kind of the existing index.  We
    2976              :      * already hold a lock on the index.
    2977              :      */
    2978          219 :     persistence = get_rel_persistence(indOid);
    2979          219 :     relkind = get_rel_relkind(indOid);
    2980              : 
    2981          219 :     if (relkind == RELKIND_PARTITIONED_INDEX)
    2982           24 :         ReindexPartitions(stmt, indOid, params, isTopLevel);
    2983          195 :     else if ((params->options & REINDEXOPT_CONCURRENTLY) != 0 &&
    2984              :              persistence != RELPERSISTENCE_TEMP)
    2985          112 :         ReindexRelationConcurrently(stmt, indOid, params);
    2986              :     else
    2987              :     {
    2988           83 :         ReindexParams newparams = *params;
    2989              : 
    2990           83 :         newparams.options |= REINDEXOPT_REPORT_PROGRESS;
    2991           83 :         reindex_index(stmt, indOid, false, persistence, &newparams);
    2992              :     }
    2993          181 : }
    2994              : 
    2995              : /*
    2996              :  * Check permissions on table before acquiring relation lock; also lock
    2997              :  * the heap before the RangeVarGetRelidExtended takes the index lock, to avoid
    2998              :  * deadlocks.
    2999              :  */
    3000              : static void
    3001          259 : RangeVarCallbackForReindexIndex(const RangeVar *relation,
    3002              :                                 Oid relId, Oid oldRelId, void *arg)
    3003              : {
    3004              :     char        relkind;
    3005          259 :     struct ReindexIndexCallbackState *state = arg;
    3006              :     LOCKMODE    table_lockmode;
    3007              :     Oid         table_oid;
    3008              :     AclResult   aclresult;
    3009              : 
    3010              :     /*
    3011              :      * Lock level here should match table lock in reindex_index() for
    3012              :      * non-concurrent case and table locks used by index_concurrently_*() for
    3013              :      * concurrent case.
    3014              :      */
    3015          518 :     table_lockmode = (state->params.options & REINDEXOPT_CONCURRENTLY) != 0 ?
    3016          259 :         ShareUpdateExclusiveLock : ShareLock;
    3017              : 
    3018              :     /*
    3019              :      * If we previously locked some other index's heap, and the name we're
    3020              :      * looking up no longer refers to that relation, release the now-useless
    3021              :      * lock.
    3022              :      */
    3023          259 :     if (relId != oldRelId && OidIsValid(oldRelId))
    3024              :     {
    3025            3 :         UnlockRelationOid(state->locked_table_oid, table_lockmode);
    3026            3 :         state->locked_table_oid = InvalidOid;
    3027              :     }
    3028              : 
    3029              :     /* If the relation does not exist, there's nothing more to do. */
    3030          259 :     if (!OidIsValid(relId))
    3031            8 :         return;
    3032              : 
    3033              :     /* If the relation does exist, check whether it's an index. */
    3034          251 :     relkind = get_rel_relkind(relId);
    3035          251 :     if (relkind != RELKIND_INDEX &&
    3036              :         relkind != RELKIND_PARTITIONED_INDEX)
    3037           16 :         ereport(ERROR,
    3038              :                 (errcode(ERRCODE_WRONG_OBJECT_TYPE),
    3039              :                  errmsg("\"%s\" is not an index", relation->relname)));
    3040              : 
    3041              :     /* Look up the index's table. */
    3042          235 :     table_oid = IndexGetRelation(relId, false);
    3043              : 
    3044              :     /*
    3045              :      * In the unlikely event that, upon retry, we get the same index OID with
    3046              :      * a different table OID, fail.  RangeVarGetRelidExtended() will have
    3047              :      * already locked the index in this case, and it won't retry again, so we
    3048              :      * can't lock the newly discovered table OID without risking deadlock.
    3049              :      * Also, while this corner case is indeed possible, it is extremely
    3050              :      * unlikely to happen in practice, so it's probably not worth any more
    3051              :      * effort than this.
    3052              :      */
    3053          235 :     if (relId == oldRelId && table_oid != state->locked_table_oid)
    3054            0 :         ereport(ERROR,
    3055              :                 (errcode(ERRCODE_UNDEFINED_OBJECT),
    3056              :                  errmsg("index \"%s\" was concurrently dropped",
    3057              :                         relation->relname)));
    3058              : 
    3059              :     /* Check permissions. */
    3060          235 :     aclresult = pg_class_aclcheck(table_oid, GetUserId(), ACL_MAINTAIN);
    3061          235 :     if (aclresult != ACLCHECK_OK)
    3062            8 :         aclcheck_error(aclresult, OBJECT_INDEX, relation->relname);
    3063              : 
    3064              :     /* Lock heap before index to avoid deadlock. */
    3065          227 :     if (relId != oldRelId)
    3066              :     {
    3067          222 :         LockRelationOid(table_oid, table_lockmode);
    3068          222 :         state->locked_table_oid = table_oid;
    3069              :     }
    3070              : }
    3071              : 
    3072              : /*
    3073              :  * ReindexTable
    3074              :  *      Recreate all indexes of a table (and of its toast table, if any)
    3075              :  */
    3076              : static Oid
    3077          310 : ReindexTable(const ReindexStmt *stmt, const ReindexParams *params, bool isTopLevel)
    3078              : {
    3079              :     Oid         heapOid;
    3080              :     bool        result;
    3081          310 :     const RangeVar *relation = stmt->relation;
    3082              : 
    3083              :     /*
    3084              :      * The lock level used here should match reindex_relation().
    3085              :      *
    3086              :      * If it's a temporary table, we will perform a non-concurrent reindex,
    3087              :      * even if CONCURRENTLY was requested.  In that case, reindex_relation()
    3088              :      * will upgrade the lock, but that's OK, because other sessions can't hold
    3089              :      * locks on our temporary table.
    3090              :      */
    3091          310 :     heapOid = RangeVarGetRelidExtended(relation,
    3092          310 :                                        (params->options & REINDEXOPT_CONCURRENTLY) != 0 ?
    3093              :                                        ShareUpdateExclusiveLock : ShareLock,
    3094              :                                        0,
    3095              :                                        RangeVarCallbackMaintainsTable, NULL);
    3096              : 
    3097          281 :     if (get_rel_relkind(heapOid) == RELKIND_PARTITIONED_TABLE)
    3098           47 :         ReindexPartitions(stmt, heapOid, params, isTopLevel);
    3099          379 :     else if ((params->options & REINDEXOPT_CONCURRENTLY) != 0 &&
    3100          145 :              get_rel_persistence(heapOid) != RELPERSISTENCE_TEMP)
    3101              :     {
    3102          137 :         result = ReindexRelationConcurrently(stmt, heapOid, params);
    3103              : 
    3104          112 :         if (!result)
    3105           12 :             ereport(NOTICE,
    3106              :                     (errmsg("table \"%s\" has no indexes that can be reindexed concurrently",
    3107              :                             relation->relname)));
    3108              :     }
    3109              :     else
    3110              :     {
    3111           97 :         ReindexParams newparams = *params;
    3112              : 
    3113           97 :         newparams.options |= REINDEXOPT_REPORT_PROGRESS;
    3114           97 :         result = reindex_relation(stmt, heapOid,
    3115              :                                   REINDEX_REL_PROCESS_TOAST |
    3116              :                                   REINDEX_REL_CHECK_CONSTRAINTS,
    3117              :                                   &newparams);
    3118           76 :         if (!result)
    3119            8 :             ereport(NOTICE,
    3120              :                     (errmsg("table \"%s\" has no indexes to reindex",
    3121              :                             relation->relname)));
    3122              :     }
    3123              : 
    3124          231 :     return heapOid;
    3125              : }
    3126              : 
    3127              : /*
    3128              :  * ReindexMultipleTables
    3129              :  *      Recreate indexes of tables selected by objectName/objectKind.
    3130              :  *
    3131              :  * To reduce the probability of deadlocks, each table is reindexed in a
    3132              :  * separate transaction, so we can release the lock on it right away.
    3133              :  * That means this must not be called within a user transaction block!
    3134              :  */
    3135              : static void
    3136          107 : ReindexMultipleTables(const ReindexStmt *stmt, const ReindexParams *params)
    3137              : {
    3138              : 
    3139              :     Oid         objectOid;
    3140              :     Relation    relationRelation;
    3141              :     TableScanDesc scan;
    3142              :     ScanKeyData scan_keys[1];
    3143              :     HeapTuple   tuple;
    3144              :     MemoryContext private_context;
    3145              :     MemoryContext old;
    3146          107 :     List       *relids = NIL;
    3147              :     int         num_keys;
    3148          107 :     bool        concurrent_warning = false;
    3149          107 :     bool        tablespace_warning = false;
    3150          107 :     const char *objectName = stmt->name;
    3151          107 :     const ReindexObjectType objectKind = stmt->kind;
    3152              : 
    3153              :     Assert(objectKind == REINDEX_OBJECT_SCHEMA ||
    3154              :            objectKind == REINDEX_OBJECT_SYSTEM ||
    3155              :            objectKind == REINDEX_OBJECT_DATABASE);
    3156              : 
    3157              :     /*
    3158              :      * This matches the options enforced by the grammar, where the object name
    3159              :      * is optional for DATABASE and SYSTEM.
    3160              :      */
    3161              :     Assert(objectName || objectKind != REINDEX_OBJECT_SCHEMA);
    3162              : 
    3163          107 :     if (objectKind == REINDEX_OBJECT_SYSTEM &&
    3164           20 :         (params->options & REINDEXOPT_CONCURRENTLY) != 0)
    3165           13 :         ereport(ERROR,
    3166              :                 (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
    3167              :                  errmsg("cannot reindex system catalogs concurrently")));
    3168              : 
    3169              :     /*
    3170              :      * Get OID of object to reindex, being the database currently being used
    3171              :      * by session for a database or for system catalogs, or the schema defined
    3172              :      * by caller. At the same time do permission checks that need different
    3173              :      * processing depending on the object type.
    3174              :      */
    3175           94 :     if (objectKind == REINDEX_OBJECT_SCHEMA)
    3176              :     {
    3177           70 :         objectOid = get_namespace_oid(objectName, false);
    3178              : 
    3179           66 :         if (!object_ownercheck(NamespaceRelationId, objectOid, GetUserId()) &&
    3180           16 :             !has_privs_of_role(GetUserId(), ROLE_PG_MAINTAIN))
    3181           12 :             aclcheck_error(ACLCHECK_NOT_OWNER, OBJECT_SCHEMA,
    3182              :                            objectName);
    3183              :     }
    3184              :     else
    3185              :     {
    3186           24 :         objectOid = MyDatabaseId;
    3187              : 
    3188           24 :         if (objectName && strcmp(objectName, get_database_name(objectOid)) != 0)
    3189            4 :             ereport(ERROR,
    3190              :                     (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
    3191              :                      errmsg("can only reindex the currently open database")));
    3192           20 :         if (!object_ownercheck(DatabaseRelationId, objectOid, GetUserId()) &&
    3193            0 :             !has_privs_of_role(GetUserId(), ROLE_PG_MAINTAIN))
    3194            0 :             aclcheck_error(ACLCHECK_NOT_OWNER, OBJECT_DATABASE,
    3195            0 :                            get_database_name(objectOid));
    3196              :     }
    3197              : 
    3198              :     /*
    3199              :      * Create a memory context that will survive forced transaction commits we
    3200              :      * do below.  Since it is a child of PortalContext, it will go away
    3201              :      * eventually even if we suffer an error; there's no need for special
    3202              :      * abort cleanup logic.
    3203              :      */
    3204           74 :     private_context = AllocSetContextCreate(PortalContext,
    3205              :                                             "ReindexMultipleTables",
    3206              :                                             ALLOCSET_SMALL_SIZES);
    3207              : 
    3208              :     /*
    3209              :      * Define the search keys to find the objects to reindex. For a schema, we
    3210              :      * select target relations using relnamespace, something not necessary for
    3211              :      * a database-wide operation.
    3212              :      */
    3213           74 :     if (objectKind == REINDEX_OBJECT_SCHEMA)
    3214              :     {
    3215           54 :         num_keys = 1;
    3216           54 :         ScanKeyInit(&scan_keys[0],
    3217              :                     Anum_pg_class_relnamespace,
    3218              :                     BTEqualStrategyNumber, F_OIDEQ,
    3219              :                     ObjectIdGetDatum(objectOid));
    3220              :     }
    3221              :     else
    3222           20 :         num_keys = 0;
    3223              : 
    3224              :     /*
    3225              :      * Scan pg_class to build a list of the relations we need to reindex.
    3226              :      *
    3227              :      * We only consider plain relations and materialized views here (toast
    3228              :      * rels will be processed indirectly by reindex_relation).
    3229              :      */
    3230           74 :     relationRelation = table_open(RelationRelationId, AccessShareLock);
    3231           74 :     scan = table_beginscan_catalog(relationRelation, num_keys, scan_keys);
    3232        10868 :     while ((tuple = heap_getnext(scan, ForwardScanDirection)) != NULL)
    3233              :     {
    3234        10794 :         Form_pg_class classtuple = (Form_pg_class) GETSTRUCT(tuple);
    3235        10794 :         Oid         relid = classtuple->oid;
    3236              : 
    3237              :         /*
    3238              :          * Only regular tables and matviews can have indexes, so ignore any
    3239              :          * other kind of relation.
    3240              :          *
    3241              :          * Partitioned tables/indexes are skipped but matching leaf partitions
    3242              :          * are processed.
    3243              :          */
    3244        10794 :         if (classtuple->relkind != RELKIND_RELATION &&
    3245         8873 :             classtuple->relkind != RELKIND_MATVIEW)
    3246         8861 :             continue;
    3247              : 
    3248              :         /* Skip temp tables of other backends; we can't reindex them at all */
    3249         1933 :         if (classtuple->relpersistence == RELPERSISTENCE_TEMP &&
    3250           24 :             !isTempNamespace(classtuple->relnamespace))
    3251            0 :             continue;
    3252              : 
    3253              :         /*
    3254              :          * Check user/system classification.  SYSTEM processes all the
    3255              :          * catalogs, and DATABASE processes everything that's not a catalog.
    3256              :          */
    3257         1933 :         if (objectKind == REINDEX_OBJECT_SYSTEM &&
    3258          527 :             !IsCatalogRelationOid(relid))
    3259           44 :             continue;
    3260         2846 :         else if (objectKind == REINDEX_OBJECT_DATABASE &&
    3261          957 :                  IsCatalogRelationOid(relid))
    3262          897 :             continue;
    3263              : 
    3264              :         /*
    3265              :          * We already checked privileges on the database or schema, but we
    3266              :          * further restrict reindexing shared catalogs to roles with the
    3267              :          * MAINTAIN privilege on the relation.
    3268              :          */
    3269         1124 :         if (classtuple->relisshared &&
    3270          132 :             pg_class_aclcheck(relid, GetUserId(), ACL_MAINTAIN) != ACLCHECK_OK)
    3271            0 :             continue;
    3272              : 
    3273              :         /*
    3274              :          * Skip system tables, since index_create() would reject indexing them
    3275              :          * concurrently (and it would likely fail if we tried).
    3276              :          */
    3277         1330 :         if ((params->options & REINDEXOPT_CONCURRENTLY) != 0 &&
    3278          338 :             IsCatalogRelationOid(relid))
    3279              :         {
    3280          276 :             if (!concurrent_warning)
    3281            4 :                 ereport(WARNING,
    3282              :                         (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
    3283              :                          errmsg("cannot reindex system catalogs concurrently, skipping all")));
    3284          276 :             concurrent_warning = true;
    3285          276 :             continue;
    3286              :         }
    3287              : 
    3288              :         /*
    3289              :          * If a new tablespace is set, check if this relation has to be
    3290              :          * skipped.
    3291              :          */
    3292          716 :         if (OidIsValid(params->tablespaceOid))
    3293              :         {
    3294            0 :             bool        skip_rel = false;
    3295              : 
    3296              :             /*
    3297              :              * Mapped relations cannot be moved to different tablespaces (in
    3298              :              * particular this eliminates all shared catalogs.).
    3299              :              */
    3300            0 :             if (RELKIND_HAS_STORAGE(classtuple->relkind) &&
    3301            0 :                 !RelFileNumberIsValid(classtuple->relfilenode))
    3302            0 :                 skip_rel = true;
    3303              : 
    3304              :             /*
    3305              :              * A system relation is always skipped, even with
    3306              :              * allow_system_table_mods enabled.
    3307              :              */
    3308            0 :             if (IsSystemClass(relid, classtuple))
    3309            0 :                 skip_rel = true;
    3310              : 
    3311            0 :             if (skip_rel)
    3312              :             {
    3313            0 :                 if (!tablespace_warning)
    3314            0 :                     ereport(WARNING,
    3315              :                             (errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
    3316              :                              errmsg("cannot move system relations, skipping all")));
    3317            0 :                 tablespace_warning = true;
    3318            0 :                 continue;
    3319              :             }
    3320              :         }
    3321              : 
    3322              :         /* Save the list of relation OIDs in private context */
    3323          716 :         old = MemoryContextSwitchTo(private_context);
    3324              : 
    3325              :         /*
    3326              :          * We always want to reindex pg_class first if it's selected to be
    3327              :          * reindexed.  This ensures that if there is any corruption in
    3328              :          * pg_class' indexes, they will be fixed before we process any other
    3329              :          * tables.  This is critical because reindexing itself will try to
    3330              :          * update pg_class.
    3331              :          */
    3332          716 :         if (relid == RelationRelationId)
    3333            8 :             relids = lcons_oid(relid, relids);
    3334              :         else
    3335          708 :             relids = lappend_oid(relids, relid);
    3336              : 
    3337          716 :         MemoryContextSwitchTo(old);
    3338              :     }
    3339           74 :     table_endscan(scan);
    3340           74 :     table_close(relationRelation, AccessShareLock);
    3341              : 
    3342              :     /*
    3343              :      * Process each relation listed in a separate transaction.  Note that this
    3344              :      * commits and then starts a new transaction immediately.
    3345              :      */
    3346           74 :     ReindexMultipleInternal(stmt, relids, params);
    3347              : 
    3348           74 :     MemoryContextDelete(private_context);
    3349           74 : }
    3350              : 
    3351              : /*
    3352              :  * Error callback specific to ReindexPartitions().
    3353              :  */
    3354              : static void
    3355            8 : reindex_error_callback(void *arg)
    3356              : {
    3357            8 :     ReindexErrorInfo *errinfo = (ReindexErrorInfo *) arg;
    3358              : 
    3359              :     Assert(RELKIND_HAS_PARTITIONS(errinfo->relkind));
    3360              : 
    3361            8 :     if (errinfo->relkind == RELKIND_PARTITIONED_TABLE)
    3362            4 :         errcontext("while reindexing partitioned table \"%s.%s\"",
    3363              :                    errinfo->relnamespace, errinfo->relname);
    3364            4 :     else if (errinfo->relkind == RELKIND_PARTITIONED_INDEX)
    3365            4 :         errcontext("while reindexing partitioned index \"%s.%s\"",
    3366              :                    errinfo->relnamespace, errinfo->relname);
    3367            8 : }
    3368              : 
    3369              : /*
    3370              :  * ReindexPartitions
    3371              :  *
    3372              :  * Reindex a set of partitions, per the partitioned index or table given
    3373              :  * by the caller.
    3374              :  */
    3375              : static void
    3376           71 : ReindexPartitions(const ReindexStmt *stmt, Oid relid, const ReindexParams *params, bool isTopLevel)
    3377              : {
    3378           71 :     List       *partitions = NIL;
    3379           71 :     char        relkind = get_rel_relkind(relid);
    3380           71 :     char       *relname = get_rel_name(relid);
    3381           71 :     char       *relnamespace = get_namespace_name(get_rel_namespace(relid));
    3382              :     MemoryContext reindex_context;
    3383              :     List       *inhoids;
    3384              :     ListCell   *lc;
    3385              :     ErrorContextCallback errcallback;
    3386              :     ReindexErrorInfo errinfo;
    3387              : 
    3388              :     Assert(RELKIND_HAS_PARTITIONS(relkind));
    3389              : 
    3390              :     /*
    3391              :      * Check if this runs in a transaction block, with an error callback to
    3392              :      * provide more context under which a problem happens.
    3393              :      */
    3394           71 :     errinfo.relname = pstrdup(relname);
    3395           71 :     errinfo.relnamespace = pstrdup(relnamespace);
    3396           71 :     errinfo.relkind = relkind;
    3397           71 :     errcallback.callback = reindex_error_callback;
    3398           71 :     errcallback.arg = &errinfo;
    3399           71 :     errcallback.previous = error_context_stack;
    3400           71 :     error_context_stack = &errcallback;
    3401              : 
    3402           71 :     PreventInTransactionBlock(isTopLevel,
    3403              :                               relkind == RELKIND_PARTITIONED_TABLE ?
    3404              :                               "REINDEX TABLE" : "REINDEX INDEX");
    3405              : 
    3406              :     /* Pop the error context stack */
    3407           63 :     error_context_stack = errcallback.previous;
    3408              : 
    3409              :     /*
    3410              :      * Create special memory context for cross-transaction storage.
    3411              :      *
    3412              :      * Since it is a child of PortalContext, it will go away eventually even
    3413              :      * if we suffer an error so there is no need for special abort cleanup
    3414              :      * logic.
    3415              :      */
    3416           63 :     reindex_context = AllocSetContextCreate(PortalContext, "Reindex",
    3417              :                                             ALLOCSET_DEFAULT_SIZES);
    3418              : 
    3419              :     /* ShareLock is enough to prevent schema modifications */
    3420           63 :     inhoids = find_all_inheritors(relid, ShareLock, NULL);
    3421              : 
    3422              :     /*
    3423              :      * The list of relations to reindex are the physical partitions of the
    3424              :      * tree so discard any partitioned table or index.
    3425              :      */
    3426          245 :     foreach(lc, inhoids)
    3427              :     {
    3428          182 :         Oid         partoid = lfirst_oid(lc);
    3429          182 :         char        partkind = get_rel_relkind(partoid);
    3430              :         MemoryContext old_context;
    3431              : 
    3432              :         /*
    3433              :          * This discards partitioned tables, partitioned indexes and foreign
    3434              :          * tables.
    3435              :          */
    3436          182 :         if (!RELKIND_HAS_STORAGE(partkind))
    3437          105 :             continue;
    3438              : 
    3439              :         Assert(partkind == RELKIND_INDEX ||
    3440              :                partkind == RELKIND_RELATION);
    3441              : 
    3442              :         /* Save partition OID */
    3443           77 :         old_context = MemoryContextSwitchTo(reindex_context);
    3444           77 :         partitions = lappend_oid(partitions, partoid);
    3445           77 :         MemoryContextSwitchTo(old_context);
    3446              :     }
    3447              : 
    3448              :     /*
    3449              :      * Process each partition listed in a separate transaction.  Note that
    3450              :      * this commits and then starts a new transaction immediately.
    3451              :      */
    3452           63 :     ReindexMultipleInternal(stmt, partitions, params);
    3453              : 
    3454              :     /*
    3455              :      * Clean up working storage --- note we must do this after
    3456              :      * StartTransactionCommand, else we might be trying to delete the active
    3457              :      * context!
    3458              :      */
    3459           63 :     MemoryContextDelete(reindex_context);
    3460           63 : }
    3461              : 
    3462              : /*
    3463              :  * ReindexMultipleInternal
    3464              :  *
    3465              :  * Reindex a list of relations, each one being processed in its own
    3466              :  * transaction.  This commits the existing transaction immediately,
    3467              :  * and starts a new transaction when finished.
    3468              :  */
    3469              : static void
    3470          137 : ReindexMultipleInternal(const ReindexStmt *stmt, const List *relids, const ReindexParams *params)
    3471              : {
    3472              :     ListCell   *l;
    3473              : 
    3474          137 :     PopActiveSnapshot();
    3475          137 :     CommitTransactionCommand();
    3476              : 
    3477          930 :     foreach(l, relids)
    3478              :     {
    3479          793 :         Oid         relid = lfirst_oid(l);
    3480              :         char        relkind;
    3481              :         char        relpersistence;
    3482              : 
    3483          793 :         StartTransactionCommand();
    3484              : 
    3485              :         /* functions in indexes may want a snapshot set */
    3486          793 :         PushActiveSnapshot(GetTransactionSnapshot());
    3487              : 
    3488              :         /* check if the relation still exists */
    3489          793 :         if (!SearchSysCacheExists1(RELOID, ObjectIdGetDatum(relid)))
    3490              :         {
    3491            2 :             PopActiveSnapshot();
    3492            2 :             CommitTransactionCommand();
    3493            2 :             continue;
    3494              :         }
    3495              : 
    3496              :         /*
    3497              :          * Check permissions except when moving to database's default if a new
    3498              :          * tablespace is chosen.  Note that this check also happens in
    3499              :          * ExecReindex(), but we do an extra check here as this runs across
    3500              :          * multiple transactions.
    3501              :          */
    3502          791 :         if (OidIsValid(params->tablespaceOid) &&
    3503            8 :             params->tablespaceOid != MyDatabaseTableSpace)
    3504              :         {
    3505              :             AclResult   aclresult;
    3506              : 
    3507            8 :             aclresult = object_aclcheck(TableSpaceRelationId, params->tablespaceOid,
    3508              :                                         GetUserId(), ACL_CREATE);
    3509            8 :             if (aclresult != ACLCHECK_OK)
    3510            0 :                 aclcheck_error(aclresult, OBJECT_TABLESPACE,
    3511            0 :                                get_tablespace_name(params->tablespaceOid));
    3512              :         }
    3513              : 
    3514          791 :         relkind = get_rel_relkind(relid);
    3515          791 :         relpersistence = get_rel_persistence(relid);
    3516              : 
    3517              :         /*
    3518              :          * Partitioned tables and indexes can never be processed directly, and
    3519              :          * a list of their leaves should be built first.
    3520              :          */
    3521              :         Assert(!RELKIND_HAS_PARTITIONS(relkind));
    3522              : 
    3523          791 :         if ((params->options & REINDEXOPT_CONCURRENTLY) != 0 &&
    3524              :             relpersistence != RELPERSISTENCE_TEMP)
    3525           81 :         {
    3526           81 :             ReindexParams newparams = *params;
    3527              : 
    3528           81 :             newparams.options |= REINDEXOPT_MISSING_OK;
    3529           81 :             (void) ReindexRelationConcurrently(stmt, relid, &newparams);
    3530           81 :             if (ActiveSnapshotSet())
    3531           17 :                 PopActiveSnapshot();
    3532              :             /* ReindexRelationConcurrently() does the verbose output */
    3533              :         }
    3534          710 :         else if (relkind == RELKIND_INDEX)
    3535              :         {
    3536           12 :             ReindexParams newparams = *params;
    3537              : 
    3538           12 :             newparams.options |=
    3539              :                 REINDEXOPT_REPORT_PROGRESS | REINDEXOPT_MISSING_OK;
    3540           12 :             reindex_index(stmt, relid, false, relpersistence, &newparams);
    3541           12 :             PopActiveSnapshot();
    3542              :             /* reindex_index() does the verbose output */
    3543              :         }
    3544              :         else
    3545              :         {
    3546              :             bool        result;
    3547          698 :             ReindexParams newparams = *params;
    3548              : 
    3549          698 :             newparams.options |=
    3550              :                 REINDEXOPT_REPORT_PROGRESS | REINDEXOPT_MISSING_OK;
    3551          698 :             result = reindex_relation(stmt, relid,
    3552              :                                       REINDEX_REL_PROCESS_TOAST |
    3553              :                                       REINDEX_REL_CHECK_CONSTRAINTS,
    3554              :                                       &newparams);
    3555              : 
    3556          698 :             if (result && (params->options & REINDEXOPT_VERBOSE) != 0)
    3557            0 :                 ereport(INFO,
    3558              :                         (errmsg("table \"%s.%s\" was reindexed",
    3559              :                                 get_namespace_name(get_rel_namespace(relid)),
    3560              :                                 get_rel_name(relid))));
    3561              : 
    3562          698 :             PopActiveSnapshot();
    3563              :         }
    3564              : 
    3565          791 :         CommitTransactionCommand();
    3566              :     }
    3567              : 
    3568          137 :     StartTransactionCommand();
    3569          137 : }
    3570              : 
    3571              : 
    3572              : /*
    3573              :  * ReindexRelationConcurrently - process REINDEX CONCURRENTLY for given
    3574              :  * relation OID
    3575              :  *
    3576              :  * 'relationOid' can either belong to an index, a table or a materialized
    3577              :  * view.  For tables and materialized views, all their indexes will be rebuilt,
    3578              :  * excluding invalid indexes and any indexes used in exclusion constraints,
    3579              :  * but including their associated toast table indexes.  For indexes, the index
    3580              :  * itself will be rebuilt.
    3581              :  *
    3582              :  * The locks taken on parent tables and involved indexes are kept until the
    3583              :  * transaction is committed, at which point a session lock is taken on each
    3584              :  * relation.  Both of these protect against concurrent schema changes.
    3585              :  *
    3586              :  * Returns true if any indexes have been rebuilt (including toast table's
    3587              :  * indexes, when relevant), otherwise returns false.
    3588              :  *
    3589              :  * NOTE: This cannot be used on temporary relations.  A concurrent build would
    3590              :  * cause issues with ON COMMIT actions triggered by the transactions of the
    3591              :  * concurrent build.  Temporary relations are not subject to concurrent
    3592              :  * concerns, so there's no need for the more complicated concurrent build,
    3593              :  * anyway, and a non-concurrent reindex is more efficient.
    3594              :  */
    3595              : static bool
    3596          330 : ReindexRelationConcurrently(const ReindexStmt *stmt, Oid relationOid, const ReindexParams *params)
    3597              : {
    3598              :     typedef struct ReindexIndexInfo
    3599              :     {
    3600              :         Oid         indexId;
    3601              :         Oid         tableId;
    3602              :         Oid         amId;
    3603              :         bool        safe;       /* for set_indexsafe_procflags */
    3604              :     } ReindexIndexInfo;
    3605          330 :     List       *heapRelationIds = NIL;
    3606          330 :     List       *indexIds = NIL;
    3607          330 :     List       *newIndexIds = NIL;
    3608          330 :     List       *relationLocks = NIL;
    3609          330 :     List       *lockTags = NIL;
    3610              :     ListCell   *lc,
    3611              :                *lc2;
    3612              :     MemoryContext private_context;
    3613              :     MemoryContext oldcontext;
    3614              :     char        relkind;
    3615          330 :     char       *relationName = NULL;
    3616          330 :     char       *relationNamespace = NULL;
    3617              :     PGRUsage    ru0;
    3618          330 :     const int   progress_index[] = {
    3619              :         PROGRESS_CREATEIDX_COMMAND,
    3620              :         PROGRESS_CREATEIDX_PHASE,
    3621              :         PROGRESS_CREATEIDX_INDEX_OID,
    3622              :         PROGRESS_CREATEIDX_ACCESS_METHOD_OID
    3623              :     };
    3624              :     int64       progress_vals[4];
    3625              : 
    3626              :     /*
    3627              :      * Create a memory context that will survive forced transaction commits we
    3628              :      * do below.  Since it is a child of PortalContext, it will go away
    3629              :      * eventually even if we suffer an error; there's no need for special
    3630              :      * abort cleanup logic.
    3631              :      */
    3632          330 :     private_context = AllocSetContextCreate(PortalContext,
    3633              :                                             "ReindexConcurrent",
    3634              :                                             ALLOCSET_SMALL_SIZES);
    3635              : 
    3636          330 :     if ((params->options & REINDEXOPT_VERBOSE) != 0)
    3637              :     {
    3638              :         /* Save data needed by REINDEX VERBOSE in private context */
    3639            2 :         oldcontext = MemoryContextSwitchTo(private_context);
    3640              : 
    3641            2 :         relationName = get_rel_name(relationOid);
    3642            2 :         relationNamespace = get_namespace_name(get_rel_namespace(relationOid));
    3643              : 
    3644            2 :         pg_rusage_init(&ru0);
    3645              : 
    3646            2 :         MemoryContextSwitchTo(oldcontext);
    3647              :     }
    3648              : 
    3649          330 :     relkind = get_rel_relkind(relationOid);
    3650              : 
    3651              :     /*
    3652              :      * Extract the list of indexes that are going to be rebuilt based on the
    3653              :      * relation Oid given by caller.
    3654              :      */
    3655          330 :     switch (relkind)
    3656              :     {
    3657          202 :         case RELKIND_RELATION:
    3658              :         case RELKIND_MATVIEW:
    3659              :         case RELKIND_TOASTVALUE:
    3660              :             {
    3661              :                 /*
    3662              :                  * In the case of a relation, find all its indexes including
    3663              :                  * toast indexes.
    3664              :                  */
    3665              :                 Relation    heapRelation;
    3666              : 
    3667              :                 /* Save the list of relation OIDs in private context */
    3668          202 :                 oldcontext = MemoryContextSwitchTo(private_context);
    3669              : 
    3670              :                 /* Track this relation for session locks */
    3671          202 :                 heapRelationIds = lappend_oid(heapRelationIds, relationOid);
    3672              : 
    3673          202 :                 MemoryContextSwitchTo(oldcontext);
    3674              : 
    3675          202 :                 if (IsCatalogRelationOid(relationOid))
    3676           24 :                     ereport(ERROR,
    3677              :                             (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
    3678              :                              errmsg("cannot reindex system catalogs concurrently")));
    3679              : 
    3680              :                 /* Open relation to get its indexes */
    3681          178 :                 if ((params->options & REINDEXOPT_MISSING_OK) != 0)
    3682              :                 {
    3683           65 :                     heapRelation = try_table_open(relationOid,
    3684              :                                                   ShareUpdateExclusiveLock);
    3685              :                     /* leave if relation does not exist */
    3686           65 :                     if (!heapRelation)
    3687            0 :                         break;
    3688              :                 }
    3689              :                 else
    3690          113 :                     heapRelation = table_open(relationOid,
    3691              :                                               ShareUpdateExclusiveLock);
    3692              : 
    3693          192 :                 if (OidIsValid(params->tablespaceOid) &&
    3694           14 :                     IsSystemRelation(heapRelation))
    3695            1 :                     ereport(ERROR,
    3696              :                             (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
    3697              :                              errmsg("cannot move system relation \"%s\"",
    3698              :                                     RelationGetRelationName(heapRelation))));
    3699              : 
    3700              :                 /* Add all the valid indexes of relation to list */
    3701          344 :                 foreach(lc, RelationGetIndexList(heapRelation))
    3702              :                 {
    3703          167 :                     Oid         cellOid = lfirst_oid(lc);
    3704          167 :                     Relation    indexRelation = index_open(cellOid,
    3705              :                                                            ShareUpdateExclusiveLock);
    3706              : 
    3707          167 :                     if (!indexRelation->rd_index->indisvalid)
    3708            4 :                         ereport(WARNING,
    3709              :                                 (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
    3710              :                                  errmsg("skipping reindex of invalid index \"%s.%s\"",
    3711              :                                         get_namespace_name(get_rel_namespace(cellOid)),
    3712              :                                         get_rel_name(cellOid)),
    3713              :                                  errhint("Use DROP INDEX or REINDEX INDEX.")));
    3714          163 :                     else if (indexRelation->rd_index->indisexclusion)
    3715            4 :                         ereport(WARNING,
    3716              :                                 (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
    3717              :                                  errmsg("cannot reindex exclusion constraint index \"%s.%s\" concurrently, skipping",
    3718              :                                         get_namespace_name(get_rel_namespace(cellOid)),
    3719              :                                         get_rel_name(cellOid))));
    3720              :                     else
    3721              :                     {
    3722              :                         ReindexIndexInfo *idx;
    3723              : 
    3724              :                         /* Save the list of relation OIDs in private context */
    3725          159 :                         oldcontext = MemoryContextSwitchTo(private_context);
    3726              : 
    3727          159 :                         idx = palloc_object(ReindexIndexInfo);
    3728          159 :                         idx->indexId = cellOid;
    3729              :                         /* other fields set later */
    3730              : 
    3731          159 :                         indexIds = lappend(indexIds, idx);
    3732              : 
    3733          159 :                         MemoryContextSwitchTo(oldcontext);
    3734              :                     }
    3735              : 
    3736          167 :                     index_close(indexRelation, NoLock);
    3737              :                 }
    3738              : 
    3739              :                 /* Also add the toast indexes */
    3740          177 :                 if (OidIsValid(heapRelation->rd_rel->reltoastrelid))
    3741              :                 {
    3742           51 :                     Oid         toastOid = heapRelation->rd_rel->reltoastrelid;
    3743           51 :                     Relation    toastRelation = table_open(toastOid,
    3744              :                                                            ShareUpdateExclusiveLock);
    3745              : 
    3746              :                     /* Save the list of relation OIDs in private context */
    3747           51 :                     oldcontext = MemoryContextSwitchTo(private_context);
    3748              : 
    3749              :                     /* Track this relation for session locks */
    3750           51 :                     heapRelationIds = lappend_oid(heapRelationIds, toastOid);
    3751              : 
    3752           51 :                     MemoryContextSwitchTo(oldcontext);
    3753              : 
    3754          102 :                     foreach(lc2, RelationGetIndexList(toastRelation))
    3755              :                     {
    3756           51 :                         Oid         cellOid = lfirst_oid(lc2);
    3757           51 :                         Relation    indexRelation = index_open(cellOid,
    3758              :                                                                ShareUpdateExclusiveLock);
    3759              : 
    3760           51 :                         if (!indexRelation->rd_index->indisvalid)
    3761            0 :                             ereport(WARNING,
    3762              :                                     (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
    3763              :                                      errmsg("skipping reindex of invalid index \"%s.%s\"",
    3764              :                                             get_namespace_name(get_rel_namespace(cellOid)),
    3765              :                                             get_rel_name(cellOid)),
    3766              :                                      errhint("Use DROP INDEX or REINDEX INDEX.")));
    3767              :                         else
    3768              :                         {
    3769              :                             ReindexIndexInfo *idx;
    3770              : 
    3771              :                             /*
    3772              :                              * Save the list of relation OIDs in private
    3773              :                              * context
    3774              :                              */
    3775           51 :                             oldcontext = MemoryContextSwitchTo(private_context);
    3776              : 
    3777           51 :                             idx = palloc_object(ReindexIndexInfo);
    3778           51 :                             idx->indexId = cellOid;
    3779           51 :                             indexIds = lappend(indexIds, idx);
    3780              :                             /* other fields set later */
    3781              : 
    3782           51 :                             MemoryContextSwitchTo(oldcontext);
    3783              :                         }
    3784              : 
    3785           51 :                         index_close(indexRelation, NoLock);
    3786              :                     }
    3787              : 
    3788           51 :                     table_close(toastRelation, NoLock);
    3789              :                 }
    3790              : 
    3791          177 :                 table_close(heapRelation, NoLock);
    3792          177 :                 break;
    3793              :             }
    3794          128 :         case RELKIND_INDEX:
    3795              :             {
    3796          128 :                 Oid         heapId = IndexGetRelation(relationOid,
    3797          128 :                                                       (params->options & REINDEXOPT_MISSING_OK) != 0);
    3798              :                 Relation    heapRelation;
    3799              :                 ReindexIndexInfo *idx;
    3800              : 
    3801              :                 /* if relation is missing, leave */
    3802          128 :                 if (!OidIsValid(heapId))
    3803            0 :                     break;
    3804              : 
    3805          128 :                 if (IsCatalogRelationOid(heapId))
    3806           12 :                     ereport(ERROR,
    3807              :                             (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
    3808              :                              errmsg("cannot reindex system catalogs concurrently")));
    3809              : 
    3810              :                 /*
    3811              :                  * Don't allow reindex for an invalid index on TOAST table, as
    3812              :                  * if rebuilt it would not be possible to drop it.  Match
    3813              :                  * error message in reindex_index().
    3814              :                  */
    3815          116 :                 if (IsToastNamespace(get_rel_namespace(relationOid)) &&
    3816           28 :                     !get_index_isvalid(relationOid))
    3817            0 :                     ereport(ERROR,
    3818              :                             (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
    3819              :                              errmsg("cannot reindex invalid index on TOAST table")));
    3820              : 
    3821              :                 /*
    3822              :                  * Check if parent relation can be locked and if it exists,
    3823              :                  * this needs to be done at this stage as the list of indexes
    3824              :                  * to rebuild is not complete yet, and REINDEXOPT_MISSING_OK
    3825              :                  * should not be used once all the session locks are taken.
    3826              :                  */
    3827          116 :                 if ((params->options & REINDEXOPT_MISSING_OK) != 0)
    3828              :                 {
    3829           16 :                     heapRelation = try_table_open(heapId,
    3830              :                                                   ShareUpdateExclusiveLock);
    3831              :                     /* leave if relation does not exist */
    3832           16 :                     if (!heapRelation)
    3833            0 :                         break;
    3834              :                 }
    3835              :                 else
    3836          100 :                     heapRelation = table_open(heapId,
    3837              :                                               ShareUpdateExclusiveLock);
    3838              : 
    3839          121 :                 if (OidIsValid(params->tablespaceOid) &&
    3840            5 :                     IsSystemRelation(heapRelation))
    3841            1 :                     ereport(ERROR,
    3842              :                             (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
    3843              :                              errmsg("cannot move system relation \"%s\"",
    3844              :                                     get_rel_name(relationOid))));
    3845              : 
    3846          115 :                 table_close(heapRelation, NoLock);
    3847              : 
    3848              :                 /* Save the list of relation OIDs in private context */
    3849          115 :                 oldcontext = MemoryContextSwitchTo(private_context);
    3850              : 
    3851              :                 /* Track the heap relation of this index for session locks */
    3852          115 :                 heapRelationIds = list_make1_oid(heapId);
    3853              : 
    3854              :                 /*
    3855              :                  * Save the list of relation OIDs in private context.  Note
    3856              :                  * that invalid indexes are allowed here.
    3857              :                  */
    3858          115 :                 idx = palloc_object(ReindexIndexInfo);
    3859          115 :                 idx->indexId = relationOid;
    3860          115 :                 indexIds = lappend(indexIds, idx);
    3861              :                 /* other fields set later */
    3862              : 
    3863          115 :                 MemoryContextSwitchTo(oldcontext);
    3864          115 :                 break;
    3865              :             }
    3866              : 
    3867            0 :         case RELKIND_PARTITIONED_TABLE:
    3868              :         case RELKIND_PARTITIONED_INDEX:
    3869              :         default:
    3870              :             /* Return error if type of relation is not supported */
    3871            0 :             ereport(ERROR,
    3872              :                     (errcode(ERRCODE_WRONG_OBJECT_TYPE),
    3873              :                      errmsg("cannot reindex this type of relation concurrently")));
    3874              :             break;
    3875              :     }
    3876              : 
    3877              :     /*
    3878              :      * Definitely no indexes, so leave.  Any checks based on
    3879              :      * REINDEXOPT_MISSING_OK should be done only while the list of indexes to
    3880              :      * work on is built as the session locks taken before this transaction
    3881              :      * commits will make sure that they cannot be dropped by a concurrent
    3882              :      * session until this operation completes.
    3883              :      */
    3884          292 :     if (indexIds == NIL)
    3885           29 :         return false;
    3886              : 
    3887              :     /* It's not a shared catalog, so refuse to move it to shared tablespace */
    3888          263 :     if (params->tablespaceOid == GLOBALTABLESPACE_OID)
    3889            4 :         ereport(ERROR,
    3890              :                 (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
    3891              :                  errmsg("cannot move non-shared relation to tablespace \"%s\"",
    3892              :                         get_tablespace_name(params->tablespaceOid))));
    3893              : 
    3894              :     Assert(heapRelationIds != NIL);
    3895              : 
    3896              :     /*-----
    3897              :      * Now we have all the indexes we want to process in indexIds.
    3898              :      *
    3899              :      * The phases now are:
    3900              :      *
    3901              :      * 1. create new indexes in the catalog
    3902              :      * 2. build new indexes
    3903              :      * 3. let new indexes catch up with tuples inserted in the meantime
    3904              :      * 4. swap index names
    3905              :      * 5. mark old indexes as dead
    3906              :      * 6. drop old indexes
    3907              :      *
    3908              :      * We process each phase for all indexes before moving to the next phase,
    3909              :      * for efficiency.
    3910              :      */
    3911              : 
    3912              :     /*
    3913              :      * Phase 1 of REINDEX CONCURRENTLY
    3914              :      *
    3915              :      * Create a new index with the same properties as the old one, but it is
    3916              :      * only registered in catalogs and will be built later.  Then get session
    3917              :      * locks on all involved tables.  See analogous code in DefineIndex() for
    3918              :      * more detailed comments.
    3919              :      */
    3920              : 
    3921          576 :     foreach(lc, indexIds)
    3922              :     {
    3923              :         char       *concurrentName;
    3924          321 :         ReindexIndexInfo *idx = lfirst(lc);
    3925              :         ReindexIndexInfo *newidx;
    3926              :         Oid         newIndexId;
    3927              :         Relation    indexRel;
    3928              :         Relation    heapRel;
    3929              :         Oid         save_userid;
    3930              :         int         save_sec_context;
    3931              :         int         save_nestlevel;
    3932              :         Relation    newIndexRel;
    3933              :         LockRelId  *lockrelid;
    3934              :         Oid         tablespaceid;
    3935              : 
    3936          321 :         indexRel = index_open(idx->indexId, ShareUpdateExclusiveLock);
    3937          321 :         heapRel = table_open(indexRel->rd_index->indrelid,
    3938              :                              ShareUpdateExclusiveLock);
    3939              : 
    3940              :         /*
    3941              :          * Switch to the table owner's userid, so that any index functions are
    3942              :          * run as that user.  Also lock down security-restricted operations
    3943              :          * and arrange to make GUC variable changes local to this command.
    3944              :          */
    3945          321 :         GetUserIdAndSecContext(&save_userid, &save_sec_context);
    3946          321 :         SetUserIdAndSecContext(heapRel->rd_rel->relowner,
    3947              :                                save_sec_context | SECURITY_RESTRICTED_OPERATION);
    3948          321 :         save_nestlevel = NewGUCNestLevel();
    3949          321 :         RestrictSearchPath();
    3950              : 
    3951              :         /* determine safety of this index for set_indexsafe_procflags */
    3952          616 :         idx->safe = (RelationGetIndexExpressions(indexRel) == NIL &&
    3953          295 :                      RelationGetIndexPredicate(indexRel) == NIL);
    3954              : 
    3955              : #ifdef USE_INJECTION_POINTS
    3956          321 :         if (idx->safe)
    3957          290 :             INJECTION_POINT("reindex-conc-index-safe", NULL);
    3958              :         else
    3959           31 :             INJECTION_POINT("reindex-conc-index-not-safe", NULL);
    3960              : #endif
    3961              : 
    3962          321 :         idx->tableId = RelationGetRelid(heapRel);
    3963          321 :         idx->amId = indexRel->rd_rel->relam;
    3964              : 
    3965              :         /* This function shouldn't be called for temporary relations. */
    3966          321 :         if (indexRel->rd_rel->relpersistence == RELPERSISTENCE_TEMP)
    3967            0 :             elog(ERROR, "cannot reindex a temporary table concurrently");
    3968              : 
    3969          321 :         pgstat_progress_start_command(PROGRESS_COMMAND_CREATE_INDEX, idx->tableId);
    3970              : 
    3971          321 :         progress_vals[0] = PROGRESS_CREATEIDX_COMMAND_REINDEX_CONCURRENTLY;
    3972          321 :         progress_vals[1] = 0;   /* initializing */
    3973          321 :         progress_vals[2] = idx->indexId;
    3974          321 :         progress_vals[3] = idx->amId;
    3975          321 :         pgstat_progress_update_multi_param(4, progress_index, progress_vals);
    3976              : 
    3977              :         /* Choose a temporary relation name for the new index */
    3978          321 :         concurrentName = ChooseRelationName(get_rel_name(idx->indexId),
    3979              :                                             NULL,
    3980              :                                             "ccnew",
    3981          321 :                                             get_rel_namespace(indexRel->rd_index->indrelid),
    3982              :                                             false);
    3983              : 
    3984              :         /* Choose the new tablespace, indexes of toast tables are not moved */
    3985          321 :         if (OidIsValid(params->tablespaceOid) &&
    3986           18 :             heapRel->rd_rel->relkind != RELKIND_TOASTVALUE)
    3987           13 :             tablespaceid = params->tablespaceOid;
    3988              :         else
    3989          308 :             tablespaceid = indexRel->rd_rel->reltablespace;
    3990              : 
    3991              :         /* Create new index definition based on given index */
    3992          321 :         newIndexId = index_create_copy(heapRel,
    3993              :                                        INDEX_CREATE_CONCURRENT |
    3994              :                                        INDEX_CREATE_SKIP_BUILD |
    3995              :                                        INDEX_CREATE_SUPPRESS_PROGRESS,
    3996              :                                        idx->indexId,
    3997              :                                        tablespaceid,
    3998              :                                        concurrentName);
    3999              : 
    4000              :         /*
    4001              :          * Now open the relation of the new index, a session-level lock is
    4002              :          * also needed on it.
    4003              :          */
    4004          317 :         newIndexRel = index_open(newIndexId, ShareUpdateExclusiveLock);
    4005              : 
    4006              :         /*
    4007              :          * Save the list of OIDs and locks in private context
    4008              :          */
    4009          317 :         oldcontext = MemoryContextSwitchTo(private_context);
    4010              : 
    4011          317 :         newidx = palloc_object(ReindexIndexInfo);
    4012          317 :         newidx->indexId = newIndexId;
    4013          317 :         newidx->safe = idx->safe;
    4014          317 :         newidx->tableId = idx->tableId;
    4015          317 :         newidx->amId = idx->amId;
    4016              : 
    4017          317 :         newIndexIds = lappend(newIndexIds, newidx);
    4018              : 
    4019              :         /*
    4020              :          * Save lockrelid to protect each relation from drop then close
    4021              :          * relations. The lockrelid on parent relation is not taken here to
    4022              :          * avoid multiple locks taken on the same relation, instead we rely on
    4023              :          * heapRelationIds built earlier.
    4024              :          */
    4025          317 :         lockrelid = palloc_object(LockRelId);
    4026          317 :         *lockrelid = indexRel->rd_lockInfo.lockRelId;
    4027          317 :         relationLocks = lappend(relationLocks, lockrelid);
    4028          317 :         lockrelid = palloc_object(LockRelId);
    4029          317 :         *lockrelid = newIndexRel->rd_lockInfo.lockRelId;
    4030          317 :         relationLocks = lappend(relationLocks, lockrelid);
    4031              : 
    4032          317 :         MemoryContextSwitchTo(oldcontext);
    4033              : 
    4034          317 :         index_close(indexRel, NoLock);
    4035          317 :         index_close(newIndexRel, NoLock);
    4036              : 
    4037              :         /* Roll back any GUC changes executed by index functions */
    4038          317 :         AtEOXact_GUC(false, save_nestlevel);
    4039              : 
    4040              :         /* Restore userid and security context */
    4041          317 :         SetUserIdAndSecContext(save_userid, save_sec_context);
    4042              : 
    4043          317 :         table_close(heapRel, NoLock);
    4044              : 
    4045              :         /*
    4046              :          * If a statement is available, telling that this comes from a REINDEX
    4047              :          * command, collect the new index for event triggers.
    4048              :          */
    4049          317 :         if (stmt)
    4050              :         {
    4051              :             ObjectAddress address;
    4052              : 
    4053          317 :             ObjectAddressSet(address, RelationRelationId, newIndexId);
    4054          317 :             EventTriggerCollectSimpleCommand(address,
    4055              :                                              InvalidObjectAddress,
    4056              :                                              (const Node *) stmt);
    4057              :         }
    4058              :     }
    4059              : 
    4060              :     /*
    4061              :      * Save the heap lock for the following visibility checks, as other
    4062              :      * backends might conflict with this session.
    4063              :      */
    4064          561 :     foreach(lc, heapRelationIds)
    4065              :     {
    4066          306 :         Relation    heapRelation = table_open(lfirst_oid(lc), ShareUpdateExclusiveLock);
    4067              :         LockRelId  *lockrelid;
    4068              :         LOCKTAG    *heaplocktag;
    4069              : 
    4070              :         /* Save the list of locks in private context */
    4071          306 :         oldcontext = MemoryContextSwitchTo(private_context);
    4072              : 
    4073              :         /* Add lockrelid of heap relation to the list of locked relations */
    4074          306 :         lockrelid = palloc_object(LockRelId);
    4075          306 :         *lockrelid = heapRelation->rd_lockInfo.lockRelId;
    4076          306 :         relationLocks = lappend(relationLocks, lockrelid);
    4077              : 
    4078          306 :         heaplocktag = palloc_object(LOCKTAG);
    4079              : 
    4080              :         /* Save the LOCKTAG for this parent relation for the wait phase */
    4081          306 :         SET_LOCKTAG_RELATION(*heaplocktag, lockrelid->dbId, lockrelid->relId);
    4082          306 :         lockTags = lappend(lockTags, heaplocktag);
    4083              : 
    4084          306 :         MemoryContextSwitchTo(oldcontext);
    4085              : 
    4086              :         /* Close heap relation */
    4087          306 :         table_close(heapRelation, NoLock);
    4088              :     }
    4089              : 
    4090              :     /* Get a session-level lock on each table. */
    4091         1195 :     foreach(lc, relationLocks)
    4092              :     {
    4093          940 :         LockRelId  *lockrelid = (LockRelId *) lfirst(lc);
    4094              : 
    4095          940 :         LockRelationIdForSession(lockrelid, ShareUpdateExclusiveLock);
    4096              :     }
    4097              : 
    4098          255 :     PopActiveSnapshot();
    4099          255 :     CommitTransactionCommand();
    4100          255 :     StartTransactionCommand();
    4101              : 
    4102              :     /*
    4103              :      * Because we don't take a snapshot in this transaction, there's no need
    4104              :      * to set the PROC_IN_SAFE_IC flag here.
    4105              :      */
    4106              : 
    4107              :     /*
    4108              :      * Phase 2 of REINDEX CONCURRENTLY
    4109              :      *
    4110              :      * Build the new indexes in a separate transaction for each index to avoid
    4111              :      * having open transactions for an unnecessarily long time.  But before
    4112              :      * doing that, wait until no running transactions could have the table of
    4113              :      * the index open with the old list of indexes.  See "phase 2" in
    4114              :      * DefineIndex() for more details.
    4115              :      */
    4116              : 
    4117          255 :     pgstat_progress_update_param(PROGRESS_CREATEIDX_PHASE,
    4118              :                                  PROGRESS_CREATEIDX_PHASE_WAIT_1);
    4119          255 :     WaitForLockersMultiple(lockTags, ShareLock, true);
    4120          255 :     CommitTransactionCommand();
    4121              : 
    4122          568 :     foreach(lc, newIndexIds)
    4123              :     {
    4124          317 :         ReindexIndexInfo *newidx = lfirst(lc);
    4125              : 
    4126              :         /* Start new transaction for this index's concurrent build */
    4127          317 :         StartTransactionCommand();
    4128              : 
    4129              :         /*
    4130              :          * Check for user-requested abort.  This is inside a transaction so as
    4131              :          * xact.c does not issue a useless WARNING, and ensures that
    4132              :          * session-level locks are cleaned up on abort.
    4133              :          */
    4134          317 :         CHECK_FOR_INTERRUPTS();
    4135              : 
    4136              :         /* Tell concurrent indexing to ignore us, if index qualifies */
    4137          317 :         if (newidx->safe)
    4138          286 :             set_indexsafe_procflags();
    4139              : 
    4140              :         /* Set ActiveSnapshot since functions in the indexes may need it */
    4141          317 :         PushActiveSnapshot(GetTransactionSnapshot());
    4142              : 
    4143              :         /*
    4144              :          * Update progress for the index to build, with the correct parent
    4145              :          * table involved.
    4146              :          */
    4147          317 :         pgstat_progress_start_command(PROGRESS_COMMAND_CREATE_INDEX, newidx->tableId);
    4148          317 :         progress_vals[0] = PROGRESS_CREATEIDX_COMMAND_REINDEX_CONCURRENTLY;
    4149          317 :         progress_vals[1] = PROGRESS_CREATEIDX_PHASE_BUILD;
    4150          317 :         progress_vals[2] = newidx->indexId;
    4151          317 :         progress_vals[3] = newidx->amId;
    4152          317 :         pgstat_progress_update_multi_param(4, progress_index, progress_vals);
    4153              : 
    4154              :         /* Perform concurrent build of new index */
    4155          317 :         index_concurrently_build(newidx->tableId, newidx->indexId);
    4156              : 
    4157          313 :         PopActiveSnapshot();
    4158          313 :         CommitTransactionCommand();
    4159              :     }
    4160              : 
    4161          251 :     StartTransactionCommand();
    4162              : 
    4163              :     /*
    4164              :      * Because we don't take a snapshot or Xid in this transaction, there's no
    4165              :      * need to set the PROC_IN_SAFE_IC flag here.
    4166              :      */
    4167              : 
    4168              :     /*
    4169              :      * Phase 3 of REINDEX CONCURRENTLY
    4170              :      *
    4171              :      * During this phase the new indexes catch up with any new tuples that
    4172              :      * were created during the previous phase.  See "phase 3" in DefineIndex()
    4173              :      * for more details.
    4174              :      */
    4175              : 
    4176          251 :     pgstat_progress_update_param(PROGRESS_CREATEIDX_PHASE,
    4177              :                                  PROGRESS_CREATEIDX_PHASE_WAIT_2);
    4178          251 :     WaitForLockersMultiple(lockTags, ShareLock, true);
    4179          251 :     CommitTransactionCommand();
    4180              : 
    4181          564 :     foreach(lc, newIndexIds)
    4182              :     {
    4183          313 :         ReindexIndexInfo *newidx = lfirst(lc);
    4184              :         TransactionId limitXmin;
    4185              :         Snapshot    snapshot;
    4186              : 
    4187          313 :         StartTransactionCommand();
    4188              : 
    4189              :         /*
    4190              :          * Check for user-requested abort.  This is inside a transaction so as
    4191              :          * xact.c does not issue a useless WARNING, and ensures that
    4192              :          * session-level locks are cleaned up on abort.
    4193              :          */
    4194          313 :         CHECK_FOR_INTERRUPTS();
    4195              : 
    4196              :         /* Tell concurrent indexing to ignore us, if index qualifies */
    4197          313 :         if (newidx->safe)
    4198          282 :             set_indexsafe_procflags();
    4199              : 
    4200              :         /*
    4201              :          * Take the "reference snapshot" that will be used by validate_index()
    4202              :          * to filter candidate tuples.
    4203              :          */
    4204          313 :         snapshot = RegisterSnapshot(GetTransactionSnapshot());
    4205          313 :         PushActiveSnapshot(snapshot);
    4206              : 
    4207              :         /*
    4208              :          * Update progress for the index to build, with the correct parent
    4209              :          * table involved.
    4210              :          */
    4211          313 :         pgstat_progress_start_command(PROGRESS_COMMAND_CREATE_INDEX, newidx->tableId);
    4212          313 :         progress_vals[0] = PROGRESS_CREATEIDX_COMMAND_REINDEX_CONCURRENTLY;
    4213          313 :         progress_vals[1] = PROGRESS_CREATEIDX_PHASE_VALIDATE_IDXSCAN;
    4214          313 :         progress_vals[2] = newidx->indexId;
    4215          313 :         progress_vals[3] = newidx->amId;
    4216          313 :         pgstat_progress_update_multi_param(4, progress_index, progress_vals);
    4217              : 
    4218          313 :         validate_index(newidx->tableId, newidx->indexId, snapshot);
    4219              : 
    4220              :         /*
    4221              :          * We can now do away with our active snapshot, we still need to save
    4222              :          * the xmin limit to wait for older snapshots.
    4223              :          */
    4224          313 :         limitXmin = snapshot->xmin;
    4225              : 
    4226          313 :         PopActiveSnapshot();
    4227          313 :         UnregisterSnapshot(snapshot);
    4228              : 
    4229              :         /*
    4230              :          * To ensure no deadlocks, we must commit and start yet another
    4231              :          * transaction, and do our wait before any snapshot has been taken in
    4232              :          * it.
    4233              :          */
    4234          313 :         CommitTransactionCommand();
    4235          313 :         StartTransactionCommand();
    4236              : 
    4237              :         /*
    4238              :          * The index is now valid in the sense that it contains all currently
    4239              :          * interesting tuples.  But since it might not contain tuples deleted
    4240              :          * just before the reference snap was taken, we have to wait out any
    4241              :          * transactions that might have older snapshots.
    4242              :          *
    4243              :          * Because we don't take a snapshot or Xid in this transaction,
    4244              :          * there's no need to set the PROC_IN_SAFE_IC flag here.
    4245              :          */
    4246          313 :         pgstat_progress_update_param(PROGRESS_CREATEIDX_PHASE,
    4247              :                                      PROGRESS_CREATEIDX_PHASE_WAIT_3);
    4248          313 :         WaitForOlderSnapshots(limitXmin, true);
    4249              : 
    4250          313 :         CommitTransactionCommand();
    4251              :     }
    4252              : 
    4253              :     /*
    4254              :      * Phase 4 of REINDEX CONCURRENTLY
    4255              :      *
    4256              :      * Now that the new indexes have been validated, swap each new index with
    4257              :      * its corresponding old index.
    4258              :      *
    4259              :      * We mark the new indexes as valid and the old indexes as not valid at
    4260              :      * the same time to make sure we only get constraint violations from the
    4261              :      * indexes with the correct names.
    4262              :      */
    4263              : 
    4264          251 :     INJECTION_POINT("reindex-relation-concurrently-before-swap", NULL);
    4265          251 :     StartTransactionCommand();
    4266              : 
    4267              :     /*
    4268              :      * Because this transaction only does catalog manipulations and doesn't do
    4269              :      * any index operations, we can set the PROC_IN_SAFE_IC flag here
    4270              :      * unconditionally.
    4271              :      */
    4272          251 :     set_indexsafe_procflags();
    4273              : 
    4274          564 :     forboth(lc, indexIds, lc2, newIndexIds)
    4275              :     {
    4276          313 :         ReindexIndexInfo *oldidx = lfirst(lc);
    4277          313 :         ReindexIndexInfo *newidx = lfirst(lc2);
    4278              :         char       *oldName;
    4279              : 
    4280              :         /*
    4281              :          * Check for user-requested abort.  This is inside a transaction so as
    4282              :          * xact.c does not issue a useless WARNING, and ensures that
    4283              :          * session-level locks are cleaned up on abort.
    4284              :          */
    4285          313 :         CHECK_FOR_INTERRUPTS();
    4286              : 
    4287              :         /* Choose a relation name for old index */
    4288          313 :         oldName = ChooseRelationName(get_rel_name(oldidx->indexId),
    4289              :                                      NULL,
    4290              :                                      "ccold",
    4291              :                                      get_rel_namespace(oldidx->tableId),
    4292              :                                      false);
    4293              : 
    4294              :         /*
    4295              :          * Swapping the indexes might involve TOAST table access, so ensure we
    4296              :          * have a valid snapshot.
    4297              :          */
    4298          313 :         PushActiveSnapshot(GetTransactionSnapshot());
    4299              : 
    4300              :         /*
    4301              :          * Swap old index with the new one.  This also marks the new one as
    4302              :          * valid and the old one as not valid.
    4303              :          */
    4304          313 :         index_concurrently_swap(newidx->indexId, oldidx->indexId, oldName);
    4305              : 
    4306          313 :         PopActiveSnapshot();
    4307              : 
    4308              :         /*
    4309              :          * Invalidate the relcache for the table, so that after this commit
    4310              :          * all sessions will refresh any cached plans that might reference the
    4311              :          * index.
    4312              :          */
    4313          313 :         CacheInvalidateRelcacheByRelid(oldidx->tableId);
    4314              : 
    4315              :         /*
    4316              :          * CCI here so that subsequent iterations see the oldName in the
    4317              :          * catalog and can choose a nonconflicting name for their oldName.
    4318              :          * Otherwise, this could lead to conflicts if a table has two indexes
    4319              :          * whose names are equal for the first NAMEDATALEN-minus-a-few
    4320              :          * characters.
    4321              :          */
    4322          313 :         CommandCounterIncrement();
    4323              :     }
    4324              : 
    4325              :     /* Commit this transaction and make index swaps visible */
    4326          251 :     CommitTransactionCommand();
    4327          251 :     StartTransactionCommand();
    4328              : 
    4329              :     /*
    4330              :      * While we could set PROC_IN_SAFE_IC if all indexes qualified, there's no
    4331              :      * real need for that, because we only acquire an Xid after the wait is
    4332              :      * done, and that lasts for a very short period.
    4333              :      */
    4334              : 
    4335              :     /*
    4336              :      * Phase 5 of REINDEX CONCURRENTLY
    4337              :      *
    4338              :      * Mark the old indexes as dead.  First we must wait until no running
    4339              :      * transaction could be using the index for a query.  See also
    4340              :      * index_drop() for more details.
    4341              :      */
    4342              : 
    4343          251 :     INJECTION_POINT("reindex-relation-concurrently-before-set-dead", NULL);
    4344          251 :     pgstat_progress_update_param(PROGRESS_CREATEIDX_PHASE,
    4345              :                                  PROGRESS_CREATEIDX_PHASE_WAIT_4);
    4346          251 :     WaitForLockersMultiple(lockTags, AccessExclusiveLock, true);
    4347              : 
    4348          564 :     foreach(lc, indexIds)
    4349              :     {
    4350          313 :         ReindexIndexInfo *oldidx = lfirst(lc);
    4351              : 
    4352              :         /*
    4353              :          * Check for user-requested abort.  This is inside a transaction so as
    4354              :          * xact.c does not issue a useless WARNING, and ensures that
    4355              :          * session-level locks are cleaned up on abort.
    4356              :          */
    4357          313 :         CHECK_FOR_INTERRUPTS();
    4358              : 
    4359              :         /*
    4360              :          * Updating pg_index might involve TOAST table access, so ensure we
    4361              :          * have a valid snapshot.
    4362              :          */
    4363          313 :         PushActiveSnapshot(GetTransactionSnapshot());
    4364              : 
    4365          313 :         index_concurrently_set_dead(oldidx->tableId, oldidx->indexId);
    4366              : 
    4367          313 :         PopActiveSnapshot();
    4368              :     }
    4369              : 
    4370              :     /* Commit this transaction to make the updates visible. */
    4371          251 :     CommitTransactionCommand();
    4372          251 :     StartTransactionCommand();
    4373              : 
    4374              :     /*
    4375              :      * While we could set PROC_IN_SAFE_IC if all indexes qualified, there's no
    4376              :      * real need for that, because we only acquire an Xid after the wait is
    4377              :      * done, and that lasts for a very short period.
    4378              :      */
    4379              : 
    4380              :     /*
    4381              :      * Phase 6 of REINDEX CONCURRENTLY
    4382              :      *
    4383              :      * Drop the old indexes.
    4384              :      */
    4385              : 
    4386          251 :     pgstat_progress_update_param(PROGRESS_CREATEIDX_PHASE,
    4387              :                                  PROGRESS_CREATEIDX_PHASE_WAIT_5);
    4388          251 :     WaitForLockersMultiple(lockTags, AccessExclusiveLock, true);
    4389              : 
    4390          251 :     PushActiveSnapshot(GetTransactionSnapshot());
    4391              : 
    4392              :     {
    4393          251 :         ObjectAddresses *objects = new_object_addresses();
    4394              : 
    4395          564 :         foreach(lc, indexIds)
    4396              :         {
    4397          313 :             ReindexIndexInfo *idx = lfirst(lc);
    4398              :             ObjectAddress object;
    4399              : 
    4400          313 :             object.classId = RelationRelationId;
    4401          313 :             object.objectId = idx->indexId;
    4402          313 :             object.objectSubId = 0;
    4403              : 
    4404          313 :             add_exact_object_address(&object, objects);
    4405              :         }
    4406              : 
    4407              :         /*
    4408              :          * Use PERFORM_DELETION_CONCURRENT_LOCK so that index_drop() uses the
    4409              :          * right lock level.
    4410              :          */
    4411          251 :         performMultipleDeletions(objects, DROP_RESTRICT,
    4412              :                                  PERFORM_DELETION_CONCURRENT_LOCK | PERFORM_DELETION_INTERNAL);
    4413              :     }
    4414              : 
    4415          251 :     PopActiveSnapshot();
    4416          251 :     CommitTransactionCommand();
    4417              : 
    4418              :     /*
    4419              :      * Finally, release the session-level lock on the table.
    4420              :      */
    4421         1179 :     foreach(lc, relationLocks)
    4422              :     {
    4423          928 :         LockRelId  *lockrelid = (LockRelId *) lfirst(lc);
    4424              : 
    4425          928 :         UnlockRelationIdForSession(lockrelid, ShareUpdateExclusiveLock);
    4426              :     }
    4427              : 
    4428              :     /* Start a new transaction to finish process properly */
    4429          251 :     StartTransactionCommand();
    4430              : 
    4431              :     /* Log what we did */
    4432          251 :     if ((params->options & REINDEXOPT_VERBOSE) != 0)
    4433              :     {
    4434            2 :         if (relkind == RELKIND_INDEX)
    4435            0 :             ereport(INFO,
    4436              :                     (errmsg("index \"%s.%s\" was reindexed",
    4437              :                             relationNamespace, relationName),
    4438              :                      errdetail("%s.",
    4439              :                                pg_rusage_show(&ru0))));
    4440              :         else
    4441              :         {
    4442            6 :             foreach(lc, newIndexIds)
    4443              :             {
    4444            4 :                 ReindexIndexInfo *idx = lfirst(lc);
    4445            4 :                 Oid         indOid = idx->indexId;
    4446              : 
    4447            4 :                 ereport(INFO,
    4448              :                         (errmsg("index \"%s.%s\" was reindexed",
    4449              :                                 get_namespace_name(get_rel_namespace(indOid)),
    4450              :                                 get_rel_name(indOid))));
    4451              :                 /* Don't show rusage here, since it's not per index. */
    4452              :             }
    4453              : 
    4454            2 :             ereport(INFO,
    4455              :                     (errmsg("table \"%s.%s\" was reindexed",
    4456              :                             relationNamespace, relationName),
    4457              :                      errdetail("%s.",
    4458              :                                pg_rusage_show(&ru0))));
    4459              :         }
    4460              :     }
    4461              : 
    4462          251 :     MemoryContextDelete(private_context);
    4463              : 
    4464          251 :     pgstat_progress_end_command();
    4465              : 
    4466          251 :     return true;
    4467              : }
    4468              : 
    4469              : /*
    4470              :  * Insert or delete an appropriate pg_inherits tuple to make the given index
    4471              :  * be a partition of the indicated parent index.
                      :  *
                      :  * partitionIdx is the open index relation to attach or detach.  parentOid
                      :  * is the parent index to attach it to, or InvalidOid to remove the link
                      :  * to whatever parent pg_inherits currently records for it.
    4472              :  *
    4473              :  * This also corrects the pg_depend information for the affected index.
                      :  * In addition, relhassubclass is set on the parent when one is given, and
                      :  * relispartition of the index itself is updated to match the new state.
    4474              :  */
    4475              : void
    4476          674 : IndexSetParentIndex(Relation partitionIdx, Oid parentOid)
    4477              : {
    4478              :     Relation    pg_inherits;
    4479              :     ScanKeyData key[2];
    4480              :     SysScanDesc scan;
    4481          674 :     Oid         partRelid = RelationGetRelid(partitionIdx);
    4482              :     HeapTuple   tuple;
    4483              :     bool        fix_dependencies;
    4484              : 
    4485              :     /* Make sure this is an index (plain or partitioned) */
    4486              :     Assert(partitionIdx->rd_rel->relkind == RELKIND_INDEX ||
    4487              :            partitionIdx->rd_rel->relkind == RELKIND_PARTITIONED_INDEX);
    4488              : 
    4489              :     /*
    4490              :      * Scan pg_inherits for rows linking our index to some parent.
                      :      *
                      :      * The lookup is keyed on inhrelid = our index and inhseqno = 1, and
                      :      * only the first matching row is fetched.
    4491              :      */
    4492          674 :     pg_inherits = relation_open(InheritsRelationId, RowExclusiveLock);
    4493          674 :     ScanKeyInit(&key[0],
    4494              :                 Anum_pg_inherits_inhrelid,
    4495              :                 BTEqualStrategyNumber, F_OIDEQ,
    4496              :                 ObjectIdGetDatum(partRelid));
    4497          674 :     ScanKeyInit(&key[1],
    4498              :                 Anum_pg_inherits_inhseqno,
    4499              :                 BTEqualStrategyNumber, F_INT4EQ,
    4500              :                 Int32GetDatum(1));
    4501          674 :     scan = systable_beginscan(pg_inherits, InheritsRelidSeqnoIndexId, true,
    4502              :                               NULL, 2, key);
    4503          674 :     tuple = systable_getnext(scan);
    4504              : 
    4505          674 :     if (!HeapTupleIsValid(tuple))
    4506              :     {
    4507          386 :         if (parentOid == InvalidOid)
    4508              :         {
    4509              :             /*
    4510              :              * No pg_inherits row, and no parent wanted: nothing to do in this
    4511              :              * case.
    4512              :              */
    4513            0 :             fix_dependencies = false;
    4514              :         }
    4515              :         else
    4516              :         {
    4517          386 :             StoreSingleInheritance(partRelid, parentOid, 1);
    4518          386 :             fix_dependencies = true;
    4519              :         }
    4520              :     }
    4521              :     else
    4522              :     {
    4523          288 :         Form_pg_inherits inhForm = (Form_pg_inherits) GETSTRUCT(tuple);
    4524              : 
    4525          288 :         if (parentOid == InvalidOid)
    4526              :         {
    4527              :             /*
    4528              :              * There exists a pg_inherits row, which we want to clear; do so.
    4529              :              */
    4530          288 :             CatalogTupleDelete(pg_inherits, &tuple->t_self);
    4531          288 :             fix_dependencies = true;
    4532              :         }
    4533              :         else
    4534              :         {
    4535              :             /*
    4536              :              * A pg_inherits row exists.  If it's the same we want, then we're
    4537              :              * good; if it differs, that amounts to a corrupt catalog and
    4538              :              * should not happen.
    4539              :              */
    4540            0 :             if (inhForm->inhparent != parentOid)
    4541              :             {
    4542              :                 /* unexpected: we should not get called in this case */
    4543            0 :                 elog(ERROR, "bogus pg_inherit row: inhrelid %u inhparent %u",
    4544              :                      inhForm->inhrelid, inhForm->inhparent);
    4545              :             }
    4546              : 
    4547              :             /* already in the right state */
    4548            0 :             fix_dependencies = false;
    4549              :         }
    4550              :     }
    4551              : 
    4552              :     /* done with pg_inherits */
    4553          674 :     systable_endscan(scan);
    4554          674 :     relation_close(pg_inherits, RowExclusiveLock);
    4555              : 
    4556              :     /*
                      :      * set relhassubclass if an index partition has been added to the parent;
                      :      * the parent is locked with ShareUpdateExclusiveLock before the update
                      :      */
    4557          674 :     if (OidIsValid(parentOid))
    4558              :     {
    4559          386 :         LockRelationOid(parentOid, ShareUpdateExclusiveLock);
    4560          386 :         SetRelationHasSubclass(parentOid, true);
    4561              :     }
    4562              : 
    4563              :     /*
                      :      * set relispartition correctly on the partition
                      :      *
                      :      * NOTE(review): this call is made even in the two "nothing to do" cases
                      :      * above, while update_relispartition() asserts that the flag actually
                      :      * changes.  Presumably callers never reach those cases; confirm.
                      :      */
    4564          674 :     update_relispartition(partRelid, OidIsValid(parentOid));
    4565              : 
    4566          674 :     if (fix_dependencies)
    4567              :     {
    4568              :         /*
    4569              :          * Insert/delete pg_depend rows.  If setting a parent, add PARTITION
    4570              :          * dependencies on the parent index and the table; if removing a
    4571              :          * parent, delete PARTITION dependencies.
                      :          *
                      :          * The table is the one recorded in the index's rd_index->indrelid.
    4572              :          */
    4573          674 :         if (OidIsValid(parentOid))
    4574              :         {
    4575              :             ObjectAddress partIdx;
    4576              :             ObjectAddress parentIdx;
    4577              :             ObjectAddress partitionTbl;
    4578              : 
    4579          386 :             ObjectAddressSet(partIdx, RelationRelationId, partRelid);
    4580          386 :             ObjectAddressSet(parentIdx, RelationRelationId, parentOid);
    4581          386 :             ObjectAddressSet(partitionTbl, RelationRelationId,
    4582              :                              partitionIdx->rd_index->indrelid);
    4583          386 :             recordDependencyOn(&partIdx, &parentIdx,
    4584              :                                DEPENDENCY_PARTITION_PRI);
    4585          386 :             recordDependencyOn(&partIdx, &partitionTbl,
    4586              :                                DEPENDENCY_PARTITION_SEC);
    4587              :         }
    4588              :         else
    4589              :         {
    4590          288 :             deleteDependencyRecordsForClass(RelationRelationId, partRelid,
    4591              :                                             RelationRelationId,
    4592              :                                             DEPENDENCY_PARTITION_PRI);
    4593          288 :             deleteDependencyRecordsForClass(RelationRelationId, partRelid,
    4594              :                                             RelationRelationId,
    4595              :                                             DEPENDENCY_PARTITION_SEC);
    4596              :         }
    4597              : 
    4598              :         /* make our updates visible to later commands in this transaction */
    4599          674 :         CommandCounterIncrement();
    4600              :     }
    4601          674 : }
    4602              : 
    4603              : /*
    4604              :  * Subroutine of IndexSetParentIndex to update the relispartition flag of the
    4605              :  * given index to the given value.
                      :  *
                      :  * relationId: OID used to look up the pg_class row (via the RELOID syscache).
                      :  * newval: the value to store in that row's relispartition field.
                      :  *
                      :  * The row is fetched as a copy, modified in memory, and written back with
                      :  * CatalogTupleUpdate; the copy is freed before returning.  If no row is
                      :  * found, elog(ERROR) is raised.  The Assert in the body requires that the
                      :  * stored flag currently differs from newval, so callers are expected to
                      :  * call this only when the flag really changes.
                      :  *
                      :  * After the update, the InplaceUpdateTupleLock on the original tuple TID is
                      :  * released explicitly with UnlockTuple.  NOTE(review): that lock is
                      :  * presumably acquired inside SearchSysCacheLockedCopy1 -- confirm there.
    4606              :  */
    4607              : static void
    4608          674 : update_relispartition(Oid relationId, bool newval)
    4609              : {
    4610              :     HeapTuple   tup;
    4611              :     Relation    classRel;
    4612              :     ItemPointerData otid;	/* TID of the tuple as fetched, i.e. before the update */
    4613              : 
    4614          674 :     classRel = table_open(RelationRelationId, RowExclusiveLock);
    4615          674 :     tup = SearchSysCacheLockedCopy1(RELOID, ObjectIdGetDatum(relationId));
    4616          674 :     if (!HeapTupleIsValid(tup))
    4617            0 :         elog(ERROR, "cache lookup failed for relation %u", relationId);
    4618          674 :     otid = tup->t_self;	/* remembered separately; used for both the update and the unlock */
    4619              :     Assert(((Form_pg_class) GETSTRUCT(tup))->relispartition != newval);
    4620          674 :     ((Form_pg_class) GETSTRUCT(tup))->relispartition = newval;
    4621          674 :     CatalogTupleUpdate(classRel, &otid, tup);
    4622          674 :     UnlockTuple(classRel, &otid, InplaceUpdateTupleLock);	/* release only after the update is done */
    4623          674 :     heap_freetuple(tup);
    4624          674 :     table_close(classRel, RowExclusiveLock);
    4625          674 : }
    4626              : 
    4627              : /*
    4628              :  * Set the PROC_IN_SAFE_IC flag in MyProc->statusFlags.
    4629              :  *
    4630              :  * When doing concurrent index builds, we can set this flag
    4631              :  * to tell other processes concurrently running CREATE
    4632              :  * INDEX CONCURRENTLY or REINDEX CONCURRENTLY to ignore us when
    4633              :  * doing their waits for concurrent snapshots.  On one hand it
    4634              :  * avoids pointlessly waiting for a process that's not interesting
    4635              :  * anyway; but more importantly it avoids deadlocks in some cases.
    4636              :  *
    4637              :  * This can be done safely only for indexes that don't execute any
    4638              :  * expressions that could access other tables, so index must not be
    4639              :  * expressional nor partial.  Caller is responsible for only calling
    4640              :  * this routine when that assumption holds true.
    4641              :  *
    4642              :  * (The flag is reset automatically at transaction end, so it must be
    4643              :  * set for each transaction.)
                      :  *
                      :  * Takes no arguments and returns nothing.  The flag is OR'ed into
                      :  * MyProc->statusFlags, and the resulting value is then copied into
                      :  * ProcGlobal->statusFlags[MyProc->pgxactoff]; both stores happen while
                      :  * ProcArrayLock is held in exclusive mode.
    4644              :  */
    4645              : static inline void
    4646         1034 : set_indexsafe_procflags(void)
    4647              : {
    4648              :     /*
    4649              :      * This should only be called before installing xid or xmin in MyProc;
    4650              :      * otherwise, concurrent processes could see an Xmin that moves backwards.
    4651              :      */
    4652              :     Assert(MyProc->xid == InvalidTransactionId &&
    4653              :            MyProc->xmin == InvalidTransactionId);
    4654              : 
    4655         1034 :     LWLockAcquire(ProcArrayLock, LW_EXCLUSIVE);
    4656         1034 :     MyProc->statusFlags |= PROC_IN_SAFE_IC;	/* other flag bits are left as they were */
    4657         1034 :     ProcGlobal->statusFlags[MyProc->pgxactoff] = MyProc->statusFlags;	/* keep the ProcGlobal copy identical */
    4658         1034 :     LWLockRelease(ProcArrayLock);
    4659         1034 : }
        

Generated by: LCOV version 2.0-1