LCOV - code coverage report
Current view: top level - src/backend/commands - indexcmds.c (source / functions) Coverage Total Hit
Test: PostgreSQL 19devel Lines: 92.6 % 1343 1244
Test Date: 2026-02-17 17:20:33 Functions: 100.0 % 26 26
Legend: Lines:     hit not hit

            Line data    Source code
       1              : /*-------------------------------------------------------------------------
       2              :  *
       3              :  * indexcmds.c
       4              :  *    POSTGRES define and remove index code.
       5              :  *
       6              :  * Portions Copyright (c) 1996-2026, PostgreSQL Global Development Group
       7              :  * Portions Copyright (c) 1994, Regents of the University of California
       8              :  *
       9              :  *
      10              :  * IDENTIFICATION
      11              :  *    src/backend/commands/indexcmds.c
      12              :  *
      13              :  *-------------------------------------------------------------------------
      14              :  */
      15              : 
      16              : #include "postgres.h"
      17              : 
      18              : #include "access/amapi.h"
      19              : #include "access/gist.h"
      20              : #include "access/heapam.h"
      21              : #include "access/htup_details.h"
      22              : #include "access/reloptions.h"
      23              : #include "access/sysattr.h"
      24              : #include "access/tableam.h"
      25              : #include "access/xact.h"
      26              : #include "catalog/catalog.h"
      27              : #include "catalog/index.h"
      28              : #include "catalog/indexing.h"
      29              : #include "catalog/namespace.h"
      30              : #include "catalog/pg_am.h"
      31              : #include "catalog/pg_authid.h"
      32              : #include "catalog/pg_collation.h"
      33              : #include "catalog/pg_constraint.h"
      34              : #include "catalog/pg_database.h"
      35              : #include "catalog/pg_inherits.h"
      36              : #include "catalog/pg_namespace.h"
      37              : #include "catalog/pg_opclass.h"
      38              : #include "catalog/pg_tablespace.h"
      39              : #include "catalog/pg_type.h"
      40              : #include "commands/comment.h"
      41              : #include "commands/defrem.h"
      42              : #include "commands/event_trigger.h"
      43              : #include "commands/progress.h"
      44              : #include "commands/tablecmds.h"
      45              : #include "commands/tablespace.h"
      46              : #include "mb/pg_wchar.h"
      47              : #include "miscadmin.h"
      48              : #include "nodes/makefuncs.h"
      49              : #include "nodes/nodeFuncs.h"
      50              : #include "optimizer/optimizer.h"
      51              : #include "parser/parse_coerce.h"
      52              : #include "parser/parse_oper.h"
      53              : #include "parser/parse_utilcmd.h"
      54              : #include "partitioning/partdesc.h"
      55              : #include "pgstat.h"
      56              : #include "rewrite/rewriteManip.h"
      57              : #include "storage/lmgr.h"
      58              : #include "storage/proc.h"
      59              : #include "storage/procarray.h"
      60              : #include "utils/acl.h"
      61              : #include "utils/builtins.h"
      62              : #include "utils/fmgroids.h"
      63              : #include "utils/guc.h"
      64              : #include "utils/injection_point.h"
      65              : #include "utils/inval.h"
      66              : #include "utils/lsyscache.h"
      67              : #include "utils/memutils.h"
      68              : #include "utils/partcache.h"
      69              : #include "utils/pg_rusage.h"
      70              : #include "utils/regproc.h"
      71              : #include "utils/snapmgr.h"
      72              : #include "utils/syscache.h"
      73              : 
      74              : 
      75              : /* non-export function prototypes */
      76              : static bool CompareOpclassOptions(const Datum *opts1, const Datum *opts2, int natts);
      77              : static void CheckPredicate(Expr *predicate);
      78              : static void ComputeIndexAttrs(ParseState *pstate,
      79              :                               IndexInfo *indexInfo,
      80              :                               Oid *typeOids,
      81              :                               Oid *collationOids,
      82              :                               Oid *opclassOids,
      83              :                               Datum *opclassOptions,
      84              :                               int16 *colOptions,
      85              :                               const List *attList,
      86              :                               const List *exclusionOpNames,
      87              :                               Oid relId,
      88              :                               const char *accessMethodName,
      89              :                               Oid accessMethodId,
      90              :                               bool amcanorder,
      91              :                               bool isconstraint,
      92              :                               bool iswithoutoverlaps,
      93              :                               Oid ddl_userid,
      94              :                               int ddl_sec_context,
      95              :                               int *ddl_save_nestlevel);
      96              : static char *ChooseIndexName(const char *tabname, Oid namespaceId,
      97              :                              const List *colnames, const List *exclusionOpNames,
      98              :                              bool primary, bool isconstraint);
      99              : static char *ChooseIndexNameAddition(const List *colnames);
     100              : static List *ChooseIndexColumnNames(const List *indexElems);
     101              : static void ReindexIndex(const ReindexStmt *stmt, const ReindexParams *params,
     102              :                          bool isTopLevel);
     103              : static void RangeVarCallbackForReindexIndex(const RangeVar *relation,
     104              :                                             Oid relId, Oid oldRelId, void *arg);
     105              : static Oid  ReindexTable(const ReindexStmt *stmt, const ReindexParams *params,
     106              :                          bool isTopLevel);
     107              : static void ReindexMultipleTables(const ReindexStmt *stmt,
     108              :                                   const ReindexParams *params);
     109              : static void reindex_error_callback(void *arg);
     110              : static void ReindexPartitions(const ReindexStmt *stmt, Oid relid,
     111              :                               const ReindexParams *params, bool isTopLevel);
     112              : static void ReindexMultipleInternal(const ReindexStmt *stmt, const List *relids,
     113              :                                     const ReindexParams *params);
     114              : static bool ReindexRelationConcurrently(const ReindexStmt *stmt,
     115              :                                         Oid relationOid,
     116              :                                         const ReindexParams *params);
     117              : static void update_relispartition(Oid relationId, bool newval);
     118              : static inline void set_indexsafe_procflags(void);
     119              : 
     120              : /*
     121              :  * callback argument type for RangeVarCallbackForReindexIndex()
     122              :  */
     123              : struct ReindexIndexCallbackState
     124              : {
     125              :     ReindexParams params;       /* options from statement */
     126              :     Oid         locked_table_oid;   /* tracks previously locked table */
     127              : };
     128              : 
     129              : /*
     130              :  * callback arguments for reindex_error_callback()
     131              :  */
     132              : typedef struct ReindexErrorInfo
     133              : {
     134              :     char       *relname;
     135              :     char       *relnamespace;
     136              :     char        relkind;
     137              : } ReindexErrorInfo;
     138              : 
     139              : /*
     140              :  * CheckIndexCompatible
     141              :  *      Determine whether an existing index definition is compatible with a
     142              :  *      prospective index definition, such that the existing index storage
     143              :  *      could become the storage of the new index, avoiding a rebuild.
     144              :  *
     145              :  * 'oldId': the OID of the existing index
     146              :  * 'accessMethodName': name of the AM to use.
     147              :  * 'attributeList': a list of IndexElem specifying columns and expressions
     148              :  *      to index on.
     149              :  * 'exclusionOpNames': list of names of exclusion-constraint operators,
     150              :  *      or NIL if not an exclusion constraint.
     151              :  * 'isWithoutOverlaps': true iff this index has a WITHOUT OVERLAPS clause.
     152              :  *
     153              :  * This is tailored to the needs of ALTER TABLE ALTER TYPE, which recreates
     154              :  * any indexes that depended on a changing column from their pg_get_indexdef
     155              :  * or pg_get_constraintdef definitions.  We omit some of the sanity checks of
     156              :  * DefineIndex.  We assume that the old and new indexes have the same number
     157              :  * of columns and that if one has an expression column or predicate, both do.
     158              :  * Errors arising from the attribute list still apply.
     159              :  *
     160              :  * Most column type changes that can skip a table rewrite do not invalidate
     161              :  * indexes.  We acknowledge this when all operator classes, collations and
     162              :  * exclusion operators match.  Though we could further permit intra-opfamily
     163              :  * changes for btree and hash indexes, that adds subtle complexity with no
     164              :  * concrete benefit for core types. Note, that INCLUDE columns aren't
     165              :  * checked by this function, for them it's enough that table rewrite is
     166              :  * skipped.
     167              :  *
     168              :  * When a comparison or exclusion operator has a polymorphic input type, the
     169              :  * actual input types must also match.  This defends against the possibility
     170              :  * that operators could vary behavior in response to get_fn_expr_argtype().
     171              :  * At present, this hazard is theoretical: check_exclusion_constraint() and
     172              :  * all core index access methods decline to set fn_expr for such calls.
     173              :  *
     174              :  * We do not yet implement a test to verify compatibility of expression
     175              :  * columns or predicates, so assume any such index is incompatible.
     176              :  */
     177              : bool
     178           52 : CheckIndexCompatible(Oid oldId,
     179              :                      const char *accessMethodName,
     180              :                      const List *attributeList,
     181              :                      const List *exclusionOpNames,
     182              :                      bool isWithoutOverlaps)
     183              : {
     184              :     bool        isconstraint;
     185              :     Oid        *typeIds;
     186              :     Oid        *collationIds;
     187              :     Oid        *opclassIds;
     188              :     Datum      *opclassOptions;
     189              :     Oid         accessMethodId;
     190              :     Oid         relationId;
     191              :     HeapTuple   tuple;
     192              :     Form_pg_index indexForm;
     193              :     Form_pg_am  accessMethodForm;
     194              :     const IndexAmRoutine *amRoutine;
     195              :     bool        amcanorder;
     196              :     bool        amsummarizing;
     197              :     int16      *coloptions;
     198              :     IndexInfo  *indexInfo;
     199              :     int         numberOfAttributes;
     200              :     int         old_natts;
     201           52 :     bool        ret = true;
     202              :     oidvector  *old_indclass;
     203              :     oidvector  *old_indcollation;
     204              :     Relation    irel;
     205              :     int         i;
     206              :     Datum       d;
     207              : 
     208              :     /* Caller should already have the relation locked in some way. */
     209           52 :     relationId = IndexGetRelation(oldId, false);
     210              : 
     211              :     /*
     212              :      * We can pretend isconstraint = false unconditionally.  It only serves to
     213              :      * decide the text of an error message that should never happen for us.
     214              :      */
     215           52 :     isconstraint = false;
     216              : 
     217           52 :     numberOfAttributes = list_length(attributeList);
     218              :     Assert(numberOfAttributes > 0);
     219              :     Assert(numberOfAttributes <= INDEX_MAX_KEYS);
     220              : 
     221              :     /* look up the access method */
     222           52 :     tuple = SearchSysCache1(AMNAME, PointerGetDatum(accessMethodName));
     223           52 :     if (!HeapTupleIsValid(tuple))
     224            0 :         ereport(ERROR,
     225              :                 (errcode(ERRCODE_UNDEFINED_OBJECT),
     226              :                  errmsg("access method \"%s\" does not exist",
     227              :                         accessMethodName)));
     228           52 :     accessMethodForm = (Form_pg_am) GETSTRUCT(tuple);
     229           52 :     accessMethodId = accessMethodForm->oid;
     230           52 :     amRoutine = GetIndexAmRoutine(accessMethodForm->amhandler);
     231           52 :     ReleaseSysCache(tuple);
     232              : 
     233           52 :     amcanorder = amRoutine->amcanorder;
     234           52 :     amsummarizing = amRoutine->amsummarizing;
     235              : 
     236              :     /*
     237              :      * Compute the operator classes, collations, and exclusion operators for
     238              :      * the new index, so we can test whether it's compatible with the existing
     239              :      * one.  Note that ComputeIndexAttrs might fail here, but that's OK:
     240              :      * DefineIndex would have failed later.  Our attributeList contains only
     241              :      * key attributes, thus we're filling ii_NumIndexAttrs and
     242              :      * ii_NumIndexKeyAttrs with same value.
     243              :      */
     244           52 :     indexInfo = makeIndexInfo(numberOfAttributes, numberOfAttributes,
     245              :                               accessMethodId, NIL, NIL, false, false,
     246              :                               false, false, amsummarizing, isWithoutOverlaps);
     247           52 :     typeIds = palloc_array(Oid, numberOfAttributes);
     248           52 :     collationIds = palloc_array(Oid, numberOfAttributes);
     249           52 :     opclassIds = palloc_array(Oid, numberOfAttributes);
     250           52 :     opclassOptions = palloc_array(Datum, numberOfAttributes);
     251           52 :     coloptions = palloc_array(int16, numberOfAttributes);
     252           52 :     ComputeIndexAttrs(NULL, indexInfo,
     253              :                       typeIds, collationIds, opclassIds, opclassOptions,
     254              :                       coloptions, attributeList,
     255              :                       exclusionOpNames, relationId,
     256              :                       accessMethodName, accessMethodId,
     257              :                       amcanorder, isconstraint, isWithoutOverlaps, InvalidOid,
     258              :                       0, NULL);
     259              : 
     260              :     /* Get the soon-obsolete pg_index tuple. */
     261           52 :     tuple = SearchSysCache1(INDEXRELID, ObjectIdGetDatum(oldId));
     262           52 :     if (!HeapTupleIsValid(tuple))
     263            0 :         elog(ERROR, "cache lookup failed for index %u", oldId);
     264           52 :     indexForm = (Form_pg_index) GETSTRUCT(tuple);
     265              : 
     266              :     /*
     267              :      * We don't assess expressions or predicates; assume incompatibility.
     268              :      * Also, if the index is invalid for any reason, treat it as incompatible.
     269              :      */
     270          104 :     if (!(heap_attisnull(tuple, Anum_pg_index_indpred, NULL) &&
     271           52 :           heap_attisnull(tuple, Anum_pg_index_indexprs, NULL) &&
     272           52 :           indexForm->indisvalid))
     273              :     {
     274            0 :         ReleaseSysCache(tuple);
     275            0 :         return false;
     276              :     }
     277              : 
     278              :     /* Any change in operator class or collation breaks compatibility. */
     279           52 :     old_natts = indexForm->indnkeyatts;
     280              :     Assert(old_natts == numberOfAttributes);
     281              : 
     282           52 :     d = SysCacheGetAttrNotNull(INDEXRELID, tuple, Anum_pg_index_indcollation);
     283           52 :     old_indcollation = (oidvector *) DatumGetPointer(d);
     284              : 
     285           52 :     d = SysCacheGetAttrNotNull(INDEXRELID, tuple, Anum_pg_index_indclass);
     286           52 :     old_indclass = (oidvector *) DatumGetPointer(d);
     287              : 
     288          104 :     ret = (memcmp(old_indclass->values, opclassIds, old_natts * sizeof(Oid)) == 0 &&
     289           52 :            memcmp(old_indcollation->values, collationIds, old_natts * sizeof(Oid)) == 0);
     290              : 
     291           52 :     ReleaseSysCache(tuple);
     292              : 
     293           52 :     if (!ret)
     294            0 :         return false;
     295              : 
     296              :     /* For polymorphic opcintype, column type changes break compatibility. */
     297           52 :     irel = index_open(oldId, AccessShareLock);  /* caller probably has a lock */
     298          107 :     for (i = 0; i < old_natts; i++)
     299              :     {
     300           55 :         if (IsPolymorphicType(get_opclass_input_type(opclassIds[i])) &&
     301            0 :             TupleDescAttr(irel->rd_att, i)->atttypid != typeIds[i])
     302              :         {
     303            0 :             ret = false;
     304            0 :             break;
     305              :         }
     306              :     }
     307              : 
     308              :     /* Any change in opclass options break compatibility. */
     309           52 :     if (ret)
     310              :     {
     311           52 :         Datum      *oldOpclassOptions = palloc_array(Datum, old_natts);
     312              : 
     313          107 :         for (i = 0; i < old_natts; i++)
     314           55 :             oldOpclassOptions[i] = get_attoptions(oldId, i + 1);
     315              : 
     316           52 :         ret = CompareOpclassOptions(oldOpclassOptions, opclassOptions, old_natts);
     317              : 
     318           52 :         pfree(oldOpclassOptions);
     319              :     }
     320              : 
     321              :     /* Any change in exclusion operator selections breaks compatibility. */
     322           52 :     if (ret && indexInfo->ii_ExclusionOps != NULL)
     323              :     {
     324              :         Oid        *old_operators,
     325              :                    *old_procs;
     326              :         uint16     *old_strats;
     327              : 
     328            0 :         RelationGetExclusionInfo(irel, &old_operators, &old_procs, &old_strats);
     329            0 :         ret = memcmp(old_operators, indexInfo->ii_ExclusionOps,
     330              :                      old_natts * sizeof(Oid)) == 0;
     331              : 
     332              :         /* Require an exact input type match for polymorphic operators. */
     333            0 :         if (ret)
     334              :         {
     335            0 :             for (i = 0; i < old_natts && ret; i++)
     336              :             {
     337              :                 Oid         left,
     338              :                             right;
     339              : 
     340            0 :                 op_input_types(indexInfo->ii_ExclusionOps[i], &left, &right);
     341            0 :                 if ((IsPolymorphicType(left) || IsPolymorphicType(right)) &&
     342            0 :                     TupleDescAttr(irel->rd_att, i)->atttypid != typeIds[i])
     343              :                 {
     344            0 :                     ret = false;
     345            0 :                     break;
     346              :                 }
     347              :             }
     348              :         }
     349              :     }
     350              : 
     351           52 :     index_close(irel, NoLock);
     352           52 :     return ret;
     353              : }
     354              : 
     355              : /*
     356              :  * CompareOpclassOptions
     357              :  *
     358              :  * Compare per-column opclass options which are represented by arrays of text[]
     359              :  * datums.  Both elements of arrays and array themselves can be NULL.
     360              :  */
     361              : static bool
     362           52 : CompareOpclassOptions(const Datum *opts1, const Datum *opts2, int natts)
     363              : {
     364              :     int         i;
     365              :     FmgrInfo    fm;
     366              : 
     367           52 :     if (!opts1 && !opts2)
     368            0 :         return true;
     369              : 
     370           52 :     fmgr_info(F_ARRAY_EQ, &fm);
     371          107 :     for (i = 0; i < natts; i++)
     372              :     {
     373           55 :         Datum       opt1 = opts1 ? opts1[i] : (Datum) 0;
     374           55 :         Datum       opt2 = opts2 ? opts2[i] : (Datum) 0;
     375              : 
     376           55 :         if (opt1 == (Datum) 0)
     377              :         {
     378           54 :             if (opt2 == (Datum) 0)
     379           54 :                 continue;
     380              :             else
     381            0 :                 return false;
     382              :         }
     383            1 :         else if (opt2 == (Datum) 0)
     384            0 :             return false;
     385              : 
     386              :         /*
     387              :          * Compare non-NULL text[] datums.  Use C collation to enforce binary
     388              :          * equivalence of texts, because we don't know anything about the
     389              :          * semantics of opclass options.
     390              :          */
     391            1 :         if (!DatumGetBool(FunctionCall2Coll(&fm, C_COLLATION_OID, opt1, opt2)))
     392            0 :             return false;
     393              :     }
     394              : 
     395           52 :     return true;
     396              : }
     397              : 
     398              : /*
     399              :  * WaitForOlderSnapshots
     400              :  *
     401              :  * Wait for transactions that might have an older snapshot than the given xmin
     402              :  * limit, because it might not contain tuples deleted just before it has
     403              :  * been taken. Obtain a list of VXIDs of such transactions, and wait for them
     404              :  * individually. This is used when building an index concurrently.
     405              :  *
     406              :  * We can exclude any running transactions that have xmin > the xmin given;
     407              :  * their oldest snapshot must be newer than our xmin limit.
     408              :  * We can also exclude any transactions that have xmin = zero, since they
     409              :  * evidently have no live snapshot at all (and any one they might be in
     410              :  * process of taking is certainly newer than ours).  Transactions in other
     411              :  * DBs can be ignored too, since they'll never even be able to see the
     412              :  * index being worked on.
     413              :  *
     414              :  * We can also exclude autovacuum processes and processes running manual
     415              :  * lazy VACUUMs, because they won't be fazed by missing index entries
     416              :  * either.  (Manual ANALYZEs, however, can't be excluded because they
     417              :  * might be within transactions that are going to do arbitrary operations
     418              :  * later.)  Processes running CREATE INDEX CONCURRENTLY or REINDEX CONCURRENTLY
     419              :  * on indexes that are neither expressional nor partial are also safe to
     420              :  * ignore, since we know that those processes won't examine any data
     421              :  * outside the table they're indexing.
     422              :  *
     423              :  * Also, GetCurrentVirtualXIDs never reports our own vxid, so we need not
     424              :  * check for that.
     425              :  *
     426              :  * If a process goes idle-in-transaction with xmin zero, we do not need to
     427              :  * wait for it anymore, per the above argument.  We do not have the
     428              :  * infrastructure right now to stop waiting if that happens, but we can at
     429              :  * least avoid the folly of waiting when it is idle at the time we would
     430              :  * begin to wait.  We do this by repeatedly rechecking the output of
     431              :  * GetCurrentVirtualXIDs.  If, during any iteration, a particular vxid
     432              :  * doesn't show up in the output, we know we can forget about it.
     433              :  */
     434              : void
     435          357 : WaitForOlderSnapshots(TransactionId limitXmin, bool progress)
     436              : {
     437              :     int         n_old_snapshots;
     438              :     int         i;
     439              :     VirtualTransactionId *old_snapshots;
     440              : 
     441          357 :     old_snapshots = GetCurrentVirtualXIDs(limitXmin, true, false,
     442              :                                           PROC_IS_AUTOVACUUM | PROC_IN_VACUUM
     443              :                                           | PROC_IN_SAFE_IC,
     444              :                                           &n_old_snapshots);
     445          357 :     if (progress)
     446          350 :         pgstat_progress_update_param(PROGRESS_WAITFOR_TOTAL, n_old_snapshots);
     447              : 
     448          496 :     for (i = 0; i < n_old_snapshots; i++)
     449              :     {
     450          139 :         if (!VirtualTransactionIdIsValid(old_snapshots[i]))
     451           29 :             continue;           /* found uninteresting in previous cycle */
     452              : 
     453          110 :         if (i > 0)
     454              :         {
     455              :             /* see if anything's changed ... */
     456              :             VirtualTransactionId *newer_snapshots;
     457              :             int         n_newer_snapshots;
     458              :             int         j;
     459              :             int         k;
     460              : 
     461           46 :             newer_snapshots = GetCurrentVirtualXIDs(limitXmin,
     462              :                                                     true, false,
     463              :                                                     PROC_IS_AUTOVACUUM | PROC_IN_VACUUM
     464              :                                                     | PROC_IN_SAFE_IC,
     465              :                                                     &n_newer_snapshots);
     466          188 :             for (j = i; j < n_old_snapshots; j++)
     467              :             {
     468          142 :                 if (!VirtualTransactionIdIsValid(old_snapshots[j]))
     469           25 :                     continue;   /* found uninteresting in previous cycle */
     470          495 :                 for (k = 0; k < n_newer_snapshots; k++)
     471              :                 {
     472          445 :                     if (VirtualTransactionIdEquals(old_snapshots[j],
     473              :                                                    newer_snapshots[k]))
     474           67 :                         break;
     475              :                 }
     476          117 :                 if (k >= n_newer_snapshots) /* not there anymore */
     477           50 :                     SetInvalidVirtualTransactionId(old_snapshots[j]);
     478              :             }
     479           46 :             pfree(newer_snapshots);
     480              :         }
     481              : 
     482          110 :         if (VirtualTransactionIdIsValid(old_snapshots[i]))
     483              :         {
     484              :             /* If requested, publish who we're going to wait for. */
     485           89 :             if (progress)
     486              :             {
     487           89 :                 PGPROC     *holder = ProcNumberGetProc(old_snapshots[i].procNumber);
     488              : 
     489           89 :                 if (holder)
     490           89 :                     pgstat_progress_update_param(PROGRESS_WAITFOR_CURRENT_PID,
     491           89 :                                                  holder->pid);
     492              :             }
     493           89 :             VirtualXactLock(old_snapshots[i], true);
     494              :         }
     495              : 
     496          110 :         if (progress)
     497          110 :             pgstat_progress_update_param(PROGRESS_WAITFOR_DONE, i + 1);
     498              :     }
     499          357 : }
     500              : 
     501              : 
     502              : /*
     503              :  * DefineIndex
     504              :  *      Creates a new index.
     505              :  *
     506              :  * This function manages the current userid according to the needs of pg_dump.
     507              :  * Recreating old-database catalog entries in new-database is fine, regardless
     508              :  * of which users would have permission to recreate those entries now.  That's
     509              :  * just preservation of state.  Running opaque expressions, like calling a
     510              :  * function named in a catalog entry or evaluating a pg_node_tree in a catalog
     511              :  * entry, as anyone other than the object owner, is not fine.  To adhere to
     512              :  * those principles and to remain fail-safe, use the table owner userid for
     513              :  * most ACL checks.  Use the original userid for ACL checks reached without
     514              :  * traversing opaque expressions.  (pg_dump can predict such ACL checks from
     515              :  * catalogs.)  Overall, this is a mess.  Future DDL development should
     516              :  * consider offering one DDL command for catalog setup and a separate DDL
     517              :  * command for steps that run opaque expressions.
     518              :  *
     519              :  * 'pstate': ParseState struct (used only for error reports; pass NULL if
     520              :  *      not available)
     521              :  * 'tableId': the OID of the table relation on which the index is to be
     522              :  *      created
     523              :  * 'stmt': IndexStmt describing the properties of the new index.
     524              :  * 'indexRelationId': normally InvalidOid, but during bootstrap can be
     525              :  *      nonzero to specify a preselected OID for the index.
     526              :  * 'parentIndexId': the OID of the parent index; InvalidOid if not the child
     527              :  *      of a partitioned index.
     528              :  * 'parentConstraintId': the OID of the parent constraint; InvalidOid if not
     529              :  *      the child of a constraint (only used when recursing)
     530              :  * 'total_parts': total number of direct and indirect partitions of relation;
     531              :  *      pass -1 if not known or rel is not partitioned.
     532              :  * 'is_alter_table': this is due to an ALTER rather than a CREATE operation.
     533              :  * 'check_rights': check for CREATE rights in namespace and tablespace.  (This
     534              :  *      should be true except when ALTER is deleting/recreating an index.)
     535              :  * 'check_not_in_use': check for table not already in use in current session.
     536              :  *      This should be true unless caller is holding the table open, in which
     537              :  *      case the caller had better have checked it earlier.
     538              :  * 'skip_build': make the catalog entries but don't create the index files
     539              :  * 'quiet': suppress the NOTICE chatter ordinarily provided for constraints.
     540              :  *
     541              :  * Returns the object address of the created index.
     542              :  */
     543              : ObjectAddress
     544        15865 : DefineIndex(ParseState *pstate,
     545              :             Oid tableId,
     546              :             IndexStmt *stmt,
     547              :             Oid indexRelationId,
     548              :             Oid parentIndexId,
     549              :             Oid parentConstraintId,
     550              :             int total_parts,
     551              :             bool is_alter_table,
     552              :             bool check_rights,
     553              :             bool check_not_in_use,
     554              :             bool skip_build,
     555              :             bool quiet)
     556              : {
     557              :     bool        concurrent;
     558              :     char       *indexRelationName;
     559              :     char       *accessMethodName;
     560              :     Oid        *typeIds;
     561              :     Oid        *collationIds;
     562              :     Oid        *opclassIds;
     563              :     Datum      *opclassOptions;
     564              :     Oid         accessMethodId;
     565              :     Oid         namespaceId;
     566              :     Oid         tablespaceId;
     567        15865 :     Oid         createdConstraintId = InvalidOid;
     568              :     List       *indexColNames;
     569              :     List       *allIndexParams;
     570              :     Relation    rel;
     571              :     HeapTuple   tuple;
     572              :     Form_pg_am  accessMethodForm;
     573              :     const IndexAmRoutine *amRoutine;
     574              :     bool        amcanorder;
     575              :     bool        amissummarizing;
     576              :     amoptions_function amoptions;
     577              :     bool        exclusion;
     578              :     bool        partitioned;
     579              :     bool        safe_index;
     580              :     Datum       reloptions;
     581              :     int16      *coloptions;
     582              :     IndexInfo  *indexInfo;
     583              :     bits16      flags;
     584              :     bits16      constr_flags;
     585              :     int         numberOfAttributes;
     586              :     int         numberOfKeyAttributes;
     587              :     TransactionId limitXmin;
     588              :     ObjectAddress address;
     589              :     LockRelId   heaprelid;
     590              :     LOCKTAG     heaplocktag;
     591              :     LOCKMODE    lockmode;
     592              :     Snapshot    snapshot;
     593              :     Oid         root_save_userid;
     594              :     int         root_save_sec_context;
     595              :     int         root_save_nestlevel;
     596              : 
     597        15865 :     root_save_nestlevel = NewGUCNestLevel();
     598              : 
     599        15865 :     RestrictSearchPath();
     600              : 
     601              :     /*
     602              :      * Some callers need us to run with an empty default_tablespace; this is a
     603              :      * necessary hack to be able to reproduce catalog state accurately when
     604              :      * recreating indexes after table-rewriting ALTER TABLE.
     605              :      */
     606        15865 :     if (stmt->reset_default_tblspc)
     607          228 :         (void) set_config_option("default_tablespace", "",
     608              :                                  PGC_USERSET, PGC_S_SESSION,
     609              :                                  GUC_ACTION_SAVE, true, 0, false);
     610              : 
     611              :     /*
     612              :      * Force non-concurrent build on temporary relations, even if CONCURRENTLY
     613              :      * was requested.  Other backends can't access a temporary relation, so
     614              :      * there's no harm in grabbing a stronger lock, and a non-concurrent DROP
     615              :      * is more efficient.  Do this before any use of the concurrent option is
     616              :      * done.
     617              :      */
     618        15865 :     if (stmt->concurrent && get_rel_persistence(tableId) != RELPERSISTENCE_TEMP)
     619          101 :         concurrent = true;
     620              :     else
     621        15764 :         concurrent = false;
     622              : 
     623              :     /*
     624              :      * Start progress report.  If we're building a partition, this was already
     625              :      * done.
     626              :      */
     627        15865 :     if (!OidIsValid(parentIndexId))
     628              :     {
     629        14291 :         pgstat_progress_start_command(PROGRESS_COMMAND_CREATE_INDEX, tableId);
     630        14291 :         pgstat_progress_update_param(PROGRESS_CREATEIDX_COMMAND,
     631              :                                      concurrent ?
     632              :                                      PROGRESS_CREATEIDX_COMMAND_CREATE_CONCURRENTLY :
     633              :                                      PROGRESS_CREATEIDX_COMMAND_CREATE);
     634              :     }
     635              : 
     636              :     /*
     637              :      * No index OID to report yet
     638              :      */
     639        15865 :     pgstat_progress_update_param(PROGRESS_CREATEIDX_INDEX_OID,
     640              :                                  InvalidOid);
     641              : 
     642              :     /*
     643              :      * count key attributes in index
     644              :      */
     645        15865 :     numberOfKeyAttributes = list_length(stmt->indexParams);
     646              : 
     647              :     /*
     648              :      * Calculate the new list of index columns including both key columns and
     649              :      * INCLUDE columns.  Later we can determine which of these are key
     650              :      * columns, and which are just part of the INCLUDE list by checking the
     651              :      * list position.  A list item in a position less than ii_NumIndexKeyAttrs
     652              :      * is part of the key columns, and anything equal to and over is part of
     653              :      * the INCLUDE columns.
     654              :      */
     655        15865 :     allIndexParams = list_concat_copy(stmt->indexParams,
     656        15865 :                                       stmt->indexIncludingParams);
     657        15865 :     numberOfAttributes = list_length(allIndexParams);
     658              : 
     659        15865 :     if (numberOfKeyAttributes <= 0)
     660            0 :         ereport(ERROR,
     661              :                 (errcode(ERRCODE_INVALID_OBJECT_DEFINITION),
     662              :                  errmsg("must specify at least one column")));
     663        15865 :     if (numberOfAttributes > INDEX_MAX_KEYS)
     664            0 :         ereport(ERROR,
     665              :                 (errcode(ERRCODE_TOO_MANY_COLUMNS),
     666              :                  errmsg("cannot use more than %d columns in an index",
     667              :                         INDEX_MAX_KEYS)));
     668              : 
     669              :     /*
     670              :      * Only SELECT ... FOR UPDATE/SHARE are allowed while doing a standard
     671              :      * index build; but for concurrent builds we allow INSERT/UPDATE/DELETE
     672              :      * (but not VACUUM).
     673              :      *
     674              :      * NB: Caller is responsible for making sure that tableId refers to the
     675              :      * relation on which the index should be built; except in bootstrap mode,
     676              :      * this will typically require the caller to have already locked the
     677              :      * relation.  To avoid lock upgrade hazards, that lock should be at least
     678              :      * as strong as the one we take here.
     679              :      *
     680              :      * NB: If the lock strength here ever changes, code that is run by
     681              :      * parallel workers under the control of certain particular ambuild
     682              :      * functions will need to be updated, too.
     683              :      */
     684        15865 :     lockmode = concurrent ? ShareUpdateExclusiveLock : ShareLock;
     685        15865 :     rel = table_open(tableId, lockmode);
     686              : 
     687              :     /*
     688              :      * Switch to the table owner's userid, so that any index functions are run
     689              :      * as that user.  Also lock down security-restricted operations.  We
     690              :      * already arranged to make GUC variable changes local to this command.
     691              :      */
     692        15865 :     GetUserIdAndSecContext(&root_save_userid, &root_save_sec_context);
     693        15865 :     SetUserIdAndSecContext(rel->rd_rel->relowner,
     694              :                            root_save_sec_context | SECURITY_RESTRICTED_OPERATION);
     695              : 
     696        15865 :     namespaceId = RelationGetNamespace(rel);
     697              : 
     698              :     /*
     699              :      * It has exclusion constraint behavior if it's an EXCLUDE constraint or a
     700              :      * temporal PRIMARY KEY/UNIQUE constraint
     701              :      */
     702        15865 :     exclusion = stmt->excludeOpNames || stmt->iswithoutoverlaps;
     703              : 
     704              :     /* Ensure that it makes sense to index this kind of relation */
     705        15865 :     switch (rel->rd_rel->relkind)
     706              :     {
     707        15862 :         case RELKIND_RELATION:
     708              :         case RELKIND_MATVIEW:
     709              :         case RELKIND_PARTITIONED_TABLE:
     710              :             /* OK */
     711        15862 :             break;
     712            3 :         default:
     713            3 :             ereport(ERROR,
     714              :                     (errcode(ERRCODE_WRONG_OBJECT_TYPE),
     715              :                      errmsg("cannot create index on relation \"%s\"",
     716              :                             RelationGetRelationName(rel)),
     717              :                      errdetail_relkind_not_supported(rel->rd_rel->relkind)));
     718              :             break;
     719              :     }
     720              : 
     721              :     /*
     722              :      * Establish behavior for partitioned tables, and verify sanity of
     723              :      * parameters.
     724              :      *
     725              :      * We do not build an actual index in this case; we only create a few
     726              :      * catalog entries.  The actual indexes are built by recursing for each
     727              :      * partition.
     728              :      */
     729        15862 :     partitioned = rel->rd_rel->relkind == RELKIND_PARTITIONED_TABLE;
     730        15862 :     if (partitioned)
     731              :     {
     732              :         /*
     733              :          * Note: we check 'stmt->concurrent' rather than 'concurrent', so that
     734              :          * the error is thrown also for temporary tables.  Seems better to be
     735              :          * consistent, even though we could do it on temporary table because
     736              :          * we're not actually doing it concurrently.
     737              :          */
     738         1128 :         if (stmt->concurrent)
     739            3 :             ereport(ERROR,
     740              :                     (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
     741              :                      errmsg("cannot create index on partitioned table \"%s\" concurrently",
     742              :                             RelationGetRelationName(rel))));
     743              :     }
     744              : 
     745              :     /*
     746              :      * Don't try to CREATE INDEX on temp tables of other backends.
     747              :      */
     748        15859 :     if (RELATION_IS_OTHER_TEMP(rel))
     749            0 :         ereport(ERROR,
     750              :                 (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
     751              :                  errmsg("cannot create indexes on temporary tables of other sessions")));
     752              : 
     753              :     /*
     754              :      * Unless our caller vouches for having checked this already, insist that
     755              :      * the table not be in use by our own session, either.  Otherwise we might
     756              :      * fail to make entries in the new index (for instance, if an INSERT or
     757              :      * UPDATE is in progress and has already made its list of target indexes).
     758              :      */
     759        15859 :     if (check_not_in_use)
     760         7538 :         CheckTableNotInUse(rel, "CREATE INDEX");
     761              : 
     762              :     /*
     763              :      * Verify we (still) have CREATE rights in the rel's namespace.
     764              :      * (Presumably we did when the rel was created, but maybe not anymore.)
     765              :      * Skip check if caller doesn't want it.  Also skip check if
     766              :      * bootstrapping, since permissions machinery may not be working yet.
     767              :      */
     768        15856 :     if (check_rights && !IsBootstrapProcessingMode())
     769              :     {
     770              :         AclResult   aclresult;
     771              : 
     772         8163 :         aclresult = object_aclcheck(NamespaceRelationId, namespaceId, root_save_userid,
     773              :                                     ACL_CREATE);
     774         8163 :         if (aclresult != ACLCHECK_OK)
     775            0 :             aclcheck_error(aclresult, OBJECT_SCHEMA,
     776            0 :                            get_namespace_name(namespaceId));
     777              :     }
     778              : 
     779              :     /*
     780              :      * Select tablespace to use.  If not specified, use default tablespace
     781              :      * (which may in turn default to database's default).
     782              :      */
     783        15856 :     if (stmt->tableSpace)
     784              :     {
     785          124 :         tablespaceId = get_tablespace_oid(stmt->tableSpace, false);
     786          124 :         if (partitioned && tablespaceId == MyDatabaseTableSpace)
     787            3 :             ereport(ERROR,
     788              :                     (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
     789              :                      errmsg("cannot specify default tablespace for partitioned relations")));
     790              :     }
     791              :     else
     792              :     {
     793        15732 :         tablespaceId = GetDefaultTablespace(rel->rd_rel->relpersistence,
     794              :                                             partitioned);
     795              :         /* note InvalidOid is OK in this case */
     796              :     }
     797              : 
     798              :     /* Check tablespace permissions */
     799        15850 :     if (check_rights &&
     800           58 :         OidIsValid(tablespaceId) && tablespaceId != MyDatabaseTableSpace)
     801              :     {
     802              :         AclResult   aclresult;
     803              : 
     804           58 :         aclresult = object_aclcheck(TableSpaceRelationId, tablespaceId, root_save_userid,
     805              :                                     ACL_CREATE);
     806           58 :         if (aclresult != ACLCHECK_OK)
     807            0 :             aclcheck_error(aclresult, OBJECT_TABLESPACE,
     808            0 :                            get_tablespace_name(tablespaceId));
     809              :     }
     810              : 
     811              :     /*
     812              :      * Force shared indexes into the pg_global tablespace.  This is a bit of a
     813              :      * hack but seems simpler than marking them in the BKI commands.  On the
     814              :      * other hand, if it's not shared, don't allow it to be placed there.
     815              :      */
     816        15850 :     if (rel->rd_rel->relisshared)
     817         1071 :         tablespaceId = GLOBALTABLESPACE_OID;
     818        14779 :     else if (tablespaceId == GLOBALTABLESPACE_OID)
     819            0 :         ereport(ERROR,
     820              :                 (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
     821              :                  errmsg("only shared relations can be placed in pg_global tablespace")));
     822              : 
     823              :     /*
     824              :      * Choose the index column names.
     825              :      */
     826        15850 :     indexColNames = ChooseIndexColumnNames(allIndexParams);
     827              : 
     828              :     /*
     829              :      * Select name for index if caller didn't specify
     830              :      */
     831        15850 :     indexRelationName = stmt->idxname;
     832        15850 :     if (indexRelationName == NULL)
     833         6165 :         indexRelationName = ChooseIndexName(RelationGetRelationName(rel),
     834              :                                             namespaceId,
     835              :                                             indexColNames,
     836         6165 :                                             stmt->excludeOpNames,
     837         6165 :                                             stmt->primary,
     838         6165 :                                             stmt->isconstraint);
     839              : 
     840              :     /*
     841              :      * look up the access method, verify it can handle the requested features
     842              :      */
     843        15850 :     accessMethodName = stmt->accessMethod;
     844        15850 :     tuple = SearchSysCache1(AMNAME, PointerGetDatum(accessMethodName));
     845        15850 :     if (!HeapTupleIsValid(tuple))
     846              :     {
     847              :         /*
     848              :          * Hack to provide more-or-less-transparent updating of old RTREE
     849              :          * indexes to GiST: if RTREE is requested and not found, use GIST.
     850              :          */
     851            3 :         if (strcmp(accessMethodName, "rtree") == 0)
     852              :         {
     853            3 :             ereport(NOTICE,
     854              :                     (errmsg("substituting access method \"gist\" for obsolete method \"rtree\"")));
     855            3 :             accessMethodName = "gist";
     856            3 :             tuple = SearchSysCache1(AMNAME, PointerGetDatum(accessMethodName));
     857              :         }
     858              : 
     859            3 :         if (!HeapTupleIsValid(tuple))
     860            0 :             ereport(ERROR,
     861              :                     (errcode(ERRCODE_UNDEFINED_OBJECT),
     862              :                      errmsg("access method \"%s\" does not exist",
     863              :                             accessMethodName)));
     864              :     }
     865        15850 :     accessMethodForm = (Form_pg_am) GETSTRUCT(tuple);
     866        15850 :     accessMethodId = accessMethodForm->oid;
     867        15850 :     amRoutine = GetIndexAmRoutine(accessMethodForm->amhandler);
     868              : 
     869        15850 :     pgstat_progress_update_param(PROGRESS_CREATEIDX_ACCESS_METHOD_OID,
     870              :                                  accessMethodId);
     871              : 
     872        15850 :     if (stmt->unique && !stmt->iswithoutoverlaps && !amRoutine->amcanunique)
     873            0 :         ereport(ERROR,
     874              :                 (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
     875              :                  errmsg("access method \"%s\" does not support unique indexes",
     876              :                         accessMethodName)));
     877        15850 :     if (stmt->indexIncludingParams != NIL && !amRoutine->amcaninclude)
     878            9 :         ereport(ERROR,
     879              :                 (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
     880              :                  errmsg("access method \"%s\" does not support included columns",
     881              :                         accessMethodName)));
     882        15841 :     if (numberOfKeyAttributes > 1 && !amRoutine->amcanmulticol)
     883            0 :         ereport(ERROR,
     884              :                 (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
     885              :                  errmsg("access method \"%s\" does not support multicolumn indexes",
     886              :                         accessMethodName)));
     887        15841 :     if (exclusion && amRoutine->amgettuple == NULL)
     888            0 :         ereport(ERROR,
     889              :                 (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
     890              :                  errmsg("access method \"%s\" does not support exclusion constraints",
     891              :                         accessMethodName)));
     892        15841 :     if (stmt->iswithoutoverlaps && strcmp(accessMethodName, "gist") != 0)
     893            0 :         ereport(ERROR,
     894              :                 (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
     895              :                  errmsg("access method \"%s\" does not support WITHOUT OVERLAPS constraints",
     896              :                         accessMethodName)));
     897              : 
     898        15841 :     amcanorder = amRoutine->amcanorder;
     899        15841 :     amoptions = amRoutine->amoptions;
     900        15841 :     amissummarizing = amRoutine->amsummarizing;
     901              : 
     902        15841 :     ReleaseSysCache(tuple);
     903              : 
     904              :     /*
     905              :      * Validate predicate, if given
     906              :      */
     907        15841 :     if (stmt->whereClause)
     908          224 :         CheckPredicate((Expr *) stmt->whereClause);
     909              : 
     910              :     /*
     911              :      * Parse AM-specific options, convert to text array form, validate.
     912              :      */
     913        15841 :     reloptions = transformRelOptions((Datum) 0, stmt->options,
     914              :                                      NULL, NULL, false, false);
     915              : 
     916        15838 :     (void) index_reloptions(amoptions, reloptions, true);
     917              : 
     918              :     /*
     919              :      * Prepare arguments for index_create, primarily an IndexInfo structure.
     920              :      * Note that predicates must be in implicit-AND format.  In a concurrent
     921              :      * build, mark it not-ready-for-inserts.
     922              :      */
     923        15806 :     indexInfo = makeIndexInfo(numberOfAttributes,
     924              :                               numberOfKeyAttributes,
     925              :                               accessMethodId,
     926              :                               NIL,  /* expressions, NIL for now */
     927        15806 :                               make_ands_implicit((Expr *) stmt->whereClause),
     928        15806 :                               stmt->unique,
     929        15806 :                               stmt->nulls_not_distinct,
     930              :                               !concurrent,
     931              :                               concurrent,
     932              :                               amissummarizing,
     933        15806 :                               stmt->iswithoutoverlaps);
     934              : 
     935        15806 :     typeIds = palloc_array(Oid, numberOfAttributes);
     936        15806 :     collationIds = palloc_array(Oid, numberOfAttributes);
     937        15806 :     opclassIds = palloc_array(Oid, numberOfAttributes);
     938        15806 :     opclassOptions = palloc_array(Datum, numberOfAttributes);
     939        15806 :     coloptions = palloc_array(int16, numberOfAttributes);
     940        15806 :     ComputeIndexAttrs(pstate,
     941              :                       indexInfo,
     942              :                       typeIds, collationIds, opclassIds, opclassOptions,
     943              :                       coloptions, allIndexParams,
     944        15806 :                       stmt->excludeOpNames, tableId,
     945              :                       accessMethodName, accessMethodId,
     946        15806 :                       amcanorder, stmt->isconstraint, stmt->iswithoutoverlaps,
     947              :                       root_save_userid, root_save_sec_context,
     948              :                       &root_save_nestlevel);
     949              : 
     950              :     /*
     951              :      * Extra checks when creating a PRIMARY KEY index.
     952              :      */
     953        15697 :     if (stmt->primary)
     954         4683 :         index_check_primary_key(rel, indexInfo, is_alter_table, stmt);
     955              : 
     956              :     /*
     957              :      * If this table is partitioned and we're creating a unique index, primary
     958              :      * key, or exclusion constraint, make sure that the partition key is a
     959              :      * subset of the index's columns.  Otherwise it would be possible to
     960              :      * violate uniqueness by putting values that ought to be unique in
     961              :      * different partitions.
     962              :      *
     963              :      * We could lift this limitation if we had global indexes, but those have
     964              :      * their own problems, so this is a useful feature combination.
     965              :      */
     966        15679 :     if (partitioned && (stmt->unique || exclusion))
     967              :     {
     968          665 :         PartitionKey key = RelationGetPartitionKey(rel);
     969              :         const char *constraint_type;
     970              :         int         i;
     971              : 
     972          665 :         if (stmt->primary)
     973          491 :             constraint_type = "PRIMARY KEY";
     974          174 :         else if (stmt->unique)
     975          124 :             constraint_type = "UNIQUE";
     976           50 :         else if (stmt->excludeOpNames)
     977           50 :             constraint_type = "EXCLUDE";
     978              :         else
     979              :         {
     980            0 :             elog(ERROR, "unknown constraint type");
     981              :             constraint_type = NULL; /* keep compiler quiet */
     982              :         }
     983              : 
     984              :         /*
     985              :          * Verify that all the columns in the partition key appear in the
     986              :          * unique key definition, with the same notion of equality.
     987              :          */
     988         1338 :         for (i = 0; i < key->partnatts; i++)
     989              :         {
     990          725 :             bool        found = false;
     991              :             int         eq_strategy;
     992              :             Oid         ptkey_eqop;
     993              :             int         j;
     994              : 
     995              :             /*
     996              :              * Identify the equality operator associated with this partkey
     997              :              * column.  For list and range partitioning, partkeys use btree
     998              :              * operator classes; hash partitioning uses hash operator classes.
     999              :              * (Keep this in sync with ComputePartitionAttrs!)
    1000              :              */
    1001          725 :             if (key->strategy == PARTITION_STRATEGY_HASH)
    1002           34 :                 eq_strategy = HTEqualStrategyNumber;
    1003              :             else
    1004          691 :                 eq_strategy = BTEqualStrategyNumber;
    1005              : 
    1006          725 :             ptkey_eqop = get_opfamily_member(key->partopfamily[i],
    1007          725 :                                              key->partopcintype[i],
    1008          725 :                                              key->partopcintype[i],
    1009              :                                              eq_strategy);
    1010          725 :             if (!OidIsValid(ptkey_eqop))
    1011            0 :                 elog(ERROR, "missing operator %d(%u,%u) in partition opfamily %u",
    1012              :                      eq_strategy, key->partopcintype[i], key->partopcintype[i],
    1013              :                      key->partopfamily[i]);
    1014              : 
    1015              :             /*
    1016              :              * It may be possible to support UNIQUE constraints when partition
    1017              :              * keys are expressions, but is it worth it?  Give up for now.
    1018              :              */
    1019          725 :             if (key->partattrs[i] == 0)
    1020            6 :                 ereport(ERROR,
    1021              :                         (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
    1022              :                          errmsg("unsupported %s constraint with partition key definition",
    1023              :                                 constraint_type),
    1024              :                          errdetail("%s constraints cannot be used when partition keys include expressions.",
    1025              :                                    constraint_type)));
    1026              : 
    1027              :             /* Search the index column(s) for a match */
    1028          819 :             for (j = 0; j < indexInfo->ii_NumIndexKeyAttrs; j++)
    1029              :             {
    1030          780 :                 if (key->partattrs[i] == indexInfo->ii_IndexAttrNumbers[j])
    1031              :                 {
    1032              :                     /*
    1033              :                      * Matched the column, now what about the collation and
    1034              :                      * equality op?
    1035              :                      */
    1036              :                     Oid         idx_opfamily;
    1037              :                     Oid         idx_opcintype;
    1038              : 
    1039          680 :                     if (key->partcollation[i] != collationIds[j])
    1040            0 :                         continue;
    1041              : 
    1042          680 :                     if (get_opclass_opfamily_and_input_type(opclassIds[j],
    1043              :                                                             &idx_opfamily,
    1044              :                                                             &idx_opcintype))
    1045              :                     {
    1046          680 :                         Oid         idx_eqop = InvalidOid;
    1047              : 
    1048          680 :                         if (stmt->unique && !stmt->iswithoutoverlaps)
    1049          606 :                             idx_eqop = get_opfamily_member_for_cmptype(idx_opfamily,
    1050              :                                                                        idx_opcintype,
    1051              :                                                                        idx_opcintype,
    1052              :                                                                        COMPARE_EQ);
    1053           74 :                         else if (exclusion)
    1054           74 :                             idx_eqop = indexInfo->ii_ExclusionOps[j];
    1055              : 
    1056          680 :                         if (!idx_eqop)
    1057            0 :                             ereport(ERROR,
    1058              :                                     errcode(ERRCODE_UNDEFINED_OBJECT),
    1059              :                                     errmsg("could not identify an equality operator for type %s", format_type_be(idx_opcintype)),
    1060              :                                     errdetail("There is no suitable operator in operator family \"%s\" for access method \"%s\".",
    1061              :                                               get_opfamily_name(idx_opfamily, false), get_am_name(get_opfamily_method(idx_opfamily))));
    1062              : 
    1063          680 :                         if (ptkey_eqop == idx_eqop)
    1064              :                         {
    1065          673 :                             found = true;
    1066          673 :                             break;
    1067              :                         }
    1068            7 :                         else if (exclusion)
    1069              :                         {
    1070              :                             /*
    1071              :                              * We found a match, but it's not an equality
    1072              :                              * operator. Instead of failing below with an
    1073              :                              * error message about a missing column, fail now
    1074              :                              * and explain that the operator is wrong.
    1075              :                              */
    1076            7 :                             Form_pg_attribute att = TupleDescAttr(RelationGetDescr(rel), key->partattrs[i] - 1);
    1077              : 
    1078            7 :                             ereport(ERROR,
    1079              :                                     (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
    1080              :                                      errmsg("cannot match partition key to index on column \"%s\" using non-equal operator \"%s\"",
    1081              :                                             NameStr(att->attname),
    1082              :                                             get_opname(indexInfo->ii_ExclusionOps[j]))));
    1083              :                         }
    1084              :                     }
    1085              :                 }
    1086              :             }
    1087              : 
    1088          712 :             if (!found)
    1089              :             {
    1090              :                 Form_pg_attribute att;
    1091              : 
    1092           39 :                 att = TupleDescAttr(RelationGetDescr(rel),
    1093           39 :                                     key->partattrs[i] - 1);
    1094           39 :                 ereport(ERROR,
    1095              :                         (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
    1096              :                 /* translator: %s is UNIQUE, PRIMARY KEY, etc */
    1097              :                          errmsg("%s constraint on partitioned table must include all partitioning columns",
    1098              :                                 constraint_type),
    1099              :                 /* translator: first %s is UNIQUE, PRIMARY KEY, etc */
    1100              :                          errdetail("%s constraint on table \"%s\" lacks column \"%s\" which is part of the partition key.",
    1101              :                                    constraint_type, RelationGetRelationName(rel),
    1102              :                                    NameStr(att->attname))));
    1103              :             }
    1104              :         }
    1105              :     }
    1106              : 
    1107              : 
    1108              :     /*
    1109              :      * We disallow indexes on system columns.  They would not necessarily get
    1110              :      * updated correctly, and they don't seem useful anyway.
    1111              :      *
    1112              :      * Also disallow virtual generated columns in indexes (use expression
    1113              :      * index instead).
    1114              :      */
    1115        37593 :     for (int i = 0; i < indexInfo->ii_NumIndexAttrs; i++)
    1116              :     {
    1117        21978 :         AttrNumber  attno = indexInfo->ii_IndexAttrNumbers[i];
    1118              : 
    1119        21978 :         if (attno < 0)
    1120            3 :             ereport(ERROR,
    1121              :                     (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
    1122              :                      errmsg("index creation on system columns is not supported")));
    1123              : 
    1124              : 
    1125        21975 :         if (TupleDescAttr(RelationGetDescr(rel), attno - 1)->attgenerated == ATTRIBUTE_GENERATED_VIRTUAL)
    1126            9 :             ereport(ERROR,
    1127              :                     errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
    1128              :                     stmt->primary ?
    1129              :                     errmsg("primary keys on virtual generated columns are not supported") :
    1130              :                     stmt->isconstraint ?
    1131              :                     errmsg("unique constraints on virtual generated columns are not supported") :
    1132              :                     errmsg("indexes on virtual generated columns are not supported"));
    1133              :     }
    1134              : 
    1135              :     /*
    1136              :      * Also check for system and generated columns used in expressions or
    1137              :      * predicates.
    1138              :      */
    1139        15615 :     if (indexInfo->ii_Expressions || indexInfo->ii_Predicate)
    1140              :     {
    1141          638 :         Bitmapset  *indexattrs = NULL;
    1142              :         int         j;
    1143              : 
    1144          638 :         pull_varattnos((Node *) indexInfo->ii_Expressions, 1, &indexattrs);
    1145          638 :         pull_varattnos((Node *) indexInfo->ii_Predicate, 1, &indexattrs);
    1146              : 
    1147         4460 :         for (int i = FirstLowInvalidHeapAttributeNumber + 1; i < 0; i++)
    1148              :         {
    1149         3828 :             if (bms_is_member(i - FirstLowInvalidHeapAttributeNumber,
    1150              :                               indexattrs))
    1151            6 :                 ereport(ERROR,
    1152              :                         (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
    1153              :                          errmsg("index creation on system columns is not supported")));
    1154              :         }
    1155              : 
    1156              :         /*
    1157              :          * XXX Virtual generated columns in index expressions or predicates
    1158              :          * could be supported, but it needs support in
    1159              :          * RelationGetIndexExpressions() and RelationGetIndexPredicate().
    1160              :          */
    1161          632 :         j = -1;
    1162         1371 :         while ((j = bms_next_member(indexattrs, j)) >= 0)
    1163              :         {
    1164          739 :             AttrNumber  attno = j + FirstLowInvalidHeapAttributeNumber;
    1165              : 
    1166          739 :             if (TupleDescAttr(RelationGetDescr(rel), attno - 1)->attgenerated == ATTRIBUTE_GENERATED_VIRTUAL)
    1167            0 :                 ereport(ERROR,
    1168              :                         (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
    1169              :                          stmt->isconstraint ?
    1170              :                          errmsg("unique constraints on virtual generated columns are not supported") :
    1171              :                          errmsg("indexes on virtual generated columns are not supported")));
    1172              :         }
    1173              :     }
    1174              : 
    1175              :     /* Is index safe for others to ignore?  See set_indexsafe_procflags() */
    1176        30755 :     safe_index = indexInfo->ii_Expressions == NIL &&
    1177        15146 :         indexInfo->ii_Predicate == NIL;
    1178              : 
    1179              :     /*
    1180              :      * Report index creation if appropriate (delay this till after most of the
    1181              :      * error checks)
    1182              :      */
    1183        15609 :     if (stmt->isconstraint && !quiet)
    1184              :     {
    1185              :         const char *constraint_type;
    1186              : 
    1187         5150 :         if (stmt->primary)
    1188         4581 :             constraint_type = "PRIMARY KEY";
    1189          569 :         else if (stmt->unique)
    1190          481 :             constraint_type = "UNIQUE";
    1191           88 :         else if (stmt->excludeOpNames)
    1192           88 :             constraint_type = "EXCLUDE";
    1193              :         else
    1194              :         {
    1195            0 :             elog(ERROR, "unknown constraint type");
    1196              :             constraint_type = NULL; /* keep compiler quiet */
    1197              :         }
    1198              : 
    1199         5150 :         ereport(DEBUG1,
    1200              :                 (errmsg_internal("%s %s will create implicit index \"%s\" for table \"%s\"",
    1201              :                                  is_alter_table ? "ALTER TABLE / ADD" : "CREATE TABLE /",
    1202              :                                  constraint_type,
    1203              :                                  indexRelationName, RelationGetRelationName(rel))));
    1204              :     }
    1205              : 
    1206              :     /*
    1207              :      * A valid stmt->oldNumber implies that we already have a built form of
    1208              :      * the index.  The caller should also decline any index build.
    1209              :      */
    1210              :     Assert(!RelFileNumberIsValid(stmt->oldNumber) || (skip_build && !concurrent));
    1211              : 
    1212              :     /*
    1213              :      * Make the catalog entries for the index, including constraints. This
    1214              :      * step also actually builds the index, except if caller requested not to
    1215              :      * or in concurrent mode, in which case it'll be done later, or doing a
    1216              :      * partitioned index (because those don't have storage).
    1217              :      */
    1218        15609 :     flags = constr_flags = 0;
    1219        15609 :     if (stmt->isconstraint)
    1220         5270 :         flags |= INDEX_CREATE_ADD_CONSTRAINT;
    1221        15609 :     if (skip_build || concurrent || partitioned)
    1222         7708 :         flags |= INDEX_CREATE_SKIP_BUILD;
    1223        15609 :     if (stmt->if_not_exists)
    1224            9 :         flags |= INDEX_CREATE_IF_NOT_EXISTS;
    1225        15609 :     if (concurrent)
    1226           98 :         flags |= INDEX_CREATE_CONCURRENT;
    1227        15609 :     if (partitioned)
    1228         1067 :         flags |= INDEX_CREATE_PARTITIONED;
    1229        15609 :     if (stmt->primary)
    1230         4644 :         flags |= INDEX_CREATE_IS_PRIMARY;
    1231              : 
    1232              :     /*
    1233              :      * If the table is partitioned, and recursion was declined but partitions
    1234              :      * exist, mark the index as invalid.
    1235              :      */
    1236        15609 :     if (partitioned && stmt->relation && !stmt->relation->inh)
    1237              :     {
    1238          126 :         PartitionDesc pd = RelationGetPartitionDesc(rel, true);
    1239              : 
    1240          126 :         if (pd->nparts != 0)
    1241          116 :             flags |= INDEX_CREATE_INVALID;
    1242              :     }
    1243              : 
    1244        15609 :     if (stmt->deferrable)
    1245           66 :         constr_flags |= INDEX_CONSTR_CREATE_DEFERRABLE;
    1246        15609 :     if (stmt->initdeferred)
    1247           19 :         constr_flags |= INDEX_CONSTR_CREATE_INIT_DEFERRED;
    1248        15609 :     if (stmt->iswithoutoverlaps)
    1249          316 :         constr_flags |= INDEX_CONSTR_CREATE_WITHOUT_OVERLAPS;
    1250              : 
    1251              :     indexRelationId =
    1252        15609 :         index_create(rel, indexRelationName, indexRelationId, parentIndexId,
    1253              :                      parentConstraintId,
    1254              :                      stmt->oldNumber, indexInfo, indexColNames,
    1255              :                      accessMethodId, tablespaceId,
    1256              :                      collationIds, opclassIds, opclassOptions,
    1257              :                      coloptions, NULL, reloptions,
    1258              :                      flags, constr_flags,
    1259              :                      allowSystemTableMods, !check_rights,
    1260        15609 :                      &createdConstraintId);
    1261              : 
    1262        15493 :     ObjectAddressSet(address, RelationRelationId, indexRelationId);
    1263              : 
    1264        15493 :     if (!OidIsValid(indexRelationId))
    1265              :     {
    1266              :         /*
    1267              :          * Roll back any GUC changes executed by index functions.  Also revert
    1268              :          * to original default_tablespace if we changed it above.
    1269              :          */
    1270            9 :         AtEOXact_GUC(false, root_save_nestlevel);
    1271              : 
    1272              :         /* Restore userid and security context */
    1273            9 :         SetUserIdAndSecContext(root_save_userid, root_save_sec_context);
    1274              : 
    1275            9 :         table_close(rel, NoLock);
    1276              : 
    1277              :         /* If this is the top-level index, we're done */
    1278            9 :         if (!OidIsValid(parentIndexId))
    1279            9 :             pgstat_progress_end_command();
    1280              : 
    1281            9 :         return address;
    1282              :     }
    1283              : 
    1284              :     /*
    1285              :      * Roll back any GUC changes executed by index functions, and keep
    1286              :      * subsequent changes local to this command.  This is essential if some
    1287              :      * index function changed a behavior-affecting GUC, e.g. search_path.
    1288              :      */
    1289        15484 :     AtEOXact_GUC(false, root_save_nestlevel);
    1290        15484 :     root_save_nestlevel = NewGUCNestLevel();
    1291        15484 :     RestrictSearchPath();
    1292              : 
    1293              :     /* Add any requested comment */
    1294        15484 :     if (stmt->idxcomment != NULL)
    1295           39 :         CreateComments(indexRelationId, RelationRelationId, 0,
    1296           39 :                        stmt->idxcomment);
    1297              : 
    1298        15484 :     if (partitioned)
    1299              :     {
    1300              :         PartitionDesc partdesc;
    1301              : 
    1302              :         /*
    1303              :          * Unless caller specified to skip this step (via ONLY), process each
    1304              :          * partition to make sure they all contain a corresponding index.
    1305              :          *
    1306              :          * If we're called internally (no stmt->relation), recurse always.
    1307              :          */
    1308         1067 :         partdesc = RelationGetPartitionDesc(rel, true);
    1309         1067 :         if ((!stmt->relation || stmt->relation->inh) && partdesc->nparts > 0)
    1310              :         {
    1311          312 :             int         nparts = partdesc->nparts;
    1312          312 :             Oid        *part_oids = palloc_array(Oid, nparts);
    1313          312 :             bool        invalidate_parent = false;
    1314              :             Relation    parentIndex;
    1315              :             TupleDesc   parentDesc;
    1316              : 
    1317              :             /*
    1318              :              * Report the total number of partitions at the start of the
    1319              :              * command; don't update it when being called recursively.
    1320              :              */
    1321          312 :             if (!OidIsValid(parentIndexId))
    1322              :             {
    1323              :                 /*
    1324              :                  * When called by ProcessUtilitySlow, the number of partitions
    1325              :                  * is passed in as an optimization; but other callers pass -1
    1326              :                  * since they don't have the value handy.  This should count
    1327              :                  * partitions the same way, ie one less than the number of
    1328              :                  * relations find_all_inheritors reports.
    1329              :                  *
    1330              :                  * We assume we needn't ask find_all_inheritors to take locks,
    1331              :                  * because that should have happened already for all callers.
    1332              :                  * Even if it did not, this is safe as long as we don't try to
    1333              :                  * touch the partitions here; the worst consequence would be a
    1334              :                  * bogus progress-reporting total.
    1335              :                  */
    1336          256 :                 if (total_parts < 0)
    1337              :                 {
    1338           64 :                     List       *children = find_all_inheritors(tableId, NoLock, NULL);
    1339              : 
    1340           64 :                     total_parts = list_length(children) - 1;
    1341           64 :                     list_free(children);
    1342              :                 }
    1343              : 
    1344          256 :                 pgstat_progress_update_param(PROGRESS_CREATEIDX_PARTITIONS_TOTAL,
    1345              :                                              total_parts);
    1346              :             }
    1347              : 
    1348              :             /* Make a local copy of partdesc->oids[], just for safety */
    1349          312 :             memcpy(part_oids, partdesc->oids, sizeof(Oid) * nparts);
    1350              : 
    1351              :             /*
    1352              :              * We'll need an IndexInfo describing the parent index.  The one
    1353              :              * built above is almost good enough, but not quite, because (for
    1354              :              * example) its predicate expression if any hasn't been through
    1355              :              * expression preprocessing.  The most reliable way to get an
    1356              :              * IndexInfo that will match those for child indexes is to build
    1357              :              * it the same way, using BuildIndexInfo().
    1358              :              */
    1359          312 :             parentIndex = index_open(indexRelationId, lockmode);
    1360          312 :             indexInfo = BuildIndexInfo(parentIndex);
    1361              : 
    1362          312 :             parentDesc = RelationGetDescr(rel);
    1363              : 
    1364              :             /*
    1365              :              * For each partition, scan all existing indexes; if one matches
    1366              :              * our index definition and is not already attached to some other
    1367              :              * parent index, attach it to the one we just created.
    1368              :              *
    1369              :              * If none matches, build a new index by calling ourselves
    1370              :              * recursively with the same options (except for the index name).
    1371              :              */
    1372          853 :             for (int i = 0; i < nparts; i++)
    1373              :             {
    1374          553 :                 Oid         childRelid = part_oids[i];
    1375              :                 Relation    childrel;
    1376              :                 Oid         child_save_userid;
    1377              :                 int         child_save_sec_context;
    1378              :                 int         child_save_nestlevel;
    1379              :                 List       *childidxs;
    1380              :                 ListCell   *cell;
    1381              :                 AttrMap    *attmap;
    1382          553 :                 bool        found = false;
    1383              : 
    1384          553 :                 childrel = table_open(childRelid, lockmode);
    1385              : 
    1386          553 :                 GetUserIdAndSecContext(&child_save_userid,
    1387              :                                        &child_save_sec_context);
    1388          553 :                 SetUserIdAndSecContext(childrel->rd_rel->relowner,
    1389              :                                        child_save_sec_context | SECURITY_RESTRICTED_OPERATION);
    1390          553 :                 child_save_nestlevel = NewGUCNestLevel();
    1391          553 :                 RestrictSearchPath();
    1392              : 
    1393              :                 /*
    1394              :                  * Don't try to create indexes on foreign tables, though. Skip
    1395              :                  * those if a regular index, or fail if trying to create a
    1396              :                  * constraint index.
    1397              :                  */
    1398          553 :                 if (childrel->rd_rel->relkind == RELKIND_FOREIGN_TABLE)
    1399              :                 {
    1400            9 :                     if (stmt->unique || stmt->primary)
    1401            6 :                         ereport(ERROR,
    1402              :                                 (errcode(ERRCODE_WRONG_OBJECT_TYPE),
    1403              :                                  errmsg("cannot create unique index on partitioned table \"%s\"",
    1404              :                                         RelationGetRelationName(rel)),
    1405              :                                  errdetail("Table \"%s\" contains partitions that are foreign tables.",
    1406              :                                            RelationGetRelationName(rel))));
    1407              : 
    1408            3 :                     AtEOXact_GUC(false, child_save_nestlevel);
    1409            3 :                     SetUserIdAndSecContext(child_save_userid,
    1410              :                                            child_save_sec_context);
    1411            3 :                     table_close(childrel, lockmode);
    1412            3 :                     continue;
    1413              :                 }
    1414              : 
    1415          544 :                 childidxs = RelationGetIndexList(childrel);
    1416              :                 attmap =
    1417          544 :                     build_attrmap_by_name(RelationGetDescr(childrel),
    1418              :                                           parentDesc,
    1419              :                                           false);
    1420              : 
    1421          718 :                 foreach(cell, childidxs)
    1422              :                 {
    1423          213 :                     Oid         cldidxid = lfirst_oid(cell);
    1424              :                     Relation    cldidx;
    1425              :                     IndexInfo  *cldIdxInfo;
    1426              : 
    1427              :                     /* this index is already partition of another one */
    1428          213 :                     if (has_superclass(cldidxid))
    1429          162 :                         continue;
    1430              : 
    1431           51 :                     cldidx = index_open(cldidxid, lockmode);
    1432           51 :                     cldIdxInfo = BuildIndexInfo(cldidx);
    1433           51 :                     if (CompareIndexInfo(cldIdxInfo, indexInfo,
    1434           51 :                                          cldidx->rd_indcollation,
    1435           51 :                                          parentIndex->rd_indcollation,
    1436           51 :                                          cldidx->rd_opfamily,
    1437           51 :                                          parentIndex->rd_opfamily,
    1438              :                                          attmap))
    1439              :                     {
    1440           39 :                         Oid         cldConstrOid = InvalidOid;
    1441              : 
    1442              :                         /*
    1443              :                          * Found a match.
    1444              :                          *
    1445              :                          * If this index is being created in the parent
    1446              :                          * because of a constraint, then the child needs to
    1447              :                          * have a constraint also, so look for one.  If there
    1448              :                          * is no such constraint, this index is no good, so
    1449              :                          * keep looking.
    1450              :                          */
    1451           39 :                         if (createdConstraintId != InvalidOid)
    1452              :                         {
    1453              :                             cldConstrOid =
    1454            6 :                                 get_relation_idx_constraint_oid(childRelid,
    1455              :                                                                 cldidxid);
    1456            6 :                             if (cldConstrOid == InvalidOid)
    1457              :                             {
    1458            0 :                                 index_close(cldidx, lockmode);
    1459            0 :                                 continue;
    1460              :                             }
    1461              :                         }
    1462              : 
    1463              :                         /* Attach index to parent and we're done. */
    1464           39 :                         IndexSetParentIndex(cldidx, indexRelationId);
    1465           39 :                         if (createdConstraintId != InvalidOid)
    1466            6 :                             ConstraintSetParentConstraint(cldConstrOid,
    1467              :                                                           createdConstraintId,
    1468              :                                                           childRelid);
    1469              : 
    1470           39 :                         if (!cldidx->rd_index->indisvalid)
    1471            9 :                             invalidate_parent = true;
    1472              : 
    1473           39 :                         found = true;
    1474              : 
    1475              :                         /*
    1476              :                          * Report this partition as processed.  Note that if
    1477              :                          * the partition has children itself, we'd ideally
    1478              :                          * count the children and update the progress report
    1479              :                          * for all of them; but that seems unduly expensive.
    1480              :                          * Instead, the progress report will act like all such
    1481              :                          * indirect children were processed in zero time at
    1482              :                          * the end of the command.
    1483              :                          */
    1484           39 :                         pgstat_progress_incr_param(PROGRESS_CREATEIDX_PARTITIONS_DONE, 1);
    1485              : 
    1486              :                         /* keep lock till commit */
    1487           39 :                         index_close(cldidx, NoLock);
    1488           39 :                         break;
    1489              :                     }
    1490              : 
    1491           12 :                     index_close(cldidx, lockmode);
    1492              :                 }
    1493              : 
    1494          544 :                 list_free(childidxs);
    1495          544 :                 AtEOXact_GUC(false, child_save_nestlevel);
    1496          544 :                 SetUserIdAndSecContext(child_save_userid,
    1497              :                                        child_save_sec_context);
    1498          544 :                 table_close(childrel, NoLock);
    1499              : 
    1500              :                 /*
    1501              :                  * If no matching index was found, create our own.
    1502              :                  */
    1503          544 :                 if (!found)
    1504              :                 {
    1505              :                     IndexStmt  *childStmt;
    1506              :                     ObjectAddress childAddr;
    1507              : 
    1508              :                     /*
    1509              :                      * Build an IndexStmt describing the desired child index
    1510              :                      * in the same way that we do during ATTACH PARTITION.
    1511              :                      * Notably, we rely on generateClonedIndexStmt to produce
    1512              :                      * a search-path-independent representation, which the
    1513              :                      * original IndexStmt might not be.
    1514              :                      */
    1515          505 :                     childStmt = generateClonedIndexStmt(NULL,
    1516              :                                                         parentIndex,
    1517              :                                                         attmap,
    1518              :                                                         NULL);
    1519              : 
    1520              :                     /*
    1521              :                      * Recurse as the starting user ID.  Callee will use that
    1522              :                      * for permission checks, then switch again.
    1523              :                      */
    1524              :                     Assert(GetUserId() == child_save_userid);
    1525          505 :                     SetUserIdAndSecContext(root_save_userid,
    1526              :                                            root_save_sec_context);
    1527              :                     childAddr =
    1528          505 :                         DefineIndex(NULL,   /* original pstate not applicable */
    1529              :                                     childRelid, childStmt,
    1530              :                                     InvalidOid, /* no predefined OID */
    1531              :                                     indexRelationId,    /* this is our child */
    1532              :                                     createdConstraintId,
    1533              :                                     -1,
    1534              :                                     is_alter_table, check_rights,
    1535              :                                     check_not_in_use,
    1536              :                                     skip_build, quiet);
    1537          499 :                     SetUserIdAndSecContext(child_save_userid,
    1538              :                                            child_save_sec_context);
    1539              : 
    1540              :                     /*
    1541              :                      * Check if the index just created is valid or not, as it
    1542              :                      * could be possible that it has been switched as invalid
    1543              :                      * when recursing across multiple partition levels.
    1544              :                      */
    1545          499 :                     if (!get_index_isvalid(childAddr.objectId))
    1546            3 :                         invalidate_parent = true;
    1547              :                 }
    1548              : 
    1549          538 :                 free_attrmap(attmap);
    1550              :             }
    1551              : 
    1552          300 :             index_close(parentIndex, lockmode);
    1553              : 
    1554              :             /*
    1555              :              * The pg_index row we inserted for this index was marked
    1556              :              * indisvalid=true.  But if we attached an existing index that is
    1557              :              * invalid, this is incorrect, so update our row to invalid too.
    1558              :              */
    1559          300 :             if (invalidate_parent)
    1560              :             {
    1561           12 :                 Relation    pg_index = table_open(IndexRelationId, RowExclusiveLock);
    1562              :                 HeapTuple   tup,
    1563              :                             newtup;
    1564              : 
    1565           12 :                 tup = SearchSysCache1(INDEXRELID,
    1566              :                                       ObjectIdGetDatum(indexRelationId));
    1567           12 :                 if (!HeapTupleIsValid(tup))
    1568            0 :                     elog(ERROR, "cache lookup failed for index %u",
    1569              :                          indexRelationId);
    1570           12 :                 newtup = heap_copytuple(tup);
    1571           12 :                 ((Form_pg_index) GETSTRUCT(newtup))->indisvalid = false;
    1572           12 :                 CatalogTupleUpdate(pg_index, &tup->t_self, newtup);
    1573           12 :                 ReleaseSysCache(tup);
    1574           12 :                 table_close(pg_index, RowExclusiveLock);
    1575           12 :                 heap_freetuple(newtup);
    1576              : 
    1577              :                 /*
    1578              :                  * CCI here to make this update visible, in case this recurses
    1579              :                  * across multiple partition levels.
    1580              :                  */
    1581           12 :                 CommandCounterIncrement();
    1582              :             }
    1583              :         }
    1584              : 
    1585              :         /*
    1586              :          * Indexes on partitioned tables are not themselves built, so we're
    1587              :          * done here.
    1588              :          */
    1589         1055 :         AtEOXact_GUC(false, root_save_nestlevel);
    1590         1055 :         SetUserIdAndSecContext(root_save_userid, root_save_sec_context);
    1591         1055 :         table_close(rel, NoLock);
    1592         1055 :         if (!OidIsValid(parentIndexId))
    1593          904 :             pgstat_progress_end_command();
    1594              :         else
    1595              :         {
    1596              :             /* Update progress for an intermediate partitioned index itself */
    1597          151 :             pgstat_progress_incr_param(PROGRESS_CREATEIDX_PARTITIONS_DONE, 1);
    1598              :         }
    1599              : 
    1600         1055 :         return address;
    1601              :     }
    1602              : 
    1603        14417 :     AtEOXact_GUC(false, root_save_nestlevel);
    1604        14417 :     SetUserIdAndSecContext(root_save_userid, root_save_sec_context);
    1605              : 
    1606        14417 :     if (!concurrent)
    1607              :     {
    1608              :         /* Close the heap and we're done, in the non-concurrent case */
    1609        14325 :         table_close(rel, NoLock);
    1610              : 
    1611              :         /*
    1612              :          * If this is the top-level index, the command is done overall;
    1613              :          * otherwise, increment progress to report one child index is done.
    1614              :          */
    1615        14325 :         if (!OidIsValid(parentIndexId))
    1616        12920 :             pgstat_progress_end_command();
    1617              :         else
    1618         1405 :             pgstat_progress_incr_param(PROGRESS_CREATEIDX_PARTITIONS_DONE, 1);
    1619              : 
    1620        14325 :         return address;
    1621              :     }
    1622              : 
    1623              :     /* save lockrelid and locktag for below, then close rel */
    1624           92 :     heaprelid = rel->rd_lockInfo.lockRelId;
    1625           92 :     SET_LOCKTAG_RELATION(heaplocktag, heaprelid.dbId, heaprelid.relId);
    1626           92 :     table_close(rel, NoLock);
    1627              : 
    1628              :     /*
    1629              :      * For a concurrent build, it's important to make the catalog entries
    1630              :      * visible to other transactions before we start to build the index. That
    1631              :      * will prevent them from making incompatible HOT updates.  The new index
    1632              :      * will be marked not indisready and not indisvalid, so that no one else
    1633              :      * tries to either insert into it or use it for queries.
    1634              :      *
    1635              :      * We must commit our current transaction so that the index becomes
    1636              :      * visible; then start another.  Note that all the data structures we just
    1637              :      * built are lost in the commit.  The only data we keep past here are the
    1638              :      * relation IDs.
    1639              :      *
    1640              :      * Before committing, get a session-level lock on the table, to ensure
    1641              :      * that neither it nor the index can be dropped before we finish. This
    1642              :      * cannot block, even if someone else is waiting for access, because we
    1643              :      * already have the same lock within our transaction.
    1644              :      *
    1645              :      * Note: we don't currently bother with a session lock on the index,
    1646              :      * because there are no operations that could change its state while we
    1647              :      * hold lock on the parent table.  This might need to change later.
    1648              :      */
    1649           92 :     LockRelationIdForSession(&heaprelid, ShareUpdateExclusiveLock);
    1650              : 
    1651           92 :     PopActiveSnapshot();
    1652           92 :     CommitTransactionCommand();
    1653           92 :     StartTransactionCommand();
    1654              : 
    1655              :     /* Tell concurrent index builds to ignore us, if index qualifies */
    1656           92 :     if (safe_index)
    1657           69 :         set_indexsafe_procflags();
    1658              : 
    1659              :     /*
    1660              :      * The index is now visible, so we can report the OID.  While on it,
    1661              :      * include the report for the beginning of phase 2.
    1662              :      */
    1663              :     {
    1664           92 :         const int   progress_cols[] = {
    1665              :             PROGRESS_CREATEIDX_INDEX_OID,
    1666              :             PROGRESS_CREATEIDX_PHASE
    1667              :         };
    1668           92 :         const int64 progress_vals[] = {
    1669              :             indexRelationId,
    1670              :             PROGRESS_CREATEIDX_PHASE_WAIT_1
    1671              :         };
    1672              : 
    1673           92 :         pgstat_progress_update_multi_param(2, progress_cols, progress_vals);
    1674              :     }
    1675              : 
    1676              :     /*
    1677              :      * Phase 2 of concurrent index build (see comments for validate_index()
    1678              :      * for an overview of how this works)
    1679              :      *
    1680              :      * Now we must wait until no running transaction could have the table open
    1681              :      * with the old list of indexes.  Use ShareLock to consider running
    1682              :      * transactions that hold locks that permit writing to the table.  Note we
    1683              :      * do not need to worry about xacts that open the table for writing after
    1684              :      * this point; they will see the new index when they open it.
    1685              :      *
    1686              :      * Note: the reason we use actual lock acquisition here, rather than just
    1687              :      * checking the ProcArray and sleeping, is that deadlock is possible if
    1688              :      * one of the transactions in question is blocked trying to acquire an
    1689              :      * exclusive lock on our table.  The lock code will detect deadlock and
    1690              :      * error out properly.
    1691              :      */
    1692           92 :     WaitForLockers(heaplocktag, ShareLock, true);
    1693              : 
    1694              :     /*
    1695              :      * At this moment we are sure that there are no transactions with the
    1696              :      * table open for write that don't have this new index in their list of
    1697              :      * indexes.  We have waited out all the existing transactions and any new
    1698              :      * transaction will have the new index in its list, but the index is still
    1699              :      * marked as "not-ready-for-inserts".  The index is consulted while
    1700              :      * deciding HOT-safety though.  This arrangement ensures that no new HOT
    1701              :      * chains can be created where the new tuple and the old tuple in the
    1702              :      * chain have different index keys.
    1703              :      *
    1704              :      * We now take a new snapshot, and build the index using all tuples that
    1705              :      * are visible in this snapshot.  We can be sure that any HOT updates to
    1706              :      * these tuples will be compatible with the index, since any updates made
    1707              :      * by transactions that didn't know about the index are now committed or
    1708              :      * rolled back.  Thus, each visible tuple is either the end of its
    1709              :      * HOT-chain or the extension of the chain is HOT-safe for this index.
    1710              :      */
    1711              : 
    1712              :     /* Set ActiveSnapshot since functions in the indexes may need it */
    1713           92 :     PushActiveSnapshot(GetTransactionSnapshot());
    1714              : 
    1715              :     /* Perform concurrent build of index */
    1716           92 :     index_concurrently_build(tableId, indexRelationId);
    1717              : 
    1718              :     /* we can do away with our snapshot */
    1719           83 :     PopActiveSnapshot();
    1720              : 
    1721              :     /*
    1722              :      * Commit this transaction to make the indisready update visible.
    1723              :      */
    1724           83 :     CommitTransactionCommand();
    1725           83 :     StartTransactionCommand();
    1726              : 
    1727              :     /* Tell concurrent index builds to ignore us, if index qualifies */
    1728           83 :     if (safe_index)
    1729           63 :         set_indexsafe_procflags();
    1730              : 
    1731              :     /*
    1732              :      * Phase 3 of concurrent index build
    1733              :      *
    1734              :      * We once again wait until no transaction can have the table open with
    1735              :      * the index marked as read-only for updates.
    1736              :      */
    1737           83 :     pgstat_progress_update_param(PROGRESS_CREATEIDX_PHASE,
    1738              :                                  PROGRESS_CREATEIDX_PHASE_WAIT_2);
    1739           83 :     WaitForLockers(heaplocktag, ShareLock, true);
    1740              : 
    1741              :     /*
    1742              :      * Now take the "reference snapshot" that will be used by validate_index()
    1743              :      * to filter candidate tuples.  Beware!  There might still be snapshots in
    1744              :      * use that treat some transaction as in-progress that our reference
    1745              :      * snapshot treats as committed.  If such a recently-committed transaction
    1746              :      * deleted tuples in the table, we will not include them in the index; yet
    1747              :      * those transactions which see the deleting one as still-in-progress will
    1748              :      * expect such tuples to be there once we mark the index as valid.
    1749              :      *
    1750              :      * We solve this by waiting for all endangered transactions to exit before
    1751              :      * we mark the index as valid.
    1752              :      *
    1753              :      * We also set ActiveSnapshot to this snap, since functions in indexes may
    1754              :      * need a snapshot.
    1755              :      */
    1756           83 :     snapshot = RegisterSnapshot(GetTransactionSnapshot());
    1757           83 :     PushActiveSnapshot(snapshot);
    1758              : 
    1759              :     /*
    1760              :      * Scan the index and the heap, insert any missing index entries.
    1761              :      */
    1762           83 :     validate_index(tableId, indexRelationId, snapshot);
    1763              : 
    1764              :     /*
    1765              :      * Drop the reference snapshot.  We must do this before waiting out other
    1766              :      * snapshot holders, else we will deadlock against other processes also
    1767              :      * doing CREATE INDEX CONCURRENTLY, which would see our snapshot as one
    1768              :      * they must wait for.  But first, save the snapshot's xmin to use as
    1769              :      * limitXmin for GetCurrentVirtualXIDs().
    1770              :      */
    1771           83 :     limitXmin = snapshot->xmin;
    1772              : 
    1773           83 :     PopActiveSnapshot();
    1774           83 :     UnregisterSnapshot(snapshot);
    1775              : 
    1776              :     /*
    1777              :      * The snapshot subsystem could still contain registered snapshots that
    1778              :      * are holding back our process's advertised xmin; in particular, if
    1779              :      * default_transaction_isolation = serializable, there is a transaction
    1780              :      * snapshot that is still active.  The CatalogSnapshot is likewise a
    1781              :      * hazard.  To ensure no deadlocks, we must commit and start yet another
    1782              :      * transaction, and do our wait before any snapshot has been taken in it.
    1783              :      */
    1784           83 :     CommitTransactionCommand();
    1785           83 :     StartTransactionCommand();
    1786              : 
    1787              :     /* Tell concurrent index builds to ignore us, if index qualifies */
    1788           83 :     if (safe_index)
    1789           63 :         set_indexsafe_procflags();
    1790              : 
    1791              :     /* We should now definitely not be advertising any xmin. */
    1792              :     Assert(MyProc->xmin == InvalidTransactionId);
    1793              : 
    1794              :     /*
    1795              :      * The index is now valid in the sense that it contains all currently
    1796              :      * interesting tuples.  But since it might not contain tuples deleted just
    1797              :      * before the reference snap was taken, we have to wait out any
    1798              :      * transactions that might have older snapshots.
    1799              :      */
    1800           83 :     INJECTION_POINT("define-index-before-set-valid", NULL);
    1801           83 :     pgstat_progress_update_param(PROGRESS_CREATEIDX_PHASE,
    1802              :                                  PROGRESS_CREATEIDX_PHASE_WAIT_3);
    1803           83 :     WaitForOlderSnapshots(limitXmin, true);
    1804              : 
    1805              :     /*
    1806              :      * Updating pg_index might involve TOAST table access, so ensure we have a
    1807              :      * valid snapshot.
    1808              :      */
    1809           83 :     PushActiveSnapshot(GetTransactionSnapshot());
    1810              : 
    1811              :     /*
    1812              :      * Index can now be marked valid -- update its pg_index entry
    1813              :      */
    1814           83 :     index_set_state_flags(indexRelationId, INDEX_CREATE_SET_VALID);
    1815              : 
    1816           83 :     PopActiveSnapshot();
    1817              : 
    1818              :     /*
    1819              :      * The pg_index update will cause backends (including this one) to update
    1820              :      * relcache entries for the index itself, but we should also send a
    1821              :      * relcache inval on the parent table to force replanning of cached plans.
    1822              :      * Otherwise existing sessions might fail to use the new index where it
    1823              :      * would be useful.  (Note that our earlier commits did not create reasons
    1824              :      * to replan; so relcache flush on the index itself was sufficient.)
    1825              :      */
    1826           83 :     CacheInvalidateRelcacheByRelid(heaprelid.relId);
    1827              : 
    1828              :     /*
    1829              :      * Last thing to do is release the session-level lock on the parent table.
    1830              :      */
    1831           83 :     UnlockRelationIdForSession(&heaprelid, ShareUpdateExclusiveLock);
    1832              : 
    1833           83 :     pgstat_progress_end_command();
    1834              : 
    1835           83 :     return address;
    1836              : }
    1837              : 
    1838              : 
    1839              : /*
    1840              :  * CheckPredicate
    1841              :  *      Checks that the given partial-index predicate is valid.
    1842              :  *
    1843              :  * This used to also constrain the form of the predicate to forms that
    1844              :  * indxpath.c could do something with.  However, that seems overly
    1845              :  * restrictive.  One useful application of partial indexes is to apply
    1846              :  * a UNIQUE constraint across a subset of a table, and in that scenario
    1847              :  * any evaluable predicate will work.  So accept any predicate here
    1848              :  * (except ones requiring a plan), and let indxpath.c fend for itself.
    1849              :  */
    1850              : static void
    1851          224 : CheckPredicate(Expr *predicate)
    1852              : {
    1853              :     /*
    1854              :      * transformExpr() should have already rejected subqueries, aggregates,
    1855              :      * and window functions, based on the EXPR_KIND_ for a predicate.
    1856              :      */
    1857              : 
    1858              :     /*
    1859              :      * A predicate using mutable functions is probably wrong, for the same
    1860              :      * reasons that we don't allow an index expression to use one.
    1861              :      */
    1862          224 :     if (contain_mutable_functions_after_planning(predicate))
    1863            0 :         ereport(ERROR,
    1864              :                 (errcode(ERRCODE_INVALID_OBJECT_DEFINITION),
    1865              :                  errmsg("functions in index predicate must be marked IMMUTABLE")));
    1866          224 : }
    1867              : 
    1868              : /*
    1869              :  * Compute per-index-column information, including indexed column numbers
    1870              :  * or index expressions, opclasses and their options. Note, all output vectors
    1871              :  * should be allocated for all columns, including "including" ones.
    1872              :  *
    1873              :  * If the caller switched to the table owner, ddl_userid is the role for ACL
    1874              :  * checks reached without traversing opaque expressions.  Otherwise, it's
    1875              :  * InvalidOid, and other ddl_* arguments are undefined.
    1876              :  */
    1877              : static void
    1878        15858 : ComputeIndexAttrs(ParseState *pstate,
    1879              :                   IndexInfo *indexInfo,
    1880              :                   Oid *typeOids,
    1881              :                   Oid *collationOids,
    1882              :                   Oid *opclassOids,
    1883              :                   Datum *opclassOptions,
    1884              :                   int16 *colOptions,
    1885              :                   const List *attList,  /* list of IndexElem's */
    1886              :                   const List *exclusionOpNames,
    1887              :                   Oid relId,
    1888              :                   const char *accessMethodName,
    1889              :                   Oid accessMethodId,
    1890              :                   bool amcanorder,
    1891              :                   bool isconstraint,
    1892              :                   bool iswithoutoverlaps,
    1893              :                   Oid ddl_userid,
    1894              :                   int ddl_sec_context,
    1895              :                   int *ddl_save_nestlevel)
    1896              : {
    1897              :     ListCell   *nextExclOp;
    1898              :     ListCell   *lc;
    1899              :     int         attn;
    1900        15858 :     int         nkeycols = indexInfo->ii_NumIndexKeyAttrs;
    1901              :     Oid         save_userid;
    1902              :     int         save_sec_context;
    1903              : 
    1904              :     /* Allocate space for exclusion operator info, if needed */
    1905        15858 :     if (exclusionOpNames)
    1906              :     {
    1907              :         Assert(list_length(exclusionOpNames) == nkeycols);
    1908          159 :         indexInfo->ii_ExclusionOps = palloc_array(Oid, nkeycols);
    1909          159 :         indexInfo->ii_ExclusionProcs = palloc_array(Oid, nkeycols);
    1910          159 :         indexInfo->ii_ExclusionStrats = palloc_array(uint16, nkeycols);
    1911          159 :         nextExclOp = list_head(exclusionOpNames);
    1912              :     }
    1913              :     else
    1914        15699 :         nextExclOp = NULL;
    1915              : 
    1916              :     /*
    1917              :      * If this is a WITHOUT OVERLAPS constraint, we need space for exclusion
    1918              :      * ops, but we don't need to parse anything, so we can let nextExclOp be
    1919              :      * NULL. Note that for partitions/inheriting/LIKE, exclusionOpNames will
    1920              :      * be set, so we already allocated above.
    1921              :      */
    1922        15858 :     if (iswithoutoverlaps)
    1923              :     {
    1924          316 :         if (exclusionOpNames == NIL)
    1925              :         {
    1926          277 :             indexInfo->ii_ExclusionOps = palloc_array(Oid, nkeycols);
    1927          277 :             indexInfo->ii_ExclusionProcs = palloc_array(Oid, nkeycols);
    1928          277 :             indexInfo->ii_ExclusionStrats = palloc_array(uint16, nkeycols);
    1929              :         }
    1930          316 :         nextExclOp = NULL;
    1931              :     }
    1932              : 
    1933        15858 :     if (OidIsValid(ddl_userid))
    1934        15806 :         GetUserIdAndSecContext(&save_userid, &save_sec_context);
    1935              : 
    1936              :     /*
    1937              :      * process attributeList
    1938              :      */
    1939        15858 :     attn = 0;
    1940        37971 :     foreach(lc, attList)
    1941              :     {
    1942        22222 :         IndexElem  *attribute = (IndexElem *) lfirst(lc);
    1943              :         Oid         atttype;
    1944              :         Oid         attcollation;
    1945              : 
    1946              :         /*
    1947              :          * Process the column-or-expression to be indexed.
    1948              :          */
    1949        22222 :         if (attribute->name != NULL)
    1950              :         {
    1951              :             /* Simple index attribute */
    1952              :             HeapTuple   atttuple;
    1953              :             Form_pg_attribute attform;
    1954              : 
    1955              :             Assert(attribute->expr == NULL);
    1956        21652 :             atttuple = SearchSysCacheAttName(relId, attribute->name);
    1957        21652 :             if (!HeapTupleIsValid(atttuple))
    1958              :             {
    1959              :                 /* difference in error message spellings is historical */
    1960           15 :                 if (isconstraint)
    1961            9 :                     ereport(ERROR,
    1962              :                             (errcode(ERRCODE_UNDEFINED_COLUMN),
    1963              :                              errmsg("column \"%s\" named in key does not exist",
    1964              :                                     attribute->name),
    1965              :                              parser_errposition(pstate, attribute->location)));
    1966              :                 else
    1967            6 :                     ereport(ERROR,
    1968              :                             (errcode(ERRCODE_UNDEFINED_COLUMN),
    1969              :                              errmsg("column \"%s\" does not exist",
    1970              :                                     attribute->name),
    1971              :                              parser_errposition(pstate, attribute->location)));
    1972              :             }
    1973        21637 :             attform = (Form_pg_attribute) GETSTRUCT(atttuple);
    1974        21637 :             indexInfo->ii_IndexAttrNumbers[attn] = attform->attnum;
    1975        21637 :             atttype = attform->atttypid;
    1976        21637 :             attcollation = attform->attcollation;
    1977        21637 :             ReleaseSysCache(atttuple);
    1978              :         }
    1979              :         else
    1980              :         {
    1981              :             /* Index expression */
    1982          570 :             Node       *expr = attribute->expr;
    1983              : 
    1984              :             Assert(expr != NULL);
    1985              : 
    1986          570 :             if (attn >= nkeycols)
    1987            0 :                 ereport(ERROR,
    1988              :                         (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
    1989              :                          errmsg("expressions are not supported in included columns"),
    1990              :                          parser_errposition(pstate, attribute->location)));
    1991          570 :             atttype = exprType(expr);
    1992          570 :             attcollation = exprCollation(expr);
    1993              : 
    1994              :             /*
    1995              :              * Strip any top-level COLLATE clause.  This ensures that we treat
    1996              :              * "x COLLATE y" and "(x COLLATE y)" alike.
    1997              :              */
    1998          585 :             while (IsA(expr, CollateExpr))
    1999           15 :                 expr = (Node *) ((CollateExpr *) expr)->arg;
    2000              : 
    2001          570 :             if (IsA(expr, Var) &&
    2002            6 :                 ((Var *) expr)->varattno != InvalidAttrNumber)
    2003              :             {
    2004              :                 /*
    2005              :                  * User wrote "(column)" or "(column COLLATE something)".
    2006              :                  * Treat it like simple attribute anyway.
    2007              :                  */
    2008            6 :                 indexInfo->ii_IndexAttrNumbers[attn] = ((Var *) expr)->varattno;
    2009              :             }
    2010              :             else
    2011              :             {
    2012          564 :                 indexInfo->ii_IndexAttrNumbers[attn] = 0;    /* marks expression */
    2013          564 :                 indexInfo->ii_Expressions = lappend(indexInfo->ii_Expressions,
    2014              :                                                     expr);
    2015              : 
    2016              :                 /*
    2017              :                  * transformExpr() should have already rejected subqueries,
    2018              :                  * aggregates, and window functions, based on the EXPR_KIND_
    2019              :                  * for an index expression.
    2020              :                  */
    2021              : 
    2022              :                 /*
    2023              :                  * An expression using mutable functions is probably wrong,
    2024              :                  * since if you aren't going to get the same result for the
    2025              :                  * same data every time, it's not clear what the index entries
    2026              :                  * mean at all.
    2027              :                  */
    2028          564 :                 if (contain_mutable_functions_after_planning((Expr *) expr))
    2029           84 :                     ereport(ERROR,
    2030              :                             (errcode(ERRCODE_INVALID_OBJECT_DEFINITION),
    2031              :                              errmsg("functions in index expression must be marked IMMUTABLE"),
    2032              :                              parser_errposition(pstate, attribute->location)));
    2033              :             }
    2034              :         }
    2035              : 
    2036        22123 :         typeOids[attn] = atttype;
    2037              : 
    2038              :         /*
    2039              :          * Included columns have no collation, no opclass and no ordering
    2040              :          * options.
    2041              :          */
    2042        22123 :         if (attn >= nkeycols)
    2043              :         {
    2044          322 :             if (attribute->collation)
    2045            0 :                 ereport(ERROR,
    2046              :                         (errcode(ERRCODE_INVALID_OBJECT_DEFINITION),
    2047              :                          errmsg("including column does not support a collation"),
    2048              :                          parser_errposition(pstate, attribute->location)));
    2049          322 :             if (attribute->opclass)
    2050            0 :                 ereport(ERROR,
    2051              :                         (errcode(ERRCODE_INVALID_OBJECT_DEFINITION),
    2052              :                          errmsg("including column does not support an operator class"),
    2053              :                          parser_errposition(pstate, attribute->location)));
    2054          322 :             if (attribute->ordering != SORTBY_DEFAULT)
    2055            0 :                 ereport(ERROR,
    2056              :                         (errcode(ERRCODE_INVALID_OBJECT_DEFINITION),
    2057              :                          errmsg("including column does not support ASC/DESC options"),
    2058              :                          parser_errposition(pstate, attribute->location)));
    2059          322 :             if (attribute->nulls_ordering != SORTBY_NULLS_DEFAULT)
    2060            0 :                 ereport(ERROR,
    2061              :                         (errcode(ERRCODE_INVALID_OBJECT_DEFINITION),
    2062              :                          errmsg("including column does not support NULLS FIRST/LAST options"),
    2063              :                          parser_errposition(pstate, attribute->location)));
    2064              : 
    2065          322 :             opclassOids[attn] = InvalidOid;
    2066          322 :             opclassOptions[attn] = (Datum) 0;
    2067          322 :             colOptions[attn] = 0;
    2068          322 :             collationOids[attn] = InvalidOid;
    2069          322 :             attn++;
    2070              : 
    2071          322 :             continue;
    2072              :         }
    2073              : 
    2074              :         /*
    2075              :          * Apply collation override if any.  Use of ddl_userid is necessary
    2076              :          * due to ACL checks therein, and it's safe because collations don't
    2077              :          * contain opaque expressions (or non-opaque expressions).
    2078              :          */
    2079        21801 :         if (attribute->collation)
    2080              :         {
    2081           59 :             if (OidIsValid(ddl_userid))
    2082              :             {
    2083           59 :                 AtEOXact_GUC(false, *ddl_save_nestlevel);
    2084           59 :                 SetUserIdAndSecContext(ddl_userid, ddl_sec_context);
    2085              :             }
    2086           59 :             attcollation = get_collation_oid(attribute->collation, false);
    2087           58 :             if (OidIsValid(ddl_userid))
    2088              :             {
    2089           58 :                 SetUserIdAndSecContext(save_userid, save_sec_context);
    2090           58 :                 *ddl_save_nestlevel = NewGUCNestLevel();
    2091           58 :                 RestrictSearchPath();
    2092              :             }
    2093              :         }
    2094              : 
    2095              :         /*
    2096              :          * Check we have a collation iff it's a collatable type.  The only
    2097              :          * expected failures here are (1) COLLATE applied to a noncollatable
    2098              :          * type, or (2) index expression had an unresolved collation.  But we
    2099              :          * might as well code this to be a complete consistency check.
    2100              :          */
    2101        21800 :         if (type_is_collatable(atttype))
    2102              :         {
    2103         3366 :             if (!OidIsValid(attcollation))
    2104            0 :                 ereport(ERROR,
    2105              :                         (errcode(ERRCODE_INDETERMINATE_COLLATION),
    2106              :                          errmsg("could not determine which collation to use for index expression"),
    2107              :                          errhint("Use the COLLATE clause to set the collation explicitly."),
    2108              :                          parser_errposition(pstate, attribute->location)));
    2109              :         }
    2110              :         else
    2111              :         {
    2112        18434 :             if (OidIsValid(attcollation))
    2113            6 :                 ereport(ERROR,
    2114              :                         (errcode(ERRCODE_DATATYPE_MISMATCH),
    2115              :                          errmsg("collations are not supported by type %s",
    2116              :                                 format_type_be(atttype)),
    2117              :                          parser_errposition(pstate, attribute->location)));
    2118              :         }
    2119              : 
    2120        21794 :         collationOids[attn] = attcollation;
    2121              : 
    2122              :         /*
    2123              :          * Identify the opclass to use.  Use of ddl_userid is necessary due to
    2124              :          * ACL checks therein.  This is safe despite opclasses containing
    2125              :          * opaque expressions (specifically, functions), because only
    2126              :          * superusers can define opclasses.
    2127              :          */
    2128        21794 :         if (OidIsValid(ddl_userid))
    2129              :         {
    2130        21739 :             AtEOXact_GUC(false, *ddl_save_nestlevel);
    2131        21739 :             SetUserIdAndSecContext(ddl_userid, ddl_sec_context);
    2132              :         }
    2133        21794 :         opclassOids[attn] = ResolveOpClass(attribute->opclass,
    2134              :                                            atttype,
    2135              :                                            accessMethodName,
    2136              :                                            accessMethodId);
    2137        21791 :         if (OidIsValid(ddl_userid))
    2138              :         {
    2139        21736 :             SetUserIdAndSecContext(save_userid, save_sec_context);
    2140        21736 :             *ddl_save_nestlevel = NewGUCNestLevel();
    2141        21736 :             RestrictSearchPath();
    2142              :         }
    2143              : 
    2144              :         /*
    2145              :          * Identify the exclusion operator, if any.
    2146              :          */
    2147        21791 :         if (nextExclOp)
    2148              :         {
    2149          174 :             List       *opname = (List *) lfirst(nextExclOp);
    2150              :             Oid         opid;
    2151              :             Oid         opfamily;
    2152              :             int         strat;
    2153              : 
    2154              :             /*
    2155              :              * Find the operator --- it must accept the column datatype
    2156              :              * without runtime coercion (but binary compatibility is OK).
    2157              :              * Operators contain opaque expressions (specifically, functions).
    2158              :              * compatible_oper_opid() boils down to oper() and
    2159              :              * IsBinaryCoercible().  PostgreSQL would have security problems
    2160              :              * elsewhere if oper() started calling opaque expressions.
    2161              :              */
    2162          174 :             if (OidIsValid(ddl_userid))
    2163              :             {
    2164          174 :                 AtEOXact_GUC(false, *ddl_save_nestlevel);
    2165          174 :                 SetUserIdAndSecContext(ddl_userid, ddl_sec_context);
    2166              :             }
    2167          174 :             opid = compatible_oper_opid(opname, atttype, atttype, false);
    2168          174 :             if (OidIsValid(ddl_userid))
    2169              :             {
    2170          174 :                 SetUserIdAndSecContext(save_userid, save_sec_context);
    2171          174 :                 *ddl_save_nestlevel = NewGUCNestLevel();
    2172          174 :                 RestrictSearchPath();
    2173              :             }
    2174              : 
    2175              :             /*
    2176              :              * Only allow commutative operators to be used in exclusion
    2177              :              * constraints. If X conflicts with Y, but Y does not conflict
    2178              :              * with X, bad things will happen.
    2179              :              */
    2180          174 :             if (get_commutator(opid) != opid)
    2181            0 :                 ereport(ERROR,
    2182              :                         (errcode(ERRCODE_WRONG_OBJECT_TYPE),
    2183              :                          errmsg("operator %s is not commutative",
    2184              :                                 format_operator(opid)),
    2185              :                          errdetail("Only commutative operators can be used in exclusion constraints."),
    2186              :                          parser_errposition(pstate, attribute->location)));
    2187              : 
    2188              :             /*
    2189              :              * Operator must be a member of the right opfamily, too
    2190              :              */
    2191          174 :             opfamily = get_opclass_family(opclassOids[attn]);
    2192          174 :             strat = get_op_opfamily_strategy(opid, opfamily);
    2193          174 :             if (strat == 0)
    2194            0 :                 ereport(ERROR,
    2195              :                         (errcode(ERRCODE_WRONG_OBJECT_TYPE),
    2196              :                          errmsg("operator %s is not a member of operator family \"%s\"",
    2197              :                                 format_operator(opid),
    2198              :                                 get_opfamily_name(opfamily, false)),
    2199              :                          errdetail("The exclusion operator must be related to the index operator class for the constraint."),
    2200              :                          parser_errposition(pstate, attribute->location)));
    2201              : 
    2202          174 :             indexInfo->ii_ExclusionOps[attn] = opid;
    2203          174 :             indexInfo->ii_ExclusionProcs[attn] = get_opcode(opid);
    2204          174 :             indexInfo->ii_ExclusionStrats[attn] = strat;
    2205          174 :             nextExclOp = lnext(exclusionOpNames, nextExclOp);
    2206              :         }
    2207        21617 :         else if (iswithoutoverlaps)
    2208              :         {
    2209              :             CompareType cmptype;
    2210              :             StrategyNumber strat;
    2211              :             Oid         opid;
    2212              : 
    2213          649 :             if (attn == nkeycols - 1)
    2214          316 :                 cmptype = COMPARE_OVERLAP;
    2215              :             else
    2216          333 :                 cmptype = COMPARE_EQ;
    2217          649 :             GetOperatorFromCompareType(opclassOids[attn], InvalidOid, cmptype, &opid, &strat);
    2218          649 :             indexInfo->ii_ExclusionOps[attn] = opid;
    2219          649 :             indexInfo->ii_ExclusionProcs[attn] = get_opcode(opid);
    2220          649 :             indexInfo->ii_ExclusionStrats[attn] = strat;
    2221              :         }
    2222              : 
    2223              :         /*
    2224              :          * Set up the per-column options (indoption field).  For now, this is
    2225              :          * zero for any un-ordered index, while ordered indexes have DESC and
    2226              :          * NULLS FIRST/LAST options.
    2227              :          */
    2228        21791 :         colOptions[attn] = 0;
    2229        21791 :         if (amcanorder)
    2230              :         {
    2231              :             /* default ordering is ASC */
    2232        19684 :             if (attribute->ordering == SORTBY_DESC)
    2233           21 :                 colOptions[attn] |= INDOPTION_DESC;
    2234              :             /* default null ordering is LAST for ASC, FIRST for DESC */
    2235        19684 :             if (attribute->nulls_ordering == SORTBY_NULLS_DEFAULT)
    2236              :             {
    2237        19669 :                 if (attribute->ordering == SORTBY_DESC)
    2238           15 :                     colOptions[attn] |= INDOPTION_NULLS_FIRST;
    2239              :             }
    2240           15 :             else if (attribute->nulls_ordering == SORTBY_NULLS_FIRST)
    2241            6 :                 colOptions[attn] |= INDOPTION_NULLS_FIRST;
    2242              :         }
    2243              :         else
    2244              :         {
    2245              :             /* index AM does not support ordering */
    2246         2107 :             if (attribute->ordering != SORTBY_DEFAULT)
    2247            0 :                 ereport(ERROR,
    2248              :                         (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
    2249              :                          errmsg("access method \"%s\" does not support ASC/DESC options",
    2250              :                                 accessMethodName),
    2251              :                          parser_errposition(pstate, attribute->location)));
    2252         2107 :             if (attribute->nulls_ordering != SORTBY_NULLS_DEFAULT)
    2253            0 :                 ereport(ERROR,
    2254              :                         (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
    2255              :                          errmsg("access method \"%s\" does not support NULLS FIRST/LAST options",
    2256              :                                 accessMethodName),
    2257              :                          parser_errposition(pstate, attribute->location)));
    2258              :         }
    2259              : 
    2260              :         /* Set up the per-column opclass options (attoptions field). */
    2261        21791 :         if (attribute->opclassopts)
    2262              :         {
    2263              :             Assert(attn < nkeycols);
    2264              : 
    2265           72 :             opclassOptions[attn] =
    2266           72 :                 transformRelOptions((Datum) 0, attribute->opclassopts,
    2267              :                                     NULL, NULL, false, false);
    2268              :         }
    2269              :         else
    2270        21719 :             opclassOptions[attn] = (Datum) 0;
    2271              : 
    2272        21791 :         attn++;
    2273              :     }
    2274        15749 : }
    2275              : 
    2276              : /*
    2277              :  * Resolve possibly-defaulted operator class specification
    2278              :  *
    2279              :  * Note: This is used to resolve operator class specifications in index and
    2280              :  * partition key definitions.
    2281              :  */
    2282              : Oid
    2283        21863 : ResolveOpClass(const List *opclass, Oid attrType,
    2284              :                const char *accessMethodName, Oid accessMethodId)
    2285              : {
    2286              :     char       *schemaname;
    2287              :     char       *opcname;
    2288              :     HeapTuple   tuple;
    2289              :     Form_pg_opclass opform;
    2290              :     Oid         opClassId,
    2291              :                 opInputType;
    2292              : 
    2293        21863 :     if (opclass == NIL)
    2294              :     {
    2295              :         /* no operator class specified, so find the default */
    2296        10766 :         opClassId = GetDefaultOpClass(attrType, accessMethodId);
    2297        10766 :         if (!OidIsValid(opClassId))
    2298            3 :             ereport(ERROR,
    2299              :                     (errcode(ERRCODE_UNDEFINED_OBJECT),
    2300              :                      errmsg("data type %s has no default operator class for access method \"%s\"",
    2301              :                             format_type_be(attrType), accessMethodName),
    2302              :                      errhint("You must specify an operator class for the index or define a default operator class for the data type.")));
    2303        10763 :         return opClassId;
    2304              :     }
    2305              : 
    2306              :     /*
    2307              :      * Specific opclass name given, so look up the opclass.
    2308              :      */
    2309              : 
    2310              :     /* deconstruct the name list */
    2311        11097 :     DeconstructQualifiedName(opclass, &schemaname, &opcname);
    2312              : 
    2313        11097 :     if (schemaname)
    2314              :     {
    2315              :         /* Look in specific schema only */
    2316              :         Oid         namespaceId;
    2317              : 
    2318           14 :         namespaceId = LookupExplicitNamespace(schemaname, false);
    2319           14 :         tuple = SearchSysCache3(CLAAMNAMENSP,
    2320              :                                 ObjectIdGetDatum(accessMethodId),
    2321              :                                 PointerGetDatum(opcname),
    2322              :                                 ObjectIdGetDatum(namespaceId));
    2323              :     }
    2324              :     else
    2325              :     {
    2326              :         /* Unqualified opclass name, so search the search path */
    2327        11083 :         opClassId = OpclassnameGetOpcid(accessMethodId, opcname);
    2328        11083 :         if (!OidIsValid(opClassId))
    2329            6 :             ereport(ERROR,
    2330              :                     (errcode(ERRCODE_UNDEFINED_OBJECT),
    2331              :                      errmsg("operator class \"%s\" does not exist for access method \"%s\"",
    2332              :                             opcname, accessMethodName)));
    2333        11077 :         tuple = SearchSysCache1(CLAOID, ObjectIdGetDatum(opClassId));
    2334              :     }
    2335              : 
    2336        11091 :     if (!HeapTupleIsValid(tuple))
    2337            0 :         ereport(ERROR,
    2338              :                 (errcode(ERRCODE_UNDEFINED_OBJECT),
    2339              :                  errmsg("operator class \"%s\" does not exist for access method \"%s\"",
    2340              :                         NameListToString(opclass), accessMethodName)));
    2341              : 
    2342              :     /*
    2343              :      * Verify that the index operator class accepts this datatype.  Note we
    2344              :      * will accept binary compatibility.
    2345              :      */
    2346        11091 :     opform = (Form_pg_opclass) GETSTRUCT(tuple);
    2347        11091 :     opClassId = opform->oid;
    2348        11091 :     opInputType = opform->opcintype;
    2349              : 
    2350        11091 :     if (!IsBinaryCoercible(attrType, opInputType))
    2351            0 :         ereport(ERROR,
    2352              :                 (errcode(ERRCODE_DATATYPE_MISMATCH),
    2353              :                  errmsg("operator class \"%s\" does not accept data type %s",
    2354              :                         NameListToString(opclass), format_type_be(attrType))));
    2355              : 
    2356        11091 :     ReleaseSysCache(tuple);
    2357              : 
    2358        11091 :     return opClassId;
    2359              : }
    2360              : 
    2361              : /*
    2362              :  * GetDefaultOpClass
    2363              :  *
    2364              :  * Given the OIDs of a datatype and an access method, find the default
    2365              :  * operator class, if any.  Returns InvalidOid if there is none.
    2366              :  */
    2367              : Oid
    2368        58548 : GetDefaultOpClass(Oid type_id, Oid am_id)
    2369              : {
    2370        58548 :     Oid         result = InvalidOid;
    2371        58548 :     int         nexact = 0;
    2372        58548 :     int         ncompatible = 0;
    2373        58548 :     int         ncompatiblepreferred = 0;
    2374              :     Relation    rel;
    2375              :     ScanKeyData skey[1];
    2376              :     SysScanDesc scan;
    2377              :     HeapTuple   tup;
    2378              :     TYPCATEGORY tcategory;
    2379              : 
    2380              :     /* If it's a domain, look at the base type instead */
    2381        58548 :     type_id = getBaseType(type_id);
    2382              : 
    2383        58548 :     tcategory = TypeCategory(type_id);
    2384              : 
    2385              :     /*
    2386              :      * We scan through all the opclasses available for the access method,
    2387              :      * looking for one that is marked default and matches the target type
    2388              :      * (either exactly or binary-compatibly, but prefer an exact match).
    2389              :      *
    2390              :      * We could find more than one binary-compatible match.  If just one is
    2391              :      * for a preferred type, use that one; otherwise we fail, forcing the user
    2392              :      * to specify which one he wants.  (The preferred-type special case is a
    2393              :      * kluge for varchar: it's binary-compatible to both text and bpchar, so
    2394              :      * we need a tiebreaker.)  If we find more than one exact match, then
    2395              :      * someone put bogus entries in pg_opclass.
    2396              :      */
    2397        58548 :     rel = table_open(OperatorClassRelationId, AccessShareLock);
    2398              : 
    2399        58548 :     ScanKeyInit(&skey[0],
    2400              :                 Anum_pg_opclass_opcmethod,
    2401              :                 BTEqualStrategyNumber, F_OIDEQ,
    2402              :                 ObjectIdGetDatum(am_id));
    2403              : 
    2404        58548 :     scan = systable_beginscan(rel, OpclassAmNameNspIndexId, true,
    2405              :                               NULL, 1, skey);
    2406              : 
    2407      2620692 :     while (HeapTupleIsValid(tup = systable_getnext(scan)))
    2408              :     {
    2409      2562144 :         Form_pg_opclass opclass = (Form_pg_opclass) GETSTRUCT(tup);
    2410              : 
    2411              :         /* ignore altogether if not a default opclass */
    2412      2562144 :         if (!opclass->opcdefault)
    2413       369420 :             continue;
    2414      2192724 :         if (opclass->opcintype == type_id)
    2415              :         {
    2416        51955 :             nexact++;
    2417        51955 :             result = opclass->oid;
    2418              :         }
    2419      3209610 :         else if (nexact == 0 &&
    2420      1068841 :                  IsBinaryCoercible(type_id, opclass->opcintype))
    2421              :         {
    2422        12548 :             if (IsPreferredType(tcategory, opclass->opcintype))
    2423              :             {
    2424         1055 :                 ncompatiblepreferred++;
    2425         1055 :                 result = opclass->oid;
    2426              :             }
    2427        11493 :             else if (ncompatiblepreferred == 0)
    2428              :             {
    2429        11493 :                 ncompatible++;
    2430        11493 :                 result = opclass->oid;
    2431              :             }
    2432              :         }
    2433              :     }
    2434              : 
    2435        58548 :     systable_endscan(scan);
    2436              : 
    2437        58548 :     table_close(rel, AccessShareLock);
    2438              : 
    2439              :     /* raise error if pg_opclass contains inconsistent data */
    2440        58548 :     if (nexact > 1)
    2441            0 :         ereport(ERROR,
    2442              :                 (errcode(ERRCODE_DUPLICATE_OBJECT),
    2443              :                  errmsg("there are multiple default operator classes for data type %s",
    2444              :                         format_type_be(type_id))));
    2445              : 
    2446        58548 :     if (nexact == 1 ||
    2447         5539 :         ncompatiblepreferred == 1 ||
    2448         5539 :         (ncompatiblepreferred == 0 && ncompatible == 1))
    2449        57915 :         return result;
    2450              : 
    2451          633 :     return InvalidOid;
    2452              : }
    2453              : 
    2454              : /*
    2455              :  * GetOperatorFromCompareType
    2456              :  *
    2457              :  * opclass - the opclass to use
    2458              :  * rhstype - the type for the right-hand side, or InvalidOid to use the type of the given opclass.
    2459              :  * cmptype - kind of operator to find
    2460              :  * opid - holds the operator we found
    2461              :  * strat - holds the output strategy number
    2462              :  *
    2463              :  * Finds an operator from a CompareType.  This is used for temporal index
    2464              :  * constraints (and other temporal features) to look up equality and overlaps
    2465              :  * operators.  We ask an opclass support function to translate from the
    2466              :  * compare type to the internal strategy numbers.  Raises ERROR on search
    2467              :  * failure.
    2468              :  */
    2469              : void
    2470         1061 : GetOperatorFromCompareType(Oid opclass, Oid rhstype, CompareType cmptype,
    2471              :                            Oid *opid, StrategyNumber *strat)
    2472              : {
    2473              :     Oid         amid;
    2474              :     Oid         opfamily;
    2475              :     Oid         opcintype;
    2476              : 
    2477              :     Assert(cmptype == COMPARE_EQ || cmptype == COMPARE_OVERLAP || cmptype == COMPARE_CONTAINED_BY);
    2478              : 
    2479              :     /*
    2480              :      * Use the opclass to get the opfamily, opcintype, and access method. If
    2481              :      * any of this fails, quit early.
    2482              :      */
    2483         1061 :     if (!get_opclass_opfamily_and_input_type(opclass, &opfamily, &opcintype))
    2484            0 :         elog(ERROR, "cache lookup failed for opclass %u", opclass);
    2485              : 
    2486         1061 :     amid = get_opclass_method(opclass);
    2487              : 
    2488              :     /*
    2489              :      * Ask the index AM to translate to its internal stratnum
    2490              :      */
    2491         1061 :     *strat = IndexAmTranslateCompareType(cmptype, amid, opfamily, true);
    2492         1061 :     if (*strat == InvalidStrategy)
    2493            0 :         ereport(ERROR,
    2494              :                 errcode(ERRCODE_UNDEFINED_OBJECT),
    2495              :                 cmptype == COMPARE_EQ ? errmsg("could not identify an equality operator for type %s", format_type_be(opcintype)) :
    2496              :                 cmptype == COMPARE_OVERLAP ? errmsg("could not identify an overlaps operator for type %s", format_type_be(opcintype)) :
    2497              :                 cmptype == COMPARE_CONTAINED_BY ? errmsg("could not identify a contained-by operator for type %s", format_type_be(opcintype)) : 0,
    2498              :                 errdetail("Could not translate compare type %d for operator family \"%s\" of access method \"%s\".",
    2499              :                           cmptype, get_opfamily_name(opfamily, false), get_am_name(amid)));
    2500              : 
    2501              :     /*
    2502              :      * We parameterize rhstype so foreign keys can ask for a <@ operator whose
    2503              :      * rhs matches the aggregate function. For example range_agg returns
    2504              :      * anymultirange.
    2505              :      */
    2506         1061 :     if (!OidIsValid(rhstype))
    2507          855 :         rhstype = opcintype;
    2508         1061 :     *opid = get_opfamily_member(opfamily, opcintype, rhstype, *strat);
    2509              : 
    2510         1061 :     if (!OidIsValid(*opid))
    2511            0 :         ereport(ERROR,
    2512              :                 errcode(ERRCODE_UNDEFINED_OBJECT),
    2513              :                 cmptype == COMPARE_EQ ? errmsg("could not identify an equality operator for type %s", format_type_be(opcintype)) :
    2514              :                 cmptype == COMPARE_OVERLAP ? errmsg("could not identify an overlaps operator for type %s", format_type_be(opcintype)) :
    2515              :                 cmptype == COMPARE_CONTAINED_BY ? errmsg("could not identify a contained-by operator for type %s", format_type_be(opcintype)) : 0,
    2516              :                 errdetail("There is no suitable operator in operator family \"%s\" for access method \"%s\".",
    2517              :                           get_opfamily_name(opfamily, false), get_am_name(amid)));
    2518         1061 : }
    2519              : 
    2520              : /*
    2521              :  *  makeObjectName()
    2522              :  *
    2523              :  *  Create a name for an implicitly created index, sequence, constraint,
    2524              :  *  extended statistics, etc.
    2525              :  *
    2526              :  *  The parameters are typically: the original table name, the original field
    2527              :  *  name, and a "type" string (such as "seq" or "pkey").    The field name
    2528              :  *  and/or type can be NULL if not relevant.
    2529              :  *
    2530              :  *  The result is a palloc'd string.
    2531              :  *
    2532              :  *  The basic result we want is "name1_name2_label", omitting "_name2" or
    2533              :  *  "_label" when those parameters are NULL.  However, we must generate
    2534              :  *  a name with less than NAMEDATALEN characters!  So, we truncate one or
    2535              :  *  both names if necessary to make a short-enough string.  The label part
    2536              :  *  is never truncated (so it had better be reasonably short).
    2537              :  *
    2538              :  *  The caller is responsible for checking uniqueness of the generated
    2539              :  *  name and retrying as needed; retrying will be done by altering the
    2540              :  *  "label" string (which is why we never truncate that part).
    2541              :  */
    2542              : char *
    2543        59960 : makeObjectName(const char *name1, const char *name2, const char *label)
    2544              : {
    2545              :     char       *name;
    2546        59960 :     int         overhead = 0;   /* chars needed for label and underscores */
    2547              :     int         availchars;     /* chars available for name(s) */
    2548              :     int         name1chars;     /* chars allocated to name1 */
    2549              :     int         name2chars;     /* chars allocated to name2 */
    2550              :     int         ndx;
    2551              : 
    2552        59960 :     name1chars = strlen(name1);
    2553        59960 :     if (name2)
    2554              :     {
    2555        52764 :         name2chars = strlen(name2);
    2556        52764 :         overhead++;             /* allow for separating underscore */
    2557              :     }
    2558              :     else
    2559         7196 :         name2chars = 0;
    2560        59960 :     if (label)
    2561        22590 :         overhead += strlen(label) + 1;
    2562              : 
    2563        59960 :     availchars = NAMEDATALEN - 1 - overhead;
    2564              :     Assert(availchars > 0);      /* else caller chose a bad label */
    2565              : 
    2566              :     /*
    2567              :      * If we must truncate, preferentially truncate the longer name. This
    2568              :      * logic could be expressed without a loop, but it's simple and obvious as
    2569              :      * a loop.
    2570              :      */
    2571        59993 :     while (name1chars + name2chars > availchars)
    2572              :     {
    2573           33 :         if (name1chars > name2chars)
    2574            0 :             name1chars--;
    2575              :         else
    2576           33 :             name2chars--;
    2577              :     }
    2578              : 
    2579        59960 :     name1chars = pg_mbcliplen(name1, name1chars, name1chars);
    2580        59960 :     if (name2)
    2581        52764 :         name2chars = pg_mbcliplen(name2, name2chars, name2chars);
    2582              : 
    2583              :     /* Now construct the string using the chosen lengths */
    2584        59960 :     name = palloc(name1chars + name2chars + overhead + 1);
    2585        59960 :     memcpy(name, name1, name1chars);
    2586        59960 :     ndx = name1chars;
    2587        59960 :     if (name2)
    2588              :     {
    2589        52764 :         name[ndx++] = '_';
    2590        52764 :         memcpy(name + ndx, name2, name2chars);
    2591        52764 :         ndx += name2chars;
    2592              :     }
    2593        59960 :     if (label)
    2594              :     {
    2595        22590 :         name[ndx++] = '_';
    2596        22590 :         strcpy(name + ndx, label);
    2597              :     }
    2598              :     else
    2599        37370 :         name[ndx] = '\0';
    2600              : 
    2601        59960 :     return name;
    2602              : }
    2603              : 
    2604              : /*
    2605              :  * Select a nonconflicting name for a new relation.  This is ordinarily
    2606              :  * used to choose index names (which is why it's here) but it can also
    2607              :  * be used for sequences, or any autogenerated relation kind.
    2608              :  *
    2609              :  * name1, name2, and label are used the same way as for makeObjectName(),
    2610              :  * except that the label can't be NULL; digits will be appended to the label
    2611              :  * if needed to create a name that is unique within the specified namespace.
    2612              :  *
    2613              :  * If isconstraint is true, we also avoid choosing a name matching any
    2614              :  * existing constraint in the same namespace.  (This is stricter than what
    2615              :  * Postgres itself requires, but the SQL standard says that constraint names
    2616              :  * should be unique within schemas, so we follow that for autogenerated
    2617              :  * constraint names.)
    2618              :  *
    2619              :  * Note: it is theoretically possible to get a collision anyway, if someone
    2620              :  * else chooses the same name concurrently.  We shorten the race condition
    2621              :  * window by checking for conflicting relations using SnapshotDirty, but
    2622              :  * that doesn't close the window entirely.  This is fairly unlikely to be
    2623              :  * a problem in practice, especially if one is holding an exclusive lock on
    2624              :  * the relation identified by name1.  However, if choosing multiple names
    2625              :  * within a single command, you'd better create the new object and do
    2626              :  * CommandCounterIncrement before choosing the next one!
    2627              :  *
    2628              :  * Returns a palloc'd string.
    2629              :  */
    2630              : char *
    2631         7371 : ChooseRelationName(const char *name1, const char *name2,
    2632              :                    const char *label, Oid namespaceid,
    2633              :                    bool isconstraint)
    2634              : {
    2635         7371 :     int         pass = 0;
    2636         7371 :     char       *relname = NULL;
    2637              :     char        modlabel[NAMEDATALEN];
    2638              :     SnapshotData SnapshotDirty;
    2639              :     Relation    pgclassrel;
    2640              : 
    2641              :     /* prepare to search pg_class with a dirty snapshot */
    2642         7371 :     InitDirtySnapshot(SnapshotDirty);
    2643         7371 :     pgclassrel = table_open(RelationRelationId, AccessShareLock);
    2644              : 
    2645              :     /* try the unmodified label first */
    2646         7371 :     strlcpy(modlabel, label, sizeof(modlabel));
    2647              : 
    2648              :     for (;;)
    2649          549 :     {
    2650              :         ScanKeyData key[2];
    2651              :         SysScanDesc scan;
    2652              :         bool        collides;
    2653              : 
    2654         7920 :         relname = makeObjectName(name1, name2, modlabel);
    2655              : 
    2656              :         /* is there any conflicting relation name? */
    2657         7920 :         ScanKeyInit(&key[0],
    2658              :                     Anum_pg_class_relname,
    2659              :                     BTEqualStrategyNumber, F_NAMEEQ,
    2660              :                     CStringGetDatum(relname));
    2661         7920 :         ScanKeyInit(&key[1],
    2662              :                     Anum_pg_class_relnamespace,
    2663              :                     BTEqualStrategyNumber, F_OIDEQ,
    2664              :                     ObjectIdGetDatum(namespaceid));
    2665              : 
    2666         7920 :         scan = systable_beginscan(pgclassrel, ClassNameNspIndexId,
    2667              :                                   true /* indexOK */ ,
    2668              :                                   &SnapshotDirty,
    2669              :                                   2, key);
    2670              : 
    2671         7920 :         collides = HeapTupleIsValid(systable_getnext(scan));
    2672              : 
    2673         7920 :         systable_endscan(scan);
    2674              : 
    2675              :         /* break out of loop if no conflict */
    2676         7920 :         if (!collides)
    2677              :         {
    2678         7374 :             if (!isconstraint ||
    2679         4653 :                 !ConstraintNameExists(relname, namespaceid))
    2680              :                 break;
    2681              :         }
    2682              : 
    2683              :         /* found a conflict, so try a new name component */
    2684          549 :         pfree(relname);
    2685          549 :         snprintf(modlabel, sizeof(modlabel), "%s%d", label, ++pass);
    2686              :     }
    2687              : 
    2688         7371 :     table_close(pgclassrel, AccessShareLock);
    2689              : 
    2690         7371 :     return relname;
    2691              : }
    2692              : 
    2693              : /*
    2694              :  * Select the name to be used for an index.
    2695              :  *
    2696              :  * The argument list is pretty ad-hoc :-(
    2697              :  */
    2698              : static char *
    2699         6165 : ChooseIndexName(const char *tabname, Oid namespaceId,
    2700              :                 const List *colnames, const List *exclusionOpNames,
    2701              :                 bool primary, bool isconstraint)
    2702              : {
    2703              :     char       *indexname;
    2704              : 
    2705         6165 :     if (primary)
    2706              :     {
    2707              :         /* the primary key's name does not depend on the specific column(s) */
    2708         4164 :         indexname = ChooseRelationName(tabname,
    2709              :                                        NULL,
    2710              :                                        "pkey",
    2711              :                                        namespaceId,
    2712              :                                        true);
    2713              :     }
    2714         2001 :     else if (exclusionOpNames != NIL)
    2715              :     {
    2716          103 :         indexname = ChooseRelationName(tabname,
    2717          103 :                                        ChooseIndexNameAddition(colnames),
    2718              :                                        "excl",
    2719              :                                        namespaceId,
    2720              :                                        true);
    2721              :     }
    2722         1898 :     else if (isconstraint)
    2723              :     {
    2724          383 :         indexname = ChooseRelationName(tabname,
    2725          383 :                                        ChooseIndexNameAddition(colnames),
    2726              :                                        "key",
    2727              :                                        namespaceId,
    2728              :                                        true);
    2729              :     }
    2730              :     else
    2731              :     {
    2732         1515 :         indexname = ChooseRelationName(tabname,
    2733         1515 :                                        ChooseIndexNameAddition(colnames),
    2734              :                                        "idx",
    2735              :                                        namespaceId,
    2736              :                                        false);
    2737              :     }
    2738              : 
    2739         6165 :     return indexname;
    2740              : }
    2741              : 
    2742              : /*
    2743              :  * Generate "name2" for a new index given the list of column names for it
    2744              :  * (as produced by ChooseIndexColumnNames).  This will be passed to
    2745              :  * ChooseRelationName along with the parent table name and a suitable label.
    2746              :  *
    2747              :  * We know that less than NAMEDATALEN characters will actually be used,
    2748              :  * so we can truncate the result once we've generated that many.
    2749              :  *
    2750              :  * XXX See also ChooseForeignKeyConstraintNameAddition and
    2751              :  * ChooseExtendedStatisticNameAddition.
    2752              :  */
    2753              : static char *
    2754         2001 : ChooseIndexNameAddition(const List *colnames)
    2755              : {
    2756              :     char        buf[NAMEDATALEN * 2];
    2757         2001 :     int         buflen = 0;
    2758              :     ListCell   *lc;
    2759              : 
    2760         2001 :     buf[0] = '\0';
    2761         4537 :     foreach(lc, colnames)
    2762              :     {
    2763         2536 :         const char *name = (const char *) lfirst(lc);
    2764              : 
    2765         2536 :         if (buflen > 0)
    2766          535 :             buf[buflen++] = '_';    /* insert _ between names */
    2767              : 
    2768              :         /*
    2769              :          * At this point we have buflen <= NAMEDATALEN.  name should be less
    2770              :          * than NAMEDATALEN already, but use strlcpy for paranoia.
    2771              :          */
    2772         2536 :         strlcpy(buf + buflen, name, NAMEDATALEN);
    2773         2536 :         buflen += strlen(buf + buflen);
    2774         2536 :         if (buflen >= NAMEDATALEN)
    2775            0 :             break;
    2776              :     }
    2777         2001 :     return pstrdup(buf);
    2778              : }
    2779              : 
    2780              : /*
    2781              :  * Select the actual names to be used for the columns of an index, given the
    2782              :  * list of IndexElems for the columns.  This is mostly about ensuring the
    2783              :  * names are unique so we don't get a conflicting-attribute-names error.
    2784              :  *
    2785              :  * Returns a List of plain strings (char *, not String nodes).
    2786              :  */
    2787              : static List *
    2788        15850 : ChooseIndexColumnNames(const List *indexElems)
    2789              : {
    2790        15850 :     List       *result = NIL;
    2791              :     ListCell   *lc;
    2792              : 
    2793        38090 :     foreach(lc, indexElems)
    2794              :     {
    2795        22240 :         IndexElem  *ielem = (IndexElem *) lfirst(lc);
    2796              :         const char *origname;
    2797              :         const char *curname;
    2798              :         int         i;
    2799              :         char        buf[NAMEDATALEN];
    2800              : 
    2801              :         /* Get the preliminary name from the IndexElem */
    2802        22240 :         if (ielem->indexcolname)
    2803         2192 :             origname = ielem->indexcolname; /* caller-specified name */
    2804        20048 :         else if (ielem->name)
    2805        19843 :             origname = ielem->name; /* simple column reference */
    2806              :         else
    2807          205 :             origname = "expr";    /* default name for expression */
    2808              : 
    2809              :         /* If it conflicts with any previous column, tweak it */
    2810        22240 :         curname = origname;
    2811        22240 :         for (i = 1;; i++)
    2812           31 :         {
    2813              :             ListCell   *lc2;
    2814              :             char        nbuf[32];
    2815              :             int         nlen;
    2816              : 
    2817        34777 :             foreach(lc2, result)
    2818              :             {
    2819        12537 :                 if (strcmp(curname, (char *) lfirst(lc2)) == 0)
    2820           31 :                     break;
    2821              :             }
    2822        22271 :             if (lc2 == NULL)
    2823        22240 :                 break;          /* found nonconflicting name */
    2824              : 
    2825           31 :             sprintf(nbuf, "%d", i);
    2826              : 
    2827              :             /* Ensure generated names are shorter than NAMEDATALEN */
    2828           31 :             nlen = pg_mbcliplen(origname, strlen(origname),
    2829           31 :                                 NAMEDATALEN - 1 - strlen(nbuf));
    2830           31 :             memcpy(buf, origname, nlen);
    2831           31 :             strcpy(buf + nlen, nbuf);
    2832           31 :             curname = buf;
    2833              :         }
    2834              : 
    2835              :         /* And attach to the result list */
    2836        22240 :         result = lappend(result, pstrdup(curname));
    2837              :     }
    2838        15850 :     return result;
    2839              : }
    2840              : 
    2841              : /*
    2842              :  * ExecReindex
    2843              :  *
    2844              :  * Primary entry point for manual REINDEX commands.  This is mainly a
    2845              :  * preparation wrapper for the real operations that will happen in
    2846              :  * each subroutine of REINDEX.
    2847              :  */
    2848              : void
    2849          566 : ExecReindex(ParseState *pstate, const ReindexStmt *stmt, bool isTopLevel)
    2850              : {
    2851          566 :     ReindexParams params = {0};
    2852              :     ListCell   *lc;
    2853          566 :     bool        concurrently = false;
    2854          566 :     bool        verbose = false;
    2855          566 :     char       *tablespacename = NULL;
    2856              : 
    2857              :     /* Parse option list */
    2858          954 :     foreach(lc, stmt->params)
    2859              :     {
    2860          388 :         DefElem    *opt = (DefElem *) lfirst(lc);
    2861              : 
    2862          388 :         if (strcmp(opt->defname, "verbose") == 0)
    2863            7 :             verbose = defGetBoolean(opt);
    2864          381 :         else if (strcmp(opt->defname, "concurrently") == 0)
    2865          317 :             concurrently = defGetBoolean(opt);
    2866           64 :         else if (strcmp(opt->defname, "tablespace") == 0)
    2867           64 :             tablespacename = defGetString(opt);
    2868              :         else
    2869            0 :             ereport(ERROR,
    2870              :                     (errcode(ERRCODE_SYNTAX_ERROR),
    2871              :                      errmsg("unrecognized %s option \"%s\"",
    2872              :                             "REINDEX", opt->defname),
    2873              :                      parser_errposition(pstate, opt->location)));
    2874              :     }
    2875              : 
    2876          566 :     if (concurrently)
    2877          317 :         PreventInTransactionBlock(isTopLevel,
    2878              :                                   "REINDEX CONCURRENTLY");
    2879              : 
    2880          553 :     params.options =
    2881         1106 :         (verbose ? REINDEXOPT_VERBOSE : 0) |
    2882          553 :         (concurrently ? REINDEXOPT_CONCURRENTLY : 0);
    2883              : 
    2884              :     /*
    2885              :      * Assign the tablespace OID to move indexes to, with InvalidOid to do
    2886              :      * nothing.
    2887              :      */
    2888          553 :     if (tablespacename != NULL)
    2889              :     {
    2890           64 :         params.tablespaceOid = get_tablespace_oid(tablespacename, false);
    2891              : 
    2892              :         /* Check permissions except when moving to database's default */
    2893           64 :         if (OidIsValid(params.tablespaceOid) &&
    2894           64 :             params.tablespaceOid != MyDatabaseTableSpace)
    2895              :         {
    2896              :             AclResult   aclresult;
    2897              : 
    2898           64 :             aclresult = object_aclcheck(TableSpaceRelationId, params.tablespaceOid,
    2899              :                                         GetUserId(), ACL_CREATE);
    2900           64 :             if (aclresult != ACLCHECK_OK)
    2901            6 :                 aclcheck_error(aclresult, OBJECT_TABLESPACE,
    2902            6 :                                get_tablespace_name(params.tablespaceOid));
    2903              :         }
    2904              :     }
    2905              :     else
    2906          489 :         params.tablespaceOid = InvalidOid;
    2907              : 
    2908          547 :     switch (stmt->kind)
    2909              :     {
    2910          206 :         case REINDEX_OBJECT_INDEX:
    2911          206 :             ReindexIndex(stmt, &params, isTopLevel);
    2912          153 :             break;
    2913          251 :         case REINDEX_OBJECT_TABLE:
    2914          251 :             ReindexTable(stmt, &params, isTopLevel);
    2915          190 :             break;
    2916           90 :         case REINDEX_OBJECT_SCHEMA:
    2917              :         case REINDEX_OBJECT_SYSTEM:
    2918              :         case REINDEX_OBJECT_DATABASE:
    2919              : 
    2920              :             /*
    2921              :              * This cannot run inside a user transaction block; if we were
    2922              :              * inside a transaction, then its commit- and
    2923              :              * start-transaction-command calls would not have the intended
    2924              :              * effect!
    2925              :              */
    2926           90 :             PreventInTransactionBlock(isTopLevel,
    2927          123 :                                       (stmt->kind == REINDEX_OBJECT_SCHEMA) ? "REINDEX SCHEMA" :
    2928           33 :                                       (stmt->kind == REINDEX_OBJECT_SYSTEM) ? "REINDEX SYSTEM" :
    2929              :                                       "REINDEX DATABASE");
    2930           87 :             ReindexMultipleTables(stmt, &params);
    2931           62 :             break;
    2932            0 :         default:
    2933            0 :             elog(ERROR, "unrecognized object type: %d",
    2934              :                  (int) stmt->kind);
    2935              :             break;
    2936              :     }
    2937          405 : }
    2938              : 
    2939              : /*
    2940              :  * ReindexIndex
    2941              :  *      Recreate a specific index.
    2942              :  */
    2943              : static void
    2944          206 : ReindexIndex(const ReindexStmt *stmt, const ReindexParams *params, bool isTopLevel)
    2945              : {
    2946          206 :     const RangeVar *indexRelation = stmt->relation;
    2947              :     struct ReindexIndexCallbackState state;
    2948              :     Oid         indOid;
    2949              :     char        persistence;
    2950              :     char        relkind;
    2951              : 
    2952              :     /*
    2953              :      * Find and lock index, and check permissions on table; use callback to
    2954              :      * obtain lock on table first, to avoid deadlock hazard.  The lock level
    2955              :      * used here must match the index lock obtained in reindex_index().
    2956              :      *
    2957              :      * If it's a temporary index, we will perform a non-concurrent reindex,
    2958              :      * even if CONCURRENTLY was requested.  In that case, reindex_index() will
    2959              :      * upgrade the lock, but that's OK, because other sessions can't hold
    2960              :      * locks on our temporary table.
    2961              :      */
    2962          206 :     state.params = *params;
    2963          206 :     state.locked_table_oid = InvalidOid;
    2964          206 :     indOid = RangeVarGetRelidExtended(indexRelation,
    2965          206 :                                       (params->options & REINDEXOPT_CONCURRENTLY) != 0 ?
    2966              :                                       ShareUpdateExclusiveLock : AccessExclusiveLock,
    2967              :                                       0,
    2968              :                                       RangeVarCallbackForReindexIndex,
    2969              :                                       &state);
    2970              : 
    2971              :     /*
    2972              :      * Obtain the current persistence and kind of the existing index.  We
    2973              :      * already hold a lock on the index.
    2974              :      */
    2975          182 :     persistence = get_rel_persistence(indOid);
    2976          182 :     relkind = get_rel_relkind(indOid);
    2977              : 
    2978          182 :     if (relkind == RELKIND_PARTITIONED_INDEX)
    2979           18 :         ReindexPartitions(stmt, indOid, params, isTopLevel);
    2980          164 :     else if ((params->options & REINDEXOPT_CONCURRENTLY) != 0 &&
    2981              :              persistence != RELPERSISTENCE_TEMP)
    2982           99 :         ReindexRelationConcurrently(stmt, indOid, params);
    2983              :     else
    2984              :     {
    2985           65 :         ReindexParams newparams = *params;
    2986              : 
    2987           65 :         newparams.options |= REINDEXOPT_REPORT_PROGRESS;
    2988           65 :         reindex_index(stmt, indOid, false, persistence, &newparams);
    2989              :     }
    2990          153 : }
    2991              : 
    2992              : /*
    2993              :  * Check permissions on table before acquiring relation lock; also lock
    2994              :  * the heap before the RangeVarGetRelidExtended takes the index lock, to avoid
    2995              :  * deadlocks.
    2996              :  */
    2997              : static void
    2998          211 : RangeVarCallbackForReindexIndex(const RangeVar *relation,
    2999              :                                 Oid relId, Oid oldRelId, void *arg)
    3000              : {
    3001              :     char        relkind;
    3002          211 :     struct ReindexIndexCallbackState *state = arg;
    3003              :     LOCKMODE    table_lockmode;
    3004              :     Oid         table_oid;
    3005              :     AclResult   aclresult;
    3006              : 
    3007              :     /*
    3008              :      * Lock level here should match table lock in reindex_index() for
    3009              :      * non-concurrent case and table locks used by index_concurrently_*() for
    3010              :      * concurrent case.
    3011              :      */
    3012          422 :     table_lockmode = (state->params.options & REINDEXOPT_CONCURRENTLY) != 0 ?
    3013          211 :         ShareUpdateExclusiveLock : ShareLock;
    3014              : 
    3015              :     /*
    3016              :      * If we previously locked some other index's heap, and the name we're
    3017              :      * looking up no longer refers to that relation, release the now-useless
    3018              :      * lock.
    3019              :      */
    3020          211 :     if (relId != oldRelId && OidIsValid(oldRelId))
    3021              :     {
    3022            3 :         UnlockRelationOid(state->locked_table_oid, table_lockmode);
    3023            3 :         state->locked_table_oid = InvalidOid;
    3024              :     }
    3025              : 
    3026              :     /* If the relation does not exist, there's nothing more to do. */
    3027          211 :     if (!OidIsValid(relId))
    3028            6 :         return;
    3029              : 
    3030              :     /* If the relation does exist, check whether it's an index. */
    3031          205 :     relkind = get_rel_relkind(relId);
    3032          205 :     if (relkind != RELKIND_INDEX &&
    3033              :         relkind != RELKIND_PARTITIONED_INDEX)
    3034           12 :         ereport(ERROR,
    3035              :                 (errcode(ERRCODE_WRONG_OBJECT_TYPE),
    3036              :                  errmsg("\"%s\" is not an index", relation->relname)));
    3037              : 
    3038              :     /* Look up the index's table. */
    3039          193 :     table_oid = IndexGetRelation(relId, false);
    3040              : 
    3041              :     /*
    3042              :      * In the unlikely event that, upon retry, we get the same index OID with
    3043              :      * a different table OID, fail.  RangeVarGetRelidExtended() will have
    3044              :      * already locked the index in this case, and it won't retry again, so we
    3045              :      * can't lock the newly discovered table OID without risking deadlock.
    3046              :      * Also, while this corner case is indeed possible, it is extremely
    3047              :      * unlikely to happen in practice, so it's probably not worth any more
    3048              :      * effort than this.
    3049              :      */
    3050          193 :     if (relId == oldRelId && table_oid != state->locked_table_oid)
    3051            0 :         ereport(ERROR,
    3052              :                 (errcode(ERRCODE_UNDEFINED_OBJECT),
    3053              :                  errmsg("index \"%s\" was concurrently dropped",
    3054              :                         relation->relname)));
    3055              : 
    3056              :     /* Check permissions. */
    3057          193 :     aclresult = pg_class_aclcheck(table_oid, GetUserId(), ACL_MAINTAIN);
    3058          193 :     if (aclresult != ACLCHECK_OK)
    3059            6 :         aclcheck_error(aclresult, OBJECT_INDEX, relation->relname);
    3060              : 
    3061              :     /* Lock heap before index to avoid deadlock. */
    3062          187 :     if (relId != oldRelId)
    3063              :     {
    3064          185 :         LockRelationOid(table_oid, table_lockmode);
    3065          185 :         state->locked_table_oid = table_oid;
    3066              :     }
    3067              : }
    3068              : 
    3069              : /*
    3070              :  * ReindexTable
    3071              :  *      Recreate all indexes of a table (and of its toast table, if any)
    3072              :  */
    3073              : static Oid
    3074          251 : ReindexTable(const ReindexStmt *stmt, const ReindexParams *params, bool isTopLevel)
    3075              : {
    3076              :     Oid         heapOid;
    3077              :     bool        result;
    3078          251 :     const RangeVar *relation = stmt->relation;
    3079              : 
    3080              :     /*
    3081              :      * The lock level used here should match reindex_relation().
    3082              :      *
    3083              :      * If it's a temporary table, we will perform a non-concurrent reindex,
    3084              :      * even if CONCURRENTLY was requested.  In that case, reindex_relation()
    3085              :      * will upgrade the lock, but that's OK, because other sessions can't hold
    3086              :      * locks on our temporary table.
    3087              :      */
    3088          251 :     heapOid = RangeVarGetRelidExtended(relation,
    3089          251 :                                        (params->options & REINDEXOPT_CONCURRENTLY) != 0 ?
    3090              :                                        ShareUpdateExclusiveLock : ShareLock,
    3091              :                                        0,
    3092              :                                        RangeVarCallbackMaintainsTable, NULL);
    3093              : 
    3094          228 :     if (get_rel_relkind(heapOid) == RELKIND_PARTITIONED_TABLE)
    3095           36 :         ReindexPartitions(stmt, heapOid, params, isTopLevel);
    3096          312 :     else if ((params->options & REINDEXOPT_CONCURRENTLY) != 0 &&
    3097          120 :              get_rel_persistence(heapOid) != RELPERSISTENCE_TEMP)
    3098              :     {
    3099          114 :         result = ReindexRelationConcurrently(stmt, heapOid, params);
    3100              : 
    3101           95 :         if (!result)
    3102            9 :             ereport(NOTICE,
    3103              :                     (errmsg("table \"%s\" has no indexes that can be reindexed concurrently",
    3104              :                             relation->relname)));
    3105              :     }
    3106              :     else
    3107              :     {
    3108           78 :         ReindexParams newparams = *params;
    3109              : 
    3110           78 :         newparams.options |= REINDEXOPT_REPORT_PROGRESS;
    3111           78 :         result = reindex_relation(stmt, heapOid,
    3112              :                                   REINDEX_REL_PROCESS_TOAST |
    3113              :                                   REINDEX_REL_CHECK_CONSTRAINTS,
    3114              :                                   &newparams);
    3115           62 :         if (!result)
    3116            6 :             ereport(NOTICE,
    3117              :                     (errmsg("table \"%s\" has no indexes to reindex",
    3118              :                             relation->relname)));
    3119              :     }
    3120              : 
    3121          190 :     return heapOid;
    3122              : }
    3123              : 
    3124              : /*
    3125              :  * ReindexMultipleTables
    3126              :  *      Recreate indexes of tables selected by objectName/objectKind.
    3127              :  *
    3128              :  * To reduce the probability of deadlocks, each table is reindexed in a
    3129              :  * separate transaction, so we can release the lock on it right away.
    3130              :  * That means this must not be called within a user transaction block!
    3131              :  */
    3132              : static void
    3133           87 : ReindexMultipleTables(const ReindexStmt *stmt, const ReindexParams *params)
    3134              : {
    3135              : 
    3136              :     Oid         objectOid;
    3137              :     Relation    relationRelation;
    3138              :     TableScanDesc scan;
    3139              :     ScanKeyData scan_keys[1];
    3140              :     HeapTuple   tuple;
    3141              :     MemoryContext private_context;
    3142              :     MemoryContext old;
    3143           87 :     List       *relids = NIL;
    3144              :     int         num_keys;
    3145           87 :     bool        concurrent_warning = false;
    3146           87 :     bool        tablespace_warning = false;
    3147           87 :     const char *objectName = stmt->name;
    3148           87 :     const ReindexObjectType objectKind = stmt->kind;
    3149              : 
    3150              :     Assert(objectKind == REINDEX_OBJECT_SCHEMA ||
    3151              :            objectKind == REINDEX_OBJECT_SYSTEM ||
    3152              :            objectKind == REINDEX_OBJECT_DATABASE);
    3153              : 
    3154              :     /*
    3155              :      * This matches the options enforced by the grammar, where the object name
    3156              :      * is optional for DATABASE and SYSTEM.
    3157              :      */
    3158              :     Assert(objectName || objectKind != REINDEX_OBJECT_SCHEMA);
    3159              : 
    3160           87 :     if (objectKind == REINDEX_OBJECT_SYSTEM &&
    3161           17 :         (params->options & REINDEXOPT_CONCURRENTLY) != 0)
    3162           10 :         ereport(ERROR,
    3163              :                 (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
    3164              :                  errmsg("cannot reindex system catalogs concurrently")));
    3165              : 
    3166              :     /*
    3167              :      * Get OID of object to reindex, being the database currently being used
    3168              :      * by session for a database or for system catalogs, or the schema defined
    3169              :      * by caller. At the same time do permission checks that need different
    3170              :      * processing depending on the object type.
    3171              :      */
    3172           77 :     if (objectKind == REINDEX_OBJECT_SCHEMA)
    3173              :     {
    3174           54 :         objectOid = get_namespace_oid(objectName, false);
    3175              : 
    3176           51 :         if (!object_ownercheck(NamespaceRelationId, objectOid, GetUserId()) &&
    3177           12 :             !has_privs_of_role(GetUserId(), ROLE_PG_MAINTAIN))
    3178            9 :             aclcheck_error(ACLCHECK_NOT_OWNER, OBJECT_SCHEMA,
    3179              :                            objectName);
    3180              :     }
    3181              :     else
    3182              :     {
    3183           23 :         objectOid = MyDatabaseId;
    3184              : 
    3185           23 :         if (objectName && strcmp(objectName, get_database_name(objectOid)) != 0)
    3186            3 :             ereport(ERROR,
    3187              :                     (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
    3188              :                      errmsg("can only reindex the currently open database")));
    3189           20 :         if (!object_ownercheck(DatabaseRelationId, objectOid, GetUserId()) &&
    3190            0 :             !has_privs_of_role(GetUserId(), ROLE_PG_MAINTAIN))
    3191            0 :             aclcheck_error(ACLCHECK_NOT_OWNER, OBJECT_DATABASE,
    3192            0 :                            get_database_name(objectOid));
    3193              :     }
    3194              : 
    3195              :     /*
    3196              :      * Create a memory context that will survive forced transaction commits we
    3197              :      * do below.  Since it is a child of PortalContext, it will go away
    3198              :      * eventually even if we suffer an error; there's no need for special
    3199              :      * abort cleanup logic.
    3200              :      */
    3201           62 :     private_context = AllocSetContextCreate(PortalContext,
    3202              :                                             "ReindexMultipleTables",
    3203              :                                             ALLOCSET_SMALL_SIZES);
    3204              : 
    3205              :     /*
    3206              :      * Define the search keys to find the objects to reindex. For a schema, we
    3207              :      * select target relations using relnamespace, something not necessary for
    3208              :      * a database-wide operation.
    3209              :      */
    3210           62 :     if (objectKind == REINDEX_OBJECT_SCHEMA)
    3211              :     {
    3212           42 :         num_keys = 1;
    3213           42 :         ScanKeyInit(&scan_keys[0],
    3214              :                     Anum_pg_class_relnamespace,
    3215              :                     BTEqualStrategyNumber, F_OIDEQ,
    3216              :                     ObjectIdGetDatum(objectOid));
    3217              :     }
    3218              :     else
    3219           20 :         num_keys = 0;
    3220              : 
    3221              :     /*
    3222              :      * Scan pg_class to build a list of the relations we need to reindex.
    3223              :      *
    3224              :      * We only consider plain relations and materialized views here (toast
    3225              :      * rels will be processed indirectly by reindex_relation).
    3226              :      */
    3227           62 :     relationRelation = table_open(RelationRelationId, AccessShareLock);
    3228           62 :     scan = table_beginscan_catalog(relationRelation, num_keys, scan_keys);
    3229         9726 :     while ((tuple = heap_getnext(scan, ForwardScanDirection)) != NULL)
    3230              :     {
    3231         9664 :         Form_pg_class classtuple = (Form_pg_class) GETSTRUCT(tuple);
    3232         9664 :         Oid         relid = classtuple->oid;
    3233              : 
    3234              :         /*
    3235              :          * Only regular tables and matviews can have indexes, so ignore any
    3236              :          * other kind of relation.
    3237              :          *
    3238              :          * Partitioned tables/indexes are skipped but matching leaf partitions
    3239              :          * are processed.
    3240              :          */
    3241         9664 :         if (classtuple->relkind != RELKIND_RELATION &&
    3242         7954 :             classtuple->relkind != RELKIND_MATVIEW)
    3243         7945 :             continue;
    3244              : 
    3245              :         /* Skip temp tables of other backends; we can't reindex them at all */
    3246         1719 :         if (classtuple->relpersistence == RELPERSISTENCE_TEMP &&
    3247           18 :             !isTempNamespace(classtuple->relnamespace))
    3248            0 :             continue;
    3249              : 
    3250              :         /*
    3251              :          * Check user/system classification.  SYSTEM processes all the
    3252              :          * catalogs, and DATABASE processes everything that's not a catalog.
    3253              :          */
    3254         1719 :         if (objectKind == REINDEX_OBJECT_SYSTEM &&
    3255          492 :             !IsCatalogRelationOid(relid))
    3256           44 :             continue;
    3257         2566 :         else if (objectKind == REINDEX_OBJECT_DATABASE &&
    3258          891 :                  IsCatalogRelationOid(relid))
    3259          832 :             continue;
    3260              : 
    3261              :         /*
    3262              :          * We already checked privileges on the database or schema, but we
    3263              :          * further restrict reindexing shared catalogs to roles with the
    3264              :          * MAINTAIN privilege on the relation.
    3265              :          */
    3266          964 :         if (classtuple->relisshared &&
    3267          121 :             pg_class_aclcheck(relid, GetUserId(), ACL_MAINTAIN) != ACLCHECK_OK)
    3268            0 :             continue;
    3269              : 
    3270              :         /*
    3271              :          * Skip system tables, since index_create() would reject indexing them
    3272              :          * concurrently (and it would likely fail if we tried).
    3273              :          */
    3274         1084 :         if ((params->options & REINDEXOPT_CONCURRENTLY) != 0 &&
    3275          241 :             IsCatalogRelationOid(relid))
    3276              :         {
    3277          192 :             if (!concurrent_warning)
    3278            3 :                 ereport(WARNING,
    3279              :                         (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
    3280              :                          errmsg("cannot reindex system catalogs concurrently, skipping all")));
    3281          192 :             concurrent_warning = true;
    3282          192 :             continue;
    3283              :         }
    3284              : 
    3285              :         /*
    3286              :          * If a new tablespace is set, check if this relation has to be
    3287              :          * skipped.
    3288              :          */
    3289          651 :         if (OidIsValid(params->tablespaceOid))
    3290              :         {
    3291            0 :             bool        skip_rel = false;
    3292              : 
    3293              :             /*
    3294              :              * Mapped relations cannot be moved to different tablespaces (in
    3295              :              * particular this eliminates all shared catalogs.).
    3296              :              */
    3297            0 :             if (RELKIND_HAS_STORAGE(classtuple->relkind) &&
    3298            0 :                 !RelFileNumberIsValid(classtuple->relfilenode))
    3299            0 :                 skip_rel = true;
    3300              : 
    3301              :             /*
    3302              :              * A system relation is always skipped, even with
    3303              :              * allow_system_table_mods enabled.
    3304              :              */
    3305            0 :             if (IsSystemClass(relid, classtuple))
    3306            0 :                 skip_rel = true;
    3307              : 
    3308            0 :             if (skip_rel)
    3309              :             {
    3310            0 :                 if (!tablespace_warning)
    3311            0 :                     ereport(WARNING,
    3312              :                             (errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
    3313              :                              errmsg("cannot move system relations, skipping all")));
    3314            0 :                 tablespace_warning = true;
    3315            0 :                 continue;
    3316              :             }
    3317              :         }
    3318              : 
    3319              :         /* Save the list of relation OIDs in private context */
    3320          651 :         old = MemoryContextSwitchTo(private_context);
    3321              : 
    3322              :         /*
    3323              :          * We always want to reindex pg_class first if it's selected to be
    3324              :          * reindexed.  This ensures that if there is any corruption in
    3325              :          * pg_class' indexes, they will be fixed before we process any other
    3326              :          * tables.  This is critical because reindexing itself will try to
    3327              :          * update pg_class.
    3328              :          */
    3329          651 :         if (relid == RelationRelationId)
    3330            8 :             relids = lcons_oid(relid, relids);
    3331              :         else
    3332          643 :             relids = lappend_oid(relids, relid);
    3333              : 
    3334          651 :         MemoryContextSwitchTo(old);
    3335              :     }
    3336           62 :     table_endscan(scan);
    3337           62 :     table_close(relationRelation, AccessShareLock);
    3338              : 
    3339              :     /*
    3340              :      * Process each relation listed in a separate transaction.  Note that this
    3341              :      * commits and then starts a new transaction immediately.
    3342              :      */
    3343           62 :     ReindexMultipleInternal(stmt, relids, params);
    3344              : 
    3345           62 :     MemoryContextDelete(private_context);
    3346           62 : }
    3347              : 
    3348              : /*
    3349              :  * Error callback specific to ReindexPartitions().
    3350              :  */
    3351              : static void
    3352            6 : reindex_error_callback(void *arg)
    3353              : {
    3354            6 :     ReindexErrorInfo *errinfo = (ReindexErrorInfo *) arg;
    3355              : 
    3356              :     Assert(RELKIND_HAS_PARTITIONS(errinfo->relkind));
    3357              : 
    3358            6 :     if (errinfo->relkind == RELKIND_PARTITIONED_TABLE)
    3359            3 :         errcontext("while reindexing partitioned table \"%s.%s\"",
    3360              :                    errinfo->relnamespace, errinfo->relname);
    3361            3 :     else if (errinfo->relkind == RELKIND_PARTITIONED_INDEX)
    3362            3 :         errcontext("while reindexing partitioned index \"%s.%s\"",
    3363              :                    errinfo->relnamespace, errinfo->relname);
    3364            6 : }
    3365              : 
    3366              : /*
    3367              :  * ReindexPartitions
    3368              :  *
    3369              :  * Reindex a set of partitions, per the partitioned index or table given
    3370              :  * by the caller.
    3371              :  */
    3372              : static void
    3373           54 : ReindexPartitions(const ReindexStmt *stmt, Oid relid, const ReindexParams *params, bool isTopLevel)
    3374              : {
    3375           54 :     List       *partitions = NIL;
    3376           54 :     char        relkind = get_rel_relkind(relid);
    3377           54 :     char       *relname = get_rel_name(relid);
    3378           54 :     char       *relnamespace = get_namespace_name(get_rel_namespace(relid));
    3379              :     MemoryContext reindex_context;
    3380              :     List       *inhoids;
    3381              :     ListCell   *lc;
    3382              :     ErrorContextCallback errcallback;
    3383              :     ReindexErrorInfo errinfo;
    3384              : 
    3385              :     Assert(RELKIND_HAS_PARTITIONS(relkind));
    3386              : 
    3387              :     /*
    3388              :      * Check if this runs in a transaction block, with an error callback to
    3389              :      * provide more context under which a problem happens.
    3390              :      */
    3391           54 :     errinfo.relname = pstrdup(relname);
    3392           54 :     errinfo.relnamespace = pstrdup(relnamespace);
    3393           54 :     errinfo.relkind = relkind;
    3394           54 :     errcallback.callback = reindex_error_callback;
    3395           54 :     errcallback.arg = &errinfo;
    3396           54 :     errcallback.previous = error_context_stack;
    3397           54 :     error_context_stack = &errcallback;
    3398              : 
    3399           54 :     PreventInTransactionBlock(isTopLevel,
    3400              :                               relkind == RELKIND_PARTITIONED_TABLE ?
    3401              :                               "REINDEX TABLE" : "REINDEX INDEX");
    3402              : 
    3403              :     /* Pop the error context stack */
    3404           48 :     error_context_stack = errcallback.previous;
    3405              : 
    3406              :     /*
    3407              :      * Create special memory context for cross-transaction storage.
    3408              :      *
    3409              :      * Since it is a child of PortalContext, it will go away eventually even
    3410              :      * if we suffer an error so there is no need for special abort cleanup
    3411              :      * logic.
    3412              :      */
    3413           48 :     reindex_context = AllocSetContextCreate(PortalContext, "Reindex",
    3414              :                                             ALLOCSET_DEFAULT_SIZES);
    3415              : 
    3416              :     /* ShareLock is enough to prevent schema modifications */
    3417           48 :     inhoids = find_all_inheritors(relid, ShareLock, NULL);
    3418              : 
    3419              :     /*
    3420              :      * The list of relations to reindex are the physical partitions of the
    3421              :      * tree so discard any partitioned table or index.
    3422              :      */
    3423          187 :     foreach(lc, inhoids)
    3424              :     {
    3425          139 :         Oid         partoid = lfirst_oid(lc);
    3426          139 :         char        partkind = get_rel_relkind(partoid);
    3427              :         MemoryContext old_context;
    3428              : 
    3429              :         /*
    3430              :          * This discards partitioned tables, partitioned indexes and foreign
    3431              :          * tables.
    3432              :          */
    3433          139 :         if (!RELKIND_HAS_STORAGE(partkind))
    3434           80 :             continue;
    3435              : 
    3436              :         Assert(partkind == RELKIND_INDEX ||
    3437              :                partkind == RELKIND_RELATION);
    3438              : 
    3439              :         /* Save partition OID */
    3440           59 :         old_context = MemoryContextSwitchTo(reindex_context);
    3441           59 :         partitions = lappend_oid(partitions, partoid);
    3442           59 :         MemoryContextSwitchTo(old_context);
    3443              :     }
    3444              : 
    3445              :     /*
    3446              :      * Process each partition listed in a separate transaction.  Note that
    3447              :      * this commits and then starts a new transaction immediately.
    3448              :      */
    3449           48 :     ReindexMultipleInternal(stmt, partitions, params);
    3450              : 
    3451              :     /*
    3452              :      * Clean up working storage --- note we must do this after
    3453              :      * StartTransactionCommand, else we might be trying to delete the active
    3454              :      * context!
    3455              :      */
    3456           48 :     MemoryContextDelete(reindex_context);
    3457           48 : }
    3458              : 
    3459              : /*
    3460              :  * ReindexMultipleInternal
    3461              :  *
    3462              :  * Reindex a list of relations, each one being processed in its own
    3463              :  * transaction.  This commits the existing transaction immediately,
    3464              :  * and starts a new transaction when finished.
    3465              :  */
    3466              : static void
    3467          110 : ReindexMultipleInternal(const ReindexStmt *stmt, const List *relids, const ReindexParams *params)
    3468              : {
    3469              :     ListCell   *l;
    3470              : 
    3471          110 :     PopActiveSnapshot();
    3472          110 :     CommitTransactionCommand();
    3473              : 
    3474          820 :     foreach(l, relids)
    3475              :     {
    3476          710 :         Oid         relid = lfirst_oid(l);
    3477              :         char        relkind;
    3478              :         char        relpersistence;
    3479              : 
    3480          710 :         StartTransactionCommand();
    3481              : 
    3482              :         /* functions in indexes may want a snapshot set */
    3483          710 :         PushActiveSnapshot(GetTransactionSnapshot());
    3484              : 
    3485              :         /* check if the relation still exists */
    3486          710 :         if (!SearchSysCacheExists1(RELOID, ObjectIdGetDatum(relid)))
    3487              :         {
    3488            2 :             PopActiveSnapshot();
    3489            2 :             CommitTransactionCommand();
    3490            2 :             continue;
    3491              :         }
    3492              : 
    3493              :         /*
    3494              :          * Check permissions except when moving to database's default if a new
    3495              :          * tablespace is chosen.  Note that this check also happens in
    3496              :          * ExecReindex(), but we do an extra check here as this runs across
    3497              :          * multiple transactions.
    3498              :          */
    3499          708 :         if (OidIsValid(params->tablespaceOid) &&
    3500            6 :             params->tablespaceOid != MyDatabaseTableSpace)
    3501              :         {
    3502              :             AclResult   aclresult;
    3503              : 
    3504            6 :             aclresult = object_aclcheck(TableSpaceRelationId, params->tablespaceOid,
    3505              :                                         GetUserId(), ACL_CREATE);
    3506            6 :             if (aclresult != ACLCHECK_OK)
    3507            0 :                 aclcheck_error(aclresult, OBJECT_TABLESPACE,
    3508            0 :                                get_tablespace_name(params->tablespaceOid));
    3509              :         }
    3510              : 
    3511          708 :         relkind = get_rel_relkind(relid);
    3512          708 :         relpersistence = get_rel_persistence(relid);
    3513              : 
    3514              :         /*
    3515              :          * Partitioned tables and indexes can never be processed directly, and
    3516              :          * a list of their leaves should be built first.
    3517              :          */
    3518              :         Assert(!RELKIND_HAS_PARTITIONS(relkind));
    3519              : 
    3520          708 :         if ((params->options & REINDEXOPT_CONCURRENTLY) != 0 &&
    3521              :             relpersistence != RELPERSISTENCE_TEMP)
    3522           64 :         {
    3523           64 :             ReindexParams newparams = *params;
    3524              : 
    3525           64 :             newparams.options |= REINDEXOPT_MISSING_OK;
    3526           64 :             (void) ReindexRelationConcurrently(stmt, relid, &newparams);
    3527           64 :             if (ActiveSnapshotSet())
    3528           13 :                 PopActiveSnapshot();
    3529              :             /* ReindexRelationConcurrently() does the verbose output */
    3530              :         }
    3531          644 :         else if (relkind == RELKIND_INDEX)
    3532              :         {
    3533            9 :             ReindexParams newparams = *params;
    3534              : 
    3535            9 :             newparams.options |=
    3536              :                 REINDEXOPT_REPORT_PROGRESS | REINDEXOPT_MISSING_OK;
    3537            9 :             reindex_index(stmt, relid, false, relpersistence, &newparams);
    3538            9 :             PopActiveSnapshot();
    3539              :             /* reindex_index() does the verbose output */
    3540              :         }
    3541              :         else
    3542              :         {
    3543              :             bool        result;
    3544          635 :             ReindexParams newparams = *params;
    3545              : 
    3546          635 :             newparams.options |=
    3547              :                 REINDEXOPT_REPORT_PROGRESS | REINDEXOPT_MISSING_OK;
    3548          635 :             result = reindex_relation(stmt, relid,
    3549              :                                       REINDEX_REL_PROCESS_TOAST |
    3550              :                                       REINDEX_REL_CHECK_CONSTRAINTS,
    3551              :                                       &newparams);
    3552              : 
    3553          635 :             if (result && (params->options & REINDEXOPT_VERBOSE) != 0)
    3554            0 :                 ereport(INFO,
    3555              :                         (errmsg("table \"%s.%s\" was reindexed",
    3556              :                                 get_namespace_name(get_rel_namespace(relid)),
    3557              :                                 get_rel_name(relid))));
    3558              : 
    3559          635 :             PopActiveSnapshot();
    3560              :         }
    3561              : 
    3562          708 :         CommitTransactionCommand();
    3563              :     }
    3564              : 
    3565          110 :     StartTransactionCommand();
    3566          110 : }
    3567              : 
    3568              : 
    3569              : /*
    3570              :  * ReindexRelationConcurrently - process REINDEX CONCURRENTLY for given
    3571              :  * relation OID
    3572              :  *
    3573              :  * 'relationOid' can either belong to an index, a table or a materialized
    3574              :  * view.  For tables and materialized views, all its indexes will be rebuilt,
    3575              :  * excluding invalid indexes and any indexes used in exclusion constraints,
    3576              :  * but including its associated toast table indexes.  For indexes, the index
    3577              :  * itself will be rebuilt.
    3578              :  *
    3579              :  * The locks taken on parent tables and involved indexes are kept until the
    3580              :  * transaction is committed, at which point a session lock is taken on each
    3581              :  * relation.  Both of these protect against concurrent schema changes.
    3582              :  *
    3583              :  * Returns true if any indexes have been rebuilt (including toast table's
    3584              :  * indexes, when relevant), otherwise returns false.
    3585              :  *
    3586              :  * NOTE: This cannot be used on temporary relations.  A concurrent build would
    3587              :  * cause issues with ON COMMIT actions triggered by the transactions of the
    3588              :  * concurrent build.  Temporary relations are not subject to concurrent
    3589              :  * concerns, so there's no need for the more complicated concurrent build,
    3590              :  * anyway, and a non-concurrent reindex is more efficient.
    3591              :  */
    3592              : static bool
    3593          277 : ReindexRelationConcurrently(const ReindexStmt *stmt, Oid relationOid, const ReindexParams *params)
    3594              : {
    3595              :     typedef struct ReindexIndexInfo
    3596              :     {
    3597              :         Oid         indexId;
    3598              :         Oid         tableId;
    3599              :         Oid         amId;
    3600              :         bool        safe;       /* for set_indexsafe_procflags */
    3601              :     } ReindexIndexInfo;
    3602          277 :     List       *heapRelationIds = NIL;
    3603          277 :     List       *indexIds = NIL;
    3604          277 :     List       *newIndexIds = NIL;
    3605          277 :     List       *relationLocks = NIL;
    3606          277 :     List       *lockTags = NIL;
    3607              :     ListCell   *lc,
    3608              :                *lc2;
    3609              :     MemoryContext private_context;
    3610              :     MemoryContext oldcontext;
    3611              :     char        relkind;
    3612          277 :     char       *relationName = NULL;
    3613          277 :     char       *relationNamespace = NULL;
    3614              :     PGRUsage    ru0;
    3615          277 :     const int   progress_index[] = {
    3616              :         PROGRESS_CREATEIDX_COMMAND,
    3617              :         PROGRESS_CREATEIDX_PHASE,
    3618              :         PROGRESS_CREATEIDX_INDEX_OID,
    3619              :         PROGRESS_CREATEIDX_ACCESS_METHOD_OID
    3620              :     };
    3621              :     int64       progress_vals[4];
    3622              : 
    3623              :     /*
    3624              :      * Create a memory context that will survive forced transaction commits we
    3625              :      * do below.  Since it is a child of PortalContext, it will go away
    3626              :      * eventually even if we suffer an error; there's no need for special
    3627              :      * abort cleanup logic.
    3628              :      */
    3629          277 :     private_context = AllocSetContextCreate(PortalContext,
    3630              :                                             "ReindexConcurrent",
    3631              :                                             ALLOCSET_SMALL_SIZES);
    3632              : 
    3633          277 :     if ((params->options & REINDEXOPT_VERBOSE) != 0)
    3634              :     {
    3635              :         /* Save data needed by REINDEX VERBOSE in private context */
    3636            2 :         oldcontext = MemoryContextSwitchTo(private_context);
    3637              : 
    3638            2 :         relationName = get_rel_name(relationOid);
    3639            2 :         relationNamespace = get_namespace_name(get_rel_namespace(relationOid));
    3640              : 
    3641            2 :         pg_rusage_init(&ru0);
    3642              : 
    3643            2 :         MemoryContextSwitchTo(oldcontext);
    3644              :     }
    3645              : 
    3646          277 :     relkind = get_rel_relkind(relationOid);
    3647              : 
    3648              :     /*
    3649              :      * Extract the list of indexes that are going to be rebuilt based on the
    3650              :      * relation Oid given by caller.
    3651              :      */
    3652          277 :     switch (relkind)
    3653              :     {
    3654          166 :         case RELKIND_RELATION:
    3655              :         case RELKIND_MATVIEW:
    3656              :         case RELKIND_TOASTVALUE:
    3657              :             {
    3658              :                 /*
    3659              :                  * In the case of a relation, find all its indexes including
    3660              :                  * toast indexes.
    3661              :                  */
    3662              :                 Relation    heapRelation;
    3663              : 
    3664              :                 /* Save the list of relation OIDs in private context */
    3665          166 :                 oldcontext = MemoryContextSwitchTo(private_context);
    3666              : 
    3667              :                 /* Track this relation for session locks */
    3668          166 :                 heapRelationIds = lappend_oid(heapRelationIds, relationOid);
    3669              : 
    3670          166 :                 MemoryContextSwitchTo(oldcontext);
    3671              : 
    3672          166 :                 if (IsCatalogRelationOid(relationOid))
    3673           18 :                     ereport(ERROR,
    3674              :                             (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
    3675              :                              errmsg("cannot reindex system catalogs concurrently")));
    3676              : 
    3677              :                 /* Open relation to get its indexes */
    3678          148 :                 if ((params->options & REINDEXOPT_MISSING_OK) != 0)
    3679              :                 {
    3680           52 :                     heapRelation = try_table_open(relationOid,
    3681              :                                                   ShareUpdateExclusiveLock);
    3682              :                     /* leave if relation does not exist */
    3683           52 :                     if (!heapRelation)
    3684            0 :                         break;
    3685              :                 }
    3686              :                 else
    3687           96 :                     heapRelation = table_open(relationOid,
    3688              :                                               ShareUpdateExclusiveLock);
    3689              : 
    3690          159 :                 if (OidIsValid(params->tablespaceOid) &&
    3691           11 :                     IsSystemRelation(heapRelation))
    3692            1 :                     ereport(ERROR,
    3693              :                             (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
    3694              :                              errmsg("cannot move system relation \"%s\"",
    3695              :                                     RelationGetRelationName(heapRelation))));
    3696              : 
    3697              :                 /* Add all the valid indexes of relation to list */
    3698          284 :                 foreach(lc, RelationGetIndexList(heapRelation))
    3699              :                 {
    3700          137 :                     Oid         cellOid = lfirst_oid(lc);
    3701          137 :                     Relation    indexRelation = index_open(cellOid,
    3702              :                                                            ShareUpdateExclusiveLock);
    3703              : 
    3704          137 :                     if (!indexRelation->rd_index->indisvalid)
    3705            3 :                         ereport(WARNING,
    3706              :                                 (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
    3707              :                                  errmsg("skipping reindex of invalid index \"%s.%s\"",
    3708              :                                         get_namespace_name(get_rel_namespace(cellOid)),
    3709              :                                         get_rel_name(cellOid)),
    3710              :                                  errhint("Use DROP INDEX or REINDEX INDEX.")));
    3711          134 :                     else if (indexRelation->rd_index->indisexclusion)
    3712            3 :                         ereport(WARNING,
    3713              :                                 (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
    3714              :                                  errmsg("cannot reindex exclusion constraint index \"%s.%s\" concurrently, skipping",
    3715              :                                         get_namespace_name(get_rel_namespace(cellOid)),
    3716              :                                         get_rel_name(cellOid))));
    3717              :                     else
    3718              :                     {
    3719              :                         ReindexIndexInfo *idx;
    3720              : 
    3721              :                         /* Save the list of relation OIDs in private context */
    3722          131 :                         oldcontext = MemoryContextSwitchTo(private_context);
    3723              : 
    3724          131 :                         idx = palloc_object(ReindexIndexInfo);
    3725          131 :                         idx->indexId = cellOid;
    3726              :                         /* other fields set later */
    3727              : 
    3728          131 :                         indexIds = lappend(indexIds, idx);
    3729              : 
    3730          131 :                         MemoryContextSwitchTo(oldcontext);
    3731              :                     }
    3732              : 
    3733          137 :                     index_close(indexRelation, NoLock);
    3734              :                 }
    3735              : 
    3736              :                 /* Also add the toast indexes */
    3737          147 :                 if (OidIsValid(heapRelation->rd_rel->reltoastrelid))
    3738              :                 {
    3739           44 :                     Oid         toastOid = heapRelation->rd_rel->reltoastrelid;
    3740           44 :                     Relation    toastRelation = table_open(toastOid,
    3741              :                                                            ShareUpdateExclusiveLock);
    3742              : 
    3743              :                     /* Save the list of relation OIDs in private context */
    3744           44 :                     oldcontext = MemoryContextSwitchTo(private_context);
    3745              : 
    3746              :                     /* Track this relation for session locks */
    3747           44 :                     heapRelationIds = lappend_oid(heapRelationIds, toastOid);
    3748              : 
    3749           44 :                     MemoryContextSwitchTo(oldcontext);
    3750              : 
    3751           88 :                     foreach(lc2, RelationGetIndexList(toastRelation))
    3752              :                     {
    3753           44 :                         Oid         cellOid = lfirst_oid(lc2);
    3754           44 :                         Relation    indexRelation = index_open(cellOid,
    3755              :                                                                ShareUpdateExclusiveLock);
    3756              : 
    3757           44 :                         if (!indexRelation->rd_index->indisvalid)
    3758            0 :                             ereport(WARNING,
    3759              :                                     (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
    3760              :                                      errmsg("skipping reindex of invalid index \"%s.%s\"",
    3761              :                                             get_namespace_name(get_rel_namespace(cellOid)),
    3762              :                                             get_rel_name(cellOid)),
    3763              :                                      errhint("Use DROP INDEX or REINDEX INDEX.")));
    3764              :                         else
    3765              :                         {
    3766              :                             ReindexIndexInfo *idx;
    3767              : 
    3768              :                             /*
    3769              :                              * Save the list of relation OIDs in private
    3770              :                              * context
    3771              :                              */
    3772           44 :                             oldcontext = MemoryContextSwitchTo(private_context);
    3773              : 
    3774           44 :                             idx = palloc_object(ReindexIndexInfo);
    3775           44 :                             idx->indexId = cellOid;
    3776           44 :                             indexIds = lappend(indexIds, idx);
    3777              :                             /* other fields set later */
    3778              : 
    3779           44 :                             MemoryContextSwitchTo(oldcontext);
    3780              :                         }
    3781              : 
    3782           44 :                         index_close(indexRelation, NoLock);
    3783              :                     }
    3784              : 
    3785           44 :                     table_close(toastRelation, NoLock);
    3786              :                 }
    3787              : 
    3788          147 :                 table_close(heapRelation, NoLock);
    3789          147 :                 break;
    3790              :             }
    3791          111 :         case RELKIND_INDEX:
    3792              :             {
    3793          111 :                 Oid         heapId = IndexGetRelation(relationOid,
    3794          111 :                                                       (params->options & REINDEXOPT_MISSING_OK) != 0);
    3795              :                 Relation    heapRelation;
    3796              :                 ReindexIndexInfo *idx;
    3797              : 
    3798              :                 /* if relation is missing, leave */
    3799          111 :                 if (!OidIsValid(heapId))
    3800            0 :                     break;
    3801              : 
    3802          111 :                 if (IsCatalogRelationOid(heapId))
    3803            9 :                     ereport(ERROR,
    3804              :                             (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
    3805              :                              errmsg("cannot reindex system catalogs concurrently")));
    3806              : 
    3807              :                 /*
    3808              :                  * Don't allow reindex for an invalid index on TOAST table, as
    3809              :                  * if rebuilt it would not be possible to drop it.  Match
    3810              :                  * error message in reindex_index().
    3811              :                  */
    3812          102 :                 if (IsToastNamespace(get_rel_namespace(relationOid)) &&
    3813           28 :                     !get_index_isvalid(relationOid))
    3814            0 :                     ereport(ERROR,
    3815              :                             (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
    3816              :                              errmsg("cannot reindex invalid index on TOAST table")));
    3817              : 
    3818              :                 /*
    3819              :                  * Check if parent relation can be locked and if it exists,
    3820              :                  * this needs to be done at this stage as the list of indexes
    3821              :                  * to rebuild is not complete yet, and REINDEXOPT_MISSING_OK
    3822              :                  * should not be used once all the session locks are taken.
    3823              :                  */
    3824          102 :                 if ((params->options & REINDEXOPT_MISSING_OK) != 0)
    3825              :                 {
    3826           12 :                     heapRelation = try_table_open(heapId,
    3827              :                                                   ShareUpdateExclusiveLock);
    3828              :                     /* leave if relation does not exist */
    3829           12 :                     if (!heapRelation)
    3830            0 :                         break;
    3831              :                 }
    3832              :                 else
    3833           90 :                     heapRelation = table_open(heapId,
    3834              :                                               ShareUpdateExclusiveLock);
    3835              : 
    3836          106 :                 if (OidIsValid(params->tablespaceOid) &&
    3837            4 :                     IsSystemRelation(heapRelation))
    3838            1 :                     ereport(ERROR,
    3839              :                             (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
    3840              :                              errmsg("cannot move system relation \"%s\"",
    3841              :                                     get_rel_name(relationOid))));
    3842              : 
    3843          101 :                 table_close(heapRelation, NoLock);
    3844              : 
    3845              :                 /* Save the list of relation OIDs in private context */
    3846          101 :                 oldcontext = MemoryContextSwitchTo(private_context);
    3847              : 
    3848              :                 /* Track the heap relation of this index for session locks */
    3849          101 :                 heapRelationIds = list_make1_oid(heapId);
    3850              : 
    3851              :                 /*
    3852              :                  * Save the list of relation OIDs in private context.  Note
    3853              :                  * that invalid indexes are allowed here.
    3854              :                  */
    3855          101 :                 idx = palloc_object(ReindexIndexInfo);
    3856          101 :                 idx->indexId = relationOid;
    3857          101 :                 indexIds = lappend(indexIds, idx);
    3858              :                 /* other fields set later */
    3859              : 
    3860          101 :                 MemoryContextSwitchTo(oldcontext);
    3861          101 :                 break;
    3862              :             }
    3863              : 
    3864            0 :         case RELKIND_PARTITIONED_TABLE:
    3865              :         case RELKIND_PARTITIONED_INDEX:
    3866              :         default:
    3867              :             /* Return error if type of relation is not supported */
    3868            0 :             ereport(ERROR,
    3869              :                     (errcode(ERRCODE_WRONG_OBJECT_TYPE),
    3870              :                      errmsg("cannot reindex this type of relation concurrently")));
    3871              :             break;
    3872              :     }
    3873              : 
    3874              :     /*
    3875              :      * Definitely no indexes, so leave.  Any checks based on
    3876              :      * REINDEXOPT_MISSING_OK should be done only while the list of indexes to
    3877              :      * work on is built as the session locks taken before this transaction
    3878              :      * commits will make sure that they cannot be dropped by a concurrent
    3879              :      * session until this operation completes.
    3880              :      */
    3881          248 :     if (indexIds == NIL)
    3882           22 :         return false;
    3883              : 
    3884              :     /* It's not a shared catalog, so refuse to move it to shared tablespace */
    3885          226 :     if (params->tablespaceOid == GLOBALTABLESPACE_OID)
    3886            3 :         ereport(ERROR,
    3887              :                 (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
    3888              :                  errmsg("cannot move non-shared relation to tablespace \"%s\"",
    3889              :                         get_tablespace_name(params->tablespaceOid))));
    3890              : 
    3891              :     Assert(heapRelationIds != NIL);
    3892              : 
    3893              :     /*-----
    3894              :      * Now we have all the indexes we want to process in indexIds.
    3895              :      *
    3896              :      * The phases now are:
    3897              :      *
    3898              :      * 1. create new indexes in the catalog
    3899              :      * 2. build new indexes
    3900              :      * 3. let new indexes catch up with tuples inserted in the meantime
    3901              :      * 4. swap index names
    3902              :      * 5. mark old indexes as dead
    3903              :      * 6. drop old indexes
    3904              :      *
    3905              :      * We process each phase for all indexes before moving to the next phase,
    3906              :      * for efficiency.
    3907              :      */
    3908              : 
    3909              :     /*
    3910              :      * Phase 1 of REINDEX CONCURRENTLY
    3911              :      *
    3912              :      * Create a new index with the same properties as the old one, but it is
    3913              :      * only registered in catalogs and will be built later.  Then get session
    3914              :      * locks on all involved tables.  See analogous code in DefineIndex() for
    3915              :      * more detailed comments.
    3916              :      */
    3917              : 
    3918          493 :     foreach(lc, indexIds)
    3919              :     {
    3920              :         char       *concurrentName;
    3921          273 :         ReindexIndexInfo *idx = lfirst(lc);
    3922              :         ReindexIndexInfo *newidx;
    3923              :         Oid         newIndexId;
    3924              :         Relation    indexRel;
    3925              :         Relation    heapRel;
    3926              :         Oid         save_userid;
    3927              :         int         save_sec_context;
    3928              :         int         save_nestlevel;
    3929              :         Relation    newIndexRel;
    3930              :         LockRelId  *lockrelid;
    3931              :         Oid         tablespaceid;
    3932              : 
    3933          273 :         indexRel = index_open(idx->indexId, ShareUpdateExclusiveLock);
    3934          273 :         heapRel = table_open(indexRel->rd_index->indrelid,
    3935              :                              ShareUpdateExclusiveLock);
    3936              : 
    3937              :         /*
    3938              :          * Switch to the table owner's userid, so that any index functions are
    3939              :          * run as that user.  Also lock down security-restricted operations
    3940              :          * and arrange to make GUC variable changes local to this command.
    3941              :          */
    3942          273 :         GetUserIdAndSecContext(&save_userid, &save_sec_context);
    3943          273 :         SetUserIdAndSecContext(heapRel->rd_rel->relowner,
    3944              :                                save_sec_context | SECURITY_RESTRICTED_OPERATION);
    3945          273 :         save_nestlevel = NewGUCNestLevel();
    3946          273 :         RestrictSearchPath();
    3947              : 
    3948              :         /* determine safety of this index for set_indexsafe_procflags */
    3949          526 :         idx->safe = (RelationGetIndexExpressions(indexRel) == NIL &&
    3950          253 :                      RelationGetIndexPredicate(indexRel) == NIL);
    3951              : 
    3952              : #ifdef USE_INJECTION_POINTS
    3953          273 :         if (idx->safe)
    3954          249 :             INJECTION_POINT("reindex-conc-index-safe", NULL);
    3955              :         else
    3956           24 :             INJECTION_POINT("reindex-conc-index-not-safe", NULL);
    3957              : #endif
    3958              : 
    3959          273 :         idx->tableId = RelationGetRelid(heapRel);
    3960          273 :         idx->amId = indexRel->rd_rel->relam;
    3961              : 
    3962              :         /* This function shouldn't be called for temporary relations. */
    3963          273 :         if (indexRel->rd_rel->relpersistence == RELPERSISTENCE_TEMP)
    3964            0 :             elog(ERROR, "cannot reindex a temporary table concurrently");
    3965              : 
    3966          273 :         pgstat_progress_start_command(PROGRESS_COMMAND_CREATE_INDEX, idx->tableId);
    3967              : 
    3968          273 :         progress_vals[0] = PROGRESS_CREATEIDX_COMMAND_REINDEX_CONCURRENTLY;
    3969          273 :         progress_vals[1] = 0;   /* initializing */
    3970          273 :         progress_vals[2] = idx->indexId;
    3971          273 :         progress_vals[3] = idx->amId;
    3972          273 :         pgstat_progress_update_multi_param(4, progress_index, progress_vals);
    3973              : 
    3974              :         /* Choose a temporary relation name for the new index */
    3975          273 :         concurrentName = ChooseRelationName(get_rel_name(idx->indexId),
    3976              :                                             NULL,
    3977              :                                             "ccnew",
    3978          273 :                                             get_rel_namespace(indexRel->rd_index->indrelid),
    3979              :                                             false);
    3980              : 
    3981              :         /* Choose the new tablespace, indexes of toast tables are not moved */
    3982          273 :         if (OidIsValid(params->tablespaceOid) &&
    3983           14 :             heapRel->rd_rel->relkind != RELKIND_TOASTVALUE)
    3984           10 :             tablespaceid = params->tablespaceOid;
    3985              :         else
    3986          263 :             tablespaceid = indexRel->rd_rel->reltablespace;
    3987              : 
    3988              :         /* Create new index definition based on given index */
    3989          273 :         newIndexId = index_concurrently_create_copy(heapRel,
    3990              :                                                     idx->indexId,
    3991              :                                                     tablespaceid,
    3992              :                                                     concurrentName);
    3993              : 
    3994              :         /*
    3995              :          * Now open the relation of the new index, a session-level lock is
    3996              :          * also needed on it.
    3997              :          */
    3998          270 :         newIndexRel = index_open(newIndexId, ShareUpdateExclusiveLock);
    3999              : 
    4000              :         /*
    4001              :          * Save the list of OIDs and locks in private context
    4002              :          */
    4003          270 :         oldcontext = MemoryContextSwitchTo(private_context);
    4004              : 
    4005          270 :         newidx = palloc_object(ReindexIndexInfo);
    4006          270 :         newidx->indexId = newIndexId;
    4007          270 :         newidx->safe = idx->safe;
    4008          270 :         newidx->tableId = idx->tableId;
    4009          270 :         newidx->amId = idx->amId;
    4010              : 
    4011          270 :         newIndexIds = lappend(newIndexIds, newidx);
    4012              : 
    4013              :         /*
    4014              :          * Save lockrelid to protect each relation from drop then close
    4015              :          * relations. The lockrelid on parent relation is not taken here to
    4016              :          * avoid multiple locks taken on the same relation, instead we rely on
    4017              :          * parentRelationIds built earlier.
    4018              :          */
    4019          270 :         lockrelid = palloc_object(LockRelId);
    4020          270 :         *lockrelid = indexRel->rd_lockInfo.lockRelId;
    4021          270 :         relationLocks = lappend(relationLocks, lockrelid);
    4022          270 :         lockrelid = palloc_object(LockRelId);
    4023          270 :         *lockrelid = newIndexRel->rd_lockInfo.lockRelId;
    4024          270 :         relationLocks = lappend(relationLocks, lockrelid);
    4025              : 
    4026          270 :         MemoryContextSwitchTo(oldcontext);
    4027              : 
    4028          270 :         index_close(indexRel, NoLock);
    4029          270 :         index_close(newIndexRel, NoLock);
    4030              : 
    4031              :         /* Roll back any GUC changes executed by index functions */
    4032          270 :         AtEOXact_GUC(false, save_nestlevel);
    4033              : 
    4034              :         /* Restore userid and security context */
    4035          270 :         SetUserIdAndSecContext(save_userid, save_sec_context);
    4036              : 
    4037          270 :         table_close(heapRel, NoLock);
    4038              : 
    4039              :         /*
    4040              :          * If a statement is available, telling that this comes from a REINDEX
    4041              :          * command, collect the new index for event triggers.
    4042              :          */
    4043          270 :         if (stmt)
    4044              :         {
    4045              :             ObjectAddress address;
    4046              : 
    4047          270 :             ObjectAddressSet(address, RelationRelationId, newIndexId);
    4048          270 :             EventTriggerCollectSimpleCommand(address,
    4049              :                                              InvalidObjectAddress,
    4050              :                                              (Node *) stmt);
    4051              :         }
    4052              :     }
    4053              : 
    4054              :     /*
    4055              :      * Save the heap lock for following visibility checks with other backends
    4056              :      * might conflict with this session.
    4057              :      */
    4058          484 :     foreach(lc, heapRelationIds)
    4059              :     {
    4060          264 :         Relation    heapRelation = table_open(lfirst_oid(lc), ShareUpdateExclusiveLock);
    4061              :         LockRelId  *lockrelid;
    4062              :         LOCKTAG    *heaplocktag;
    4063              : 
    4064              :         /* Save the list of locks in private context */
    4065          264 :         oldcontext = MemoryContextSwitchTo(private_context);
    4066              : 
    4067              :         /* Add lockrelid of heap relation to the list of locked relations */
    4068          264 :         lockrelid = palloc_object(LockRelId);
    4069          264 :         *lockrelid = heapRelation->rd_lockInfo.lockRelId;
    4070          264 :         relationLocks = lappend(relationLocks, lockrelid);
    4071              : 
    4072          264 :         heaplocktag = palloc_object(LOCKTAG);
    4073              : 
    4074              :         /* Save the LOCKTAG for this parent relation for the wait phase */
    4075          264 :         SET_LOCKTAG_RELATION(*heaplocktag, lockrelid->dbId, lockrelid->relId);
    4076          264 :         lockTags = lappend(lockTags, heaplocktag);
    4077              : 
    4078          264 :         MemoryContextSwitchTo(oldcontext);
    4079              : 
    4080              :         /* Close heap relation */
    4081          264 :         table_close(heapRelation, NoLock);
    4082              :     }
    4083              : 
    4084              :     /* Get a session-level lock on each table. */
    4085         1024 :     foreach(lc, relationLocks)
    4086              :     {
    4087          804 :         LockRelId  *lockrelid = (LockRelId *) lfirst(lc);
    4088              : 
    4089          804 :         LockRelationIdForSession(lockrelid, ShareUpdateExclusiveLock);
    4090              :     }
    4091              : 
    4092          220 :     PopActiveSnapshot();
    4093          220 :     CommitTransactionCommand();
    4094          220 :     StartTransactionCommand();
    4095              : 
    4096              :     /*
    4097              :      * Because we don't take a snapshot in this transaction, there's no need
    4098              :      * to set the PROC_IN_SAFE_IC flag here.
    4099              :      */
    4100              : 
    4101              :     /*
    4102              :      * Phase 2 of REINDEX CONCURRENTLY
    4103              :      *
    4104              :      * Build the new indexes in a separate transaction for each index to avoid
    4105              :      * having open transactions for an unnecessary long time.  But before
    4106              :      * doing that, wait until no running transactions could have the table of
    4107              :      * the index open with the old list of indexes.  See "phase 2" in
    4108              :      * DefineIndex() for more details.
    4109              :      */
    4110              : 
    4111          220 :     pgstat_progress_update_param(PROGRESS_CREATEIDX_PHASE,
    4112              :                                  PROGRESS_CREATEIDX_PHASE_WAIT_1);
    4113          220 :     WaitForLockersMultiple(lockTags, ShareLock, true);
    4114          220 :     CommitTransactionCommand();
    4115              : 
    4116          487 :     foreach(lc, newIndexIds)
    4117              :     {
    4118          270 :         ReindexIndexInfo *newidx = lfirst(lc);
    4119              : 
    4120              :         /* Start new transaction for this index's concurrent build */
    4121          270 :         StartTransactionCommand();
    4122              : 
    4123              :         /*
    4124              :          * Check for user-requested abort.  This is inside a transaction so as
    4125              :          * xact.c does not issue a useless WARNING, and ensures that
    4126              :          * session-level locks are cleaned up on abort.
    4127              :          */
    4128          270 :         CHECK_FOR_INTERRUPTS();
    4129              : 
    4130              :         /* Tell concurrent indexing to ignore us, if index qualifies */
    4131          270 :         if (newidx->safe)
    4132          246 :             set_indexsafe_procflags();
    4133              : 
    4134              :         /* Set ActiveSnapshot since functions in the indexes may need it */
    4135          270 :         PushActiveSnapshot(GetTransactionSnapshot());
    4136              : 
    4137              :         /*
    4138              :          * Update progress for the index to build, with the correct parent
    4139              :          * table involved.
    4140              :          */
    4141          270 :         pgstat_progress_start_command(PROGRESS_COMMAND_CREATE_INDEX, newidx->tableId);
    4142          270 :         progress_vals[0] = PROGRESS_CREATEIDX_COMMAND_REINDEX_CONCURRENTLY;
    4143          270 :         progress_vals[1] = PROGRESS_CREATEIDX_PHASE_BUILD;
    4144          270 :         progress_vals[2] = newidx->indexId;
    4145          270 :         progress_vals[3] = newidx->amId;
    4146          270 :         pgstat_progress_update_multi_param(4, progress_index, progress_vals);
    4147              : 
    4148              :         /* Perform concurrent build of new index */
    4149          270 :         index_concurrently_build(newidx->tableId, newidx->indexId);
    4150              : 
    4151          267 :         PopActiveSnapshot();
    4152          267 :         CommitTransactionCommand();
    4153              :     }
    4154              : 
    4155          217 :     StartTransactionCommand();
    4156              : 
    4157              :     /*
    4158              :      * Because we don't take a snapshot or Xid in this transaction, there's no
    4159              :      * need to set the PROC_IN_SAFE_IC flag here.
    4160              :      */
    4161              : 
    4162              :     /*
    4163              :      * Phase 3 of REINDEX CONCURRENTLY
    4164              :      *
    4165              :      * During this phase the old indexes catch up with any new tuples that
    4166              :      * were created during the previous phase.  See "phase 3" in DefineIndex()
    4167              :      * for more details.
    4168              :      */
    4169              : 
    4170          217 :     pgstat_progress_update_param(PROGRESS_CREATEIDX_PHASE,
    4171              :                                  PROGRESS_CREATEIDX_PHASE_WAIT_2);
    4172          217 :     WaitForLockersMultiple(lockTags, ShareLock, true);
    4173          217 :     CommitTransactionCommand();
    4174              : 
    4175          484 :     foreach(lc, newIndexIds)
    4176              :     {
    4177          267 :         ReindexIndexInfo *newidx = lfirst(lc);
    4178              :         TransactionId limitXmin;
    4179              :         Snapshot    snapshot;
    4180              : 
    4181          267 :         StartTransactionCommand();
    4182              : 
    4183              :         /*
    4184              :          * Check for user-requested abort.  This is inside a transaction so as
    4185              :          * xact.c does not issue a useless WARNING, and ensures that
    4186              :          * session-level locks are cleaned up on abort.
    4187              :          */
    4188          267 :         CHECK_FOR_INTERRUPTS();
    4189              : 
    4190              :         /* Tell concurrent indexing to ignore us, if index qualifies */
    4191          267 :         if (newidx->safe)
    4192          243 :             set_indexsafe_procflags();
    4193              : 
    4194              :         /*
    4195              :          * Take the "reference snapshot" that will be used by validate_index()
    4196              :          * to filter candidate tuples.
    4197              :          */
    4198          267 :         snapshot = RegisterSnapshot(GetTransactionSnapshot());
    4199          267 :         PushActiveSnapshot(snapshot);
    4200              : 
    4201              :         /*
    4202              :          * Update progress for the index to build, with the correct parent
    4203              :          * table involved.
    4204              :          */
    4205          267 :         pgstat_progress_start_command(PROGRESS_COMMAND_CREATE_INDEX, newidx->tableId);
    4206          267 :         progress_vals[0] = PROGRESS_CREATEIDX_COMMAND_REINDEX_CONCURRENTLY;
    4207          267 :         progress_vals[1] = PROGRESS_CREATEIDX_PHASE_VALIDATE_IDXSCAN;
    4208          267 :         progress_vals[2] = newidx->indexId;
    4209          267 :         progress_vals[3] = newidx->amId;
    4210          267 :         pgstat_progress_update_multi_param(4, progress_index, progress_vals);
    4211              : 
    4212          267 :         validate_index(newidx->tableId, newidx->indexId, snapshot);
    4213              : 
    4214              :         /*
    4215              :          * We can now do away with our active snapshot, we still need to save
    4216              :          * the xmin limit to wait for older snapshots.
    4217              :          */
    4218          267 :         limitXmin = snapshot->xmin;
    4219              : 
    4220          267 :         PopActiveSnapshot();
    4221          267 :         UnregisterSnapshot(snapshot);
    4222              : 
    4223              :         /*
    4224              :          * To ensure no deadlocks, we must commit and start yet another
    4225              :          * transaction, and do our wait before any snapshot has been taken in
    4226              :          * it.
    4227              :          */
    4228          267 :         CommitTransactionCommand();
    4229          267 :         StartTransactionCommand();
    4230              : 
    4231              :         /*
    4232              :          * The index is now valid in the sense that it contains all currently
    4233              :          * interesting tuples.  But since it might not contain tuples deleted
    4234              :          * just before the reference snap was taken, we have to wait out any
    4235              :          * transactions that might have older snapshots.
    4236              :          *
    4237              :          * Because we don't take a snapshot or Xid in this transaction,
    4238              :          * there's no need to set the PROC_IN_SAFE_IC flag here.
    4239              :          */
    4240          267 :         pgstat_progress_update_param(PROGRESS_CREATEIDX_PHASE,
    4241              :                                      PROGRESS_CREATEIDX_PHASE_WAIT_3);
    4242          267 :         WaitForOlderSnapshots(limitXmin, true);
    4243              : 
    4244          267 :         CommitTransactionCommand();
    4245              :     }
    4246              : 
    4247              :     /*
    4248              :      * Phase 4 of REINDEX CONCURRENTLY
    4249              :      *
    4250              :      * Now that the new indexes have been validated, swap each new index with
    4251              :      * its corresponding old index.
    4252              :      *
    4253              :      * We mark the new indexes as valid and the old indexes as not valid at
    4254              :      * the same time to make sure we only get constraint violations from the
    4255              :      * indexes with the correct names.
    4256              :      */
    4257              : 
    4258          217 :     INJECTION_POINT("reindex-relation-concurrently-before-swap", NULL);
    4259          217 :     StartTransactionCommand();
    4260              : 
    4261              :     /*
    4262              :      * Because this transaction only does catalog manipulations and doesn't do
    4263              :      * any index operations, we can set the PROC_IN_SAFE_IC flag here
    4264              :      * unconditionally.
    4265              :      */
    4266          217 :     set_indexsafe_procflags();
    4267              : 
    4268          484 :     forboth(lc, indexIds, lc2, newIndexIds)
    4269              :     {
    4270          267 :         ReindexIndexInfo *oldidx = lfirst(lc);
    4271          267 :         ReindexIndexInfo *newidx = lfirst(lc2);
    4272              :         char       *oldName;
    4273              : 
    4274              :         /*
    4275              :          * Check for user-requested abort.  This is inside a transaction so as
    4276              :          * xact.c does not issue a useless WARNING, and ensures that
    4277              :          * session-level locks are cleaned up on abort.
    4278              :          */
    4279          267 :         CHECK_FOR_INTERRUPTS();
    4280              : 
    4281              :         /* Choose a relation name for old index */
    4282          267 :         oldName = ChooseRelationName(get_rel_name(oldidx->indexId),
    4283              :                                      NULL,
    4284              :                                      "ccold",
    4285              :                                      get_rel_namespace(oldidx->tableId),
    4286              :                                      false);
    4287              : 
    4288              :         /*
    4289              :          * Swapping the indexes might involve TOAST table access, so ensure we
    4290              :          * have a valid snapshot.
    4291              :          */
    4292          267 :         PushActiveSnapshot(GetTransactionSnapshot());
    4293              : 
    4294              :         /*
    4295              :          * Swap old index with the new one.  This also marks the new one as
    4296              :          * valid and the old one as not valid.
    4297              :          */
    4298          267 :         index_concurrently_swap(newidx->indexId, oldidx->indexId, oldName);
    4299              : 
    4300          267 :         PopActiveSnapshot();
    4301              : 
    4302              :         /*
    4303              :          * Invalidate the relcache for the table, so that after this commit
    4304              :          * all sessions will refresh any cached plans that might reference the
    4305              :          * index.
    4306              :          */
    4307          267 :         CacheInvalidateRelcacheByRelid(oldidx->tableId);
    4308              : 
    4309              :         /*
    4310              :          * CCI here so that subsequent iterations see the oldName in the
    4311              :          * catalog and can choose a nonconflicting name for their oldName.
    4312              :          * Otherwise, this could lead to conflicts if a table has two indexes
    4313              :          * whose names are equal for the first NAMEDATALEN-minus-a-few
    4314              :          * characters.
    4315              :          */
    4316          267 :         CommandCounterIncrement();
    4317              :     }
    4318              : 
    4319              :     /* Commit this transaction and make index swaps visible */
    4320          217 :     CommitTransactionCommand();
    4321          217 :     StartTransactionCommand();
    4322              : 
    4323              :     /*
    4324              :      * While we could set PROC_IN_SAFE_IC if all indexes qualified, there's no
    4325              :      * real need for that, because we only acquire an Xid after the wait is
    4326              :      * done, and that lasts for a very short period.
    4327              :      */
    4328              : 
    4329              :     /*
    4330              :      * Phase 5 of REINDEX CONCURRENTLY
    4331              :      *
    4332              :      * Mark the old indexes as dead.  First we must wait until no running
    4333              :      * transaction could be using the index for a query.  See also
    4334              :      * index_drop() for more details.
    4335              :      */
    4336              : 
    4337          217 :     INJECTION_POINT("reindex-relation-concurrently-before-set-dead", NULL);
    4338          217 :     pgstat_progress_update_param(PROGRESS_CREATEIDX_PHASE,
    4339              :                                  PROGRESS_CREATEIDX_PHASE_WAIT_4);
    4340          217 :     WaitForLockersMultiple(lockTags, AccessExclusiveLock, true);
    4341              : 
    4342          484 :     foreach(lc, indexIds)
    4343              :     {
    4344          267 :         ReindexIndexInfo *oldidx = lfirst(lc);
    4345              : 
    4346              :         /*
    4347              :          * Check for user-requested abort.  This is inside a transaction so as
    4348              :          * xact.c does not issue a useless WARNING, and ensures that
    4349              :          * session-level locks are cleaned up on abort.
    4350              :          */
    4351          267 :         CHECK_FOR_INTERRUPTS();
    4352              : 
    4353              :         /*
    4354              :          * Updating pg_index might involve TOAST table access, so ensure we
    4355              :          * have a valid snapshot.
    4356              :          */
    4357          267 :         PushActiveSnapshot(GetTransactionSnapshot());
    4358              : 
    4359          267 :         index_concurrently_set_dead(oldidx->tableId, oldidx->indexId);
    4360              : 
    4361          267 :         PopActiveSnapshot();
    4362              :     }
    4363              : 
    4364              :     /* Commit this transaction to make the updates visible. */
    4365          217 :     CommitTransactionCommand();
    4366          217 :     StartTransactionCommand();
    4367              : 
    4368              :     /*
    4369              :      * While we could set PROC_IN_SAFE_IC if all indexes qualified, there's no
    4370              :      * real need for that, because we only acquire an Xid after the wait is
    4371              :      * done, and that lasts for a very short period.
    4372              :      */
    4373              : 
    4374              :     /*
    4375              :      * Phase 6 of REINDEX CONCURRENTLY
    4376              :      *
    4377              :      * Drop the old indexes.
    4378              :      */
    4379              : 
    4380          217 :     pgstat_progress_update_param(PROGRESS_CREATEIDX_PHASE,
    4381              :                                  PROGRESS_CREATEIDX_PHASE_WAIT_5);
    4382          217 :     WaitForLockersMultiple(lockTags, AccessExclusiveLock, true);
    4383              : 
    4384          217 :     PushActiveSnapshot(GetTransactionSnapshot());
    4385              : 
    4386              :     {
    4387          217 :         ObjectAddresses *objects = new_object_addresses();
    4388              : 
    4389          484 :         foreach(lc, indexIds)
    4390              :         {
    4391          267 :             ReindexIndexInfo *idx = lfirst(lc);
    4392              :             ObjectAddress object;
    4393              : 
    4394          267 :             object.classId = RelationRelationId;
    4395          267 :             object.objectId = idx->indexId;
    4396          267 :             object.objectSubId = 0;
    4397              : 
    4398          267 :             add_exact_object_address(&object, objects);
    4399              :         }
    4400              : 
    4401              :         /*
    4402              :          * Use PERFORM_DELETION_CONCURRENT_LOCK so that index_drop() uses the
    4403              :          * right lock level.
    4404              :          */
    4405          217 :         performMultipleDeletions(objects, DROP_RESTRICT,
    4406              :                                  PERFORM_DELETION_CONCURRENT_LOCK | PERFORM_DELETION_INTERNAL);
    4407              :     }
    4408              : 
    4409          217 :     PopActiveSnapshot();
    4410          217 :     CommitTransactionCommand();
    4411              : 
    4412              :     /*
    4413              :      * Finally, release the session-level lock on the table.
    4414              :      */
    4415         1012 :     foreach(lc, relationLocks)
    4416              :     {
    4417          795 :         LockRelId  *lockrelid = (LockRelId *) lfirst(lc);
    4418              : 
    4419          795 :         UnlockRelationIdForSession(lockrelid, ShareUpdateExclusiveLock);
    4420              :     }
    4421              : 
    4422              :     /* Start a new transaction to finish process properly */
    4423          217 :     StartTransactionCommand();
    4424              : 
    4425              :     /* Log what we did */
    4426          217 :     if ((params->options & REINDEXOPT_VERBOSE) != 0)
    4427              :     {
    4428            2 :         if (relkind == RELKIND_INDEX)
    4429            0 :             ereport(INFO,
    4430              :                     (errmsg("index \"%s.%s\" was reindexed",
    4431              :                             relationNamespace, relationName),
    4432              :                      errdetail("%s.",
    4433              :                                pg_rusage_show(&ru0))));
    4434              :         else
    4435              :         {
    4436            6 :             foreach(lc, newIndexIds)
    4437              :             {
    4438            4 :                 ReindexIndexInfo *idx = lfirst(lc);
    4439            4 :                 Oid         indOid = idx->indexId;
    4440              : 
    4441            4 :                 ereport(INFO,
    4442              :                         (errmsg("index \"%s.%s\" was reindexed",
    4443              :                                 get_namespace_name(get_rel_namespace(indOid)),
    4444              :                                 get_rel_name(indOid))));
    4445              :                 /* Don't show rusage here, since it's not per index. */
    4446              :             }
    4447              : 
    4448            2 :             ereport(INFO,
    4449              :                     (errmsg("table \"%s.%s\" was reindexed",
    4450              :                             relationNamespace, relationName),
    4451              :                      errdetail("%s.",
    4452              :                                pg_rusage_show(&ru0))));
    4453              :         }
    4454              :     }
    4455              : 
    4456          217 :     MemoryContextDelete(private_context);
    4457              : 
    4458          217 :     pgstat_progress_end_command();
    4459              : 
    4460          217 :     return true;
    4461              : }
    4462              : 
    4463              : /*
    4464              :  * Insert or delete an appropriate pg_inherits tuple to make the given index
    4465              :  * be a partition of the indicated parent index.
    4466              :  *
    4467              :  * This also corrects the pg_depend information for the affected index.
    4468              :  */
    4469              : void
    4470          522 : IndexSetParentIndex(Relation partitionIdx, Oid parentOid)
    4471              : {
    4472              :     Relation    pg_inherits;
    4473              :     ScanKeyData key[2];
    4474              :     SysScanDesc scan;
    4475          522 :     Oid         partRelid = RelationGetRelid(partitionIdx);
    4476              :     HeapTuple   tuple;
    4477              :     bool        fix_dependencies;
    4478              : 
    4479              :     /* Make sure this is an index */
    4480              :     Assert(partitionIdx->rd_rel->relkind == RELKIND_INDEX ||
    4481              :            partitionIdx->rd_rel->relkind == RELKIND_PARTITIONED_INDEX);
    4482              : 
    4483              :     /*
    4484              :      * Scan pg_inherits for rows linking our index to some parent.
    4485              :      */
    4486          522 :     pg_inherits = relation_open(InheritsRelationId, RowExclusiveLock);
    4487          522 :     ScanKeyInit(&key[0],
    4488              :                 Anum_pg_inherits_inhrelid,
    4489              :                 BTEqualStrategyNumber, F_OIDEQ,
    4490              :                 ObjectIdGetDatum(partRelid));
    4491          522 :     ScanKeyInit(&key[1],
    4492              :                 Anum_pg_inherits_inhseqno,
    4493              :                 BTEqualStrategyNumber, F_INT4EQ,
    4494              :                 Int32GetDatum(1));
    4495          522 :     scan = systable_beginscan(pg_inherits, InheritsRelidSeqnoIndexId, true,
    4496              :                               NULL, 2, key);
    4497          522 :     tuple = systable_getnext(scan);
    4498              : 
    4499          522 :     if (!HeapTupleIsValid(tuple))
    4500              :     {
    4501          308 :         if (parentOid == InvalidOid)
    4502              :         {
    4503              :             /*
    4504              :              * No pg_inherits row, and no parent wanted: nothing to do in this
    4505              :              * case.
    4506              :              */
    4507            0 :             fix_dependencies = false;
    4508              :         }
    4509              :         else
    4510              :         {
    4511          308 :             StoreSingleInheritance(partRelid, parentOid, 1);
    4512          308 :             fix_dependencies = true;
    4513              :         }
    4514              :     }
    4515              :     else
    4516              :     {
    4517          214 :         Form_pg_inherits inhForm = (Form_pg_inherits) GETSTRUCT(tuple);
    4518              : 
    4519          214 :         if (parentOid == InvalidOid)
    4520              :         {
    4521              :             /*
    4522              :              * There exists a pg_inherits row, which we want to clear; do so.
    4523              :              */
    4524          214 :             CatalogTupleDelete(pg_inherits, &tuple->t_self);
    4525          214 :             fix_dependencies = true;
    4526              :         }
    4527              :         else
    4528              :         {
    4529              :             /*
    4530              :              * A pg_inherits row exists.  If it's the same we want, then we're
    4531              :              * good; if it differs, that amounts to a corrupt catalog and
    4532              :              * should not happen.
    4533              :              */
    4534            0 :             if (inhForm->inhparent != parentOid)
    4535              :             {
    4536              :                 /* unexpected: we should not get called in this case */
    4537            0 :                 elog(ERROR, "bogus pg_inherit row: inhrelid %u inhparent %u",
    4538              :                      inhForm->inhrelid, inhForm->inhparent);
    4539              :             }
    4540              : 
    4541              :             /* already in the right state */
    4542            0 :             fix_dependencies = false;
    4543              :         }
    4544              :     }
    4545              : 
    4546              :     /* done with pg_inherits */
    4547          522 :     systable_endscan(scan);
    4548          522 :     relation_close(pg_inherits, RowExclusiveLock);
    4549              : 
    4550              :     /* set relhassubclass if an index partition has been added to the parent */
    4551          522 :     if (OidIsValid(parentOid))
    4552              :     {
    4553          308 :         LockRelationOid(parentOid, ShareUpdateExclusiveLock);
    4554          308 :         SetRelationHasSubclass(parentOid, true);
    4555              :     }
    4556              : 
    4557              :     /* set relispartition correctly on the partition */
    4558          522 :     update_relispartition(partRelid, OidIsValid(parentOid));
    4559              : 
    4560          522 :     if (fix_dependencies)
    4561              :     {
    4562              :         /*
    4563              :          * Insert/delete pg_depend rows.  If setting a parent, add PARTITION
    4564              :          * dependencies on the parent index and the table; if removing a
    4565              :          * parent, delete PARTITION dependencies.
    4566              :          */
    4567          522 :         if (OidIsValid(parentOid))
    4568              :         {
    4569              :             ObjectAddress partIdx;
    4570              :             ObjectAddress parentIdx;
    4571              :             ObjectAddress partitionTbl;
    4572              : 
    4573          308 :             ObjectAddressSet(partIdx, RelationRelationId, partRelid);
    4574          308 :             ObjectAddressSet(parentIdx, RelationRelationId, parentOid);
    4575          308 :             ObjectAddressSet(partitionTbl, RelationRelationId,
    4576              :                              partitionIdx->rd_index->indrelid);
    4577          308 :             recordDependencyOn(&partIdx, &parentIdx,
    4578              :                                DEPENDENCY_PARTITION_PRI);
    4579          308 :             recordDependencyOn(&partIdx, &partitionTbl,
    4580              :                                DEPENDENCY_PARTITION_SEC);
    4581              :         }
    4582              :         else
    4583              :         {
    4584          214 :             deleteDependencyRecordsForClass(RelationRelationId, partRelid,
    4585              :                                             RelationRelationId,
    4586              :                                             DEPENDENCY_PARTITION_PRI);
    4587          214 :             deleteDependencyRecordsForClass(RelationRelationId, partRelid,
    4588              :                                             RelationRelationId,
    4589              :                                             DEPENDENCY_PARTITION_SEC);
    4590              :         }
    4591              : 
    4592              :         /* make our updates visible */
    4593          522 :         CommandCounterIncrement();
    4594              :     }
    4595          522 : }
    4596              : 
    4597              : /*
    4598              :  * Subroutine of IndexSetParentIndex to update the relispartition flag of the
    4599              :  * given index to the given value.
    4600              :  */
    4601              : static void
    4602          522 : update_relispartition(Oid relationId, bool newval)
    4603              : {
    4604              :     HeapTuple   tup;
    4605              :     Relation    classRel;
    4606              :     ItemPointerData otid;
    4607              : 
    4608          522 :     classRel = table_open(RelationRelationId, RowExclusiveLock);
    4609          522 :     tup = SearchSysCacheLockedCopy1(RELOID, ObjectIdGetDatum(relationId));
    4610          522 :     if (!HeapTupleIsValid(tup))
    4611            0 :         elog(ERROR, "cache lookup failed for relation %u", relationId);
    4612          522 :     otid = tup->t_self;
    4613              :     Assert(((Form_pg_class) GETSTRUCT(tup))->relispartition != newval);
    4614          522 :     ((Form_pg_class) GETSTRUCT(tup))->relispartition = newval;
    4615          522 :     CatalogTupleUpdate(classRel, &otid, tup);
    4616          522 :     UnlockTuple(classRel, &otid, InplaceUpdateTupleLock);
    4617          522 :     heap_freetuple(tup);
    4618          522 :     table_close(classRel, RowExclusiveLock);
    4619          522 : }
    4620              : 
    4621              : /*
    4622              :  * Set the PROC_IN_SAFE_IC flag in MyProc->statusFlags.
    4623              :  *
    4624              :  * When doing concurrent index builds, we can set this flag
    4625              :  * to tell other processes concurrently running CREATE
    4626              :  * INDEX CONCURRENTLY or REINDEX CONCURRENTLY to ignore us when
    4627              :  * doing their waits for concurrent snapshots.  On one hand it
    4628              :  * avoids pointlessly waiting for a process that's not interesting
    4629              :  * anyway; but more importantly it avoids deadlocks in some cases.
    4630              :  *
    4631              :  * This can be done safely only for indexes that don't execute any
    4632              :  * expressions that could access other tables, so index must not be
    4633              :  * expressional nor partial.  Caller is responsible for only calling
    4634              :  * this routine when that assumption holds true.
    4635              :  *
    4636              :  * (The flag is reset automatically at transaction end, so it must be
    4637              :  * set for each transaction.)
    4638              :  */
    4639              : static inline void
    4640          901 : set_indexsafe_procflags(void)
    4641              : {
    4642              :     /*
    4643              :      * This should only be called before installing xid or xmin in MyProc;
    4644              :      * otherwise, concurrent processes could see an Xmin that moves backwards.
    4645              :      */
    4646              :     Assert(MyProc->xid == InvalidTransactionId &&
    4647              :            MyProc->xmin == InvalidTransactionId);
    4648              : 
    4649          901 :     LWLockAcquire(ProcArrayLock, LW_EXCLUSIVE);
    4650          901 :     MyProc->statusFlags |= PROC_IN_SAFE_IC;
    4651          901 :     ProcGlobal->statusFlags[MyProc->pgxactoff] = MyProc->statusFlags;
    4652          901 :     LWLockRelease(ProcArrayLock);
    4653          901 : }
        

Generated by: LCOV version 2.0-1