LCOV - code coverage report
Current view: top level - src/backend/commands - dbcommands.c (source / functions) Coverage Total Hit
Test: PostgreSQL 19devel Lines: 84.4 % 1111 938
Test Date: 2026-03-10 00:15:05 Functions: 96.6 % 29 28
Legend: Lines:     hit not hit

            Line data    Source code
       1              : /*-------------------------------------------------------------------------
       2              :  *
       3              :  * dbcommands.c
       4              :  *      Database management commands (create/drop database).
       5              :  *
       6              :  * Note: database creation/destruction commands use exclusive locks on
       7              :  * the database objects (as expressed by LockSharedObject()) to avoid
       8              :  * stepping on each others' toes.  Formerly we used table-level locks
       9              :  * on pg_database, but that's too coarse-grained.
      10              :  *
      11              :  * Portions Copyright (c) 1996-2026, PostgreSQL Global Development Group
      12              :  * Portions Copyright (c) 1994, Regents of the University of California
      13              :  *
      14              :  *
      15              :  * IDENTIFICATION
      16              :  *    src/backend/commands/dbcommands.c
      17              :  *
      18              :  *-------------------------------------------------------------------------
      19              :  */
      20              : #include "postgres.h"
      21              : 
      22              : #include <fcntl.h>
      23              : #include <unistd.h>
      24              : #include <sys/stat.h>
      25              : 
      26              : #include "access/genam.h"
      27              : #include "access/heapam.h"
      28              : #include "access/htup_details.h"
      29              : #include "access/multixact.h"
      30              : #include "access/tableam.h"
      31              : #include "access/xact.h"
      32              : #include "access/xloginsert.h"
      33              : #include "access/xlogrecovery.h"
      34              : #include "access/xlogutils.h"
      35              : #include "catalog/catalog.h"
      36              : #include "catalog/dependency.h"
      37              : #include "catalog/indexing.h"
      38              : #include "catalog/objectaccess.h"
      39              : #include "catalog/pg_authid.h"
      40              : #include "catalog/pg_collation.h"
      41              : #include "catalog/pg_database.h"
      42              : #include "catalog/pg_db_role_setting.h"
      43              : #include "catalog/pg_subscription.h"
      44              : #include "catalog/pg_tablespace.h"
      45              : #include "commands/comment.h"
      46              : #include "commands/dbcommands.h"
      47              : #include "commands/dbcommands_xlog.h"
      48              : #include "commands/defrem.h"
      49              : #include "commands/seclabel.h"
      50              : #include "commands/tablespace.h"
      51              : #include "common/file_perm.h"
      52              : #include "mb/pg_wchar.h"
      53              : #include "miscadmin.h"
      54              : #include "pgstat.h"
      55              : #include "postmaster/bgwriter.h"
      56              : #include "replication/slot.h"
      57              : #include "storage/copydir.h"
      58              : #include "storage/fd.h"
      59              : #include "storage/ipc.h"
      60              : #include "storage/lmgr.h"
      61              : #include "storage/md.h"
      62              : #include "storage/procarray.h"
      63              : #include "storage/procsignal.h"
      64              : #include "storage/smgr.h"
      65              : #include "utils/acl.h"
      66              : #include "utils/builtins.h"
      67              : #include "utils/fmgroids.h"
      68              : #include "utils/lsyscache.h"
      69              : #include "utils/pg_locale.h"
      70              : #include "utils/relmapper.h"
      71              : #include "utils/snapmgr.h"
      72              : #include "utils/syscache.h"
      73              : #include "utils/wait_event.h"
      74              : 
      75              : /*
      76              :  * Create database strategy.
      77              :  *
      78              :  * CREATEDB_WAL_LOG will copy the database at the block level and WAL log each
      79              :  * copied block.
      80              :  *
      81              :  * CREATEDB_FILE_COPY will simply perform a file system level copy of the
      82              :  * database and log a single record for each tablespace copied. To make this
      83              :  * safe, it also triggers checkpoints before and after the operation.
      84              :  */
      85              : typedef enum CreateDBStrategy
      86              : {
      87              :     CREATEDB_WAL_LOG,
      88              :     CREATEDB_FILE_COPY,
      89              : } CreateDBStrategy;
      90              : 
      91              : typedef struct
      92              : {
      93              :     Oid         src_dboid;      /* source (template) DB */
      94              :     Oid         dest_dboid;     /* DB we are trying to create */
      95              :     CreateDBStrategy strategy;  /* create db strategy */
      96              : } createdb_failure_params;
      97              : 
      98              : typedef struct
      99              : {
     100              :     Oid         dest_dboid;     /* DB we are trying to move */
     101              :     Oid         dest_tsoid;     /* tablespace we are trying to move to */
     102              : } movedb_failure_params;
     103              : 
     104              : /*
     105              :  * Information about a relation to be copied when creating a database.
     106              :  */
     107              : typedef struct CreateDBRelInfo
     108              : {
     109              :     RelFileLocator rlocator;    /* physical relation identifier */
     110              :     Oid         reloid;         /* relation oid */
     111              :     bool        permanent;      /* relation is permanent or unlogged */
     112              : } CreateDBRelInfo;
     113              : 
     114              : 
     115              : /* non-export function prototypes */
     116              : static void createdb_failure_callback(int code, Datum arg);
     117              : static void movedb(const char *dbname, const char *tblspcname);
     118              : static void movedb_failure_callback(int code, Datum arg);
     119              : static bool get_db_info(const char *name, LOCKMODE lockmode,
     120              :                         Oid *dbIdP, Oid *ownerIdP,
     121              :                         int *encodingP, bool *dbIsTemplateP, bool *dbAllowConnP, bool *dbHasLoginEvtP,
     122              :                         TransactionId *dbFrozenXidP, MultiXactId *dbMinMultiP,
     123              :                         Oid *dbTablespace, char **dbCollate, char **dbCtype, char **dbLocale,
     124              :                         char **dbIcurules,
     125              :                         char *dbLocProvider,
     126              :                         char **dbCollversion);
     127              : static void remove_dbtablespaces(Oid db_id);
     128              : static bool check_db_file_conflict(Oid db_id);
     129              : static int  errdetail_busy_db(int notherbackends, int npreparedxacts);
     130              : static void CreateDatabaseUsingWalLog(Oid src_dboid, Oid dst_dboid, Oid src_tsid,
     131              :                                       Oid dst_tsid);
     132              : static List *ScanSourceDatabasePgClass(Oid tbid, Oid dbid, char *srcpath);
     133              : static List *ScanSourceDatabasePgClassPage(Page page, Buffer buf, Oid tbid,
     134              :                                            Oid dbid, char *srcpath,
     135              :                                            List *rlocatorlist, Snapshot snapshot);
     136              : static CreateDBRelInfo *ScanSourceDatabasePgClassTuple(HeapTupleData *tuple,
     137              :                                                        Oid tbid, Oid dbid,
     138              :                                                        char *srcpath);
     139              : static void CreateDirAndVersionFile(char *dbpath, Oid dbid, Oid tsid,
     140              :                                     bool isRedo);
     141              : static void CreateDatabaseUsingFileCopy(Oid src_dboid, Oid dst_dboid,
     142              :                                         Oid src_tsid, Oid dst_tsid);
     143              : static void recovery_create_dbdir(char *path, bool only_tblspc);
     144              : 
     145              : /*
     146              :  * Create a new database using the WAL_LOG strategy.
     147              :  *
     148              :  * Each copied block is separately written to the write-ahead log.
     149              :  */
     150              : static void
     151          261 : CreateDatabaseUsingWalLog(Oid src_dboid, Oid dst_dboid,
     152              :                           Oid src_tsid, Oid dst_tsid)
     153              : {
     154              :     char       *srcpath;
     155              :     char       *dstpath;
     156          261 :     List       *rlocatorlist = NULL;
     157              :     ListCell   *cell;
     158              :     LockRelId   srcrelid;
     159              :     LockRelId   dstrelid;
     160              :     RelFileLocator srcrlocator;
     161              :     RelFileLocator dstrlocator;
     162              :     CreateDBRelInfo *relinfo;
     163              : 
     164              :     /* Get source and destination database paths. */
     165          261 :     srcpath = GetDatabasePath(src_dboid, src_tsid);
     166          261 :     dstpath = GetDatabasePath(dst_dboid, dst_tsid);
     167              : 
     168              :     /* Create database directory and write PG_VERSION file. */
     169          261 :     CreateDirAndVersionFile(dstpath, dst_dboid, dst_tsid, false);
     170              : 
     171              :     /* Copy relmap file from source database to the destination database. */
     172          261 :     RelationMapCopy(dst_dboid, dst_tsid, srcpath, dstpath);
     173              : 
     174              :     /* Get list of relfilelocators to copy from the source database. */
     175          261 :     rlocatorlist = ScanSourceDatabasePgClass(src_tsid, src_dboid, srcpath);
     176              :     Assert(rlocatorlist != NIL);
     177              : 
     178              :     /*
     179              :      * Database IDs will be the same for all relations so set them before
     180              :      * entering the loop.
     181              :      */
     182          261 :     srcrelid.dbId = src_dboid;
     183          261 :     dstrelid.dbId = dst_dboid;
     184              : 
     185              :     /* Loop over our list of relfilelocators and copy each one. */
     186        58303 :     foreach(cell, rlocatorlist)
     187              :     {
     188        58044 :         relinfo = lfirst(cell);
     189        58044 :         srcrlocator = relinfo->rlocator;
     190              : 
     191              :         /*
     192              :          * If the relation is from the source db's default tablespace then we
     193              :          * need to create it in the destination db's default tablespace.
     194              :          * Otherwise, we need to create in the same tablespace as it is in the
     195              :          * source database.
     196              :          */
     197        58044 :         if (srcrlocator.spcOid == src_tsid)
     198        58044 :             dstrlocator.spcOid = dst_tsid;
     199              :         else
     200            0 :             dstrlocator.spcOid = srcrlocator.spcOid;
     201              : 
     202        58044 :         dstrlocator.dbOid = dst_dboid;
     203        58044 :         dstrlocator.relNumber = srcrlocator.relNumber;
     204              : 
     205              :         /*
     206              :          * Acquire locks on source and target relations before copying.
     207              :          *
     208              :          * We typically do not read relation data into shared_buffers without
     209              :          * holding a relation lock. It's unclear what could go wrong if we
     210              :          * skipped it in this case, because nobody can be modifying either the
     211              :          * source or destination database at this point, and we have locks on
     212              :          * both databases, too, but let's take the conservative route.
     213              :          */
     214        58044 :         dstrelid.relId = srcrelid.relId = relinfo->reloid;
     215        58044 :         LockRelationId(&srcrelid, AccessShareLock);
     216        58044 :         LockRelationId(&dstrelid, AccessShareLock);
     217              : 
     218              :         /* Copy relation storage from source to the destination. */
     219        58044 :         CreateAndCopyRelationData(srcrlocator, dstrlocator, relinfo->permanent);
     220              : 
     221              :         /* Release the relation locks. */
     222        58042 :         UnlockRelationId(&srcrelid, AccessShareLock);
     223        58042 :         UnlockRelationId(&dstrelid, AccessShareLock);
     224              :     }
     225              : 
     226          259 :     pfree(srcpath);
     227          259 :     pfree(dstpath);
     228          259 :     list_free_deep(rlocatorlist);
     229          259 : }
     230              : 
     231              : /*
     232              :  * Scan the pg_class table in the source database to identify the relations
     233              :  * that need to be copied to the destination database.
     234              :  *
     235              :  * This is an exception to the usual rule that cross-database access is
     236              :  * not possible. We can make it work here because we know that there are no
     237              :  * connections to the source database and (since there can't be prepared
     238              :  * transactions touching that database) no in-doubt tuples either. This
     239              :  * means that we don't need to worry about pruning removing anything from
     240              :  * under us, and we don't need to be too picky about our snapshot either.
     241              :  * As long as it sees all previously-committed XIDs as committed and all
     242              :  * aborted XIDs as aborted, we should be fine: nothing else is possible
     243              :  * here.
     244              :  *
     245              :  * We can't rely on the relcache for anything here, because that only knows
     246              :  * about the database to which we are connected, and can't handle access to
     247              :  * other databases. That also means we can't rely on the heap scan
     248              :  * infrastructure, which would be a bad idea anyway since it might try
     249              :  * to do things like HOT pruning which we definitely can't do safely in
     250              :  * a database to which we're not even connected.
     251              :  */
     252              : static List *
     253          261 : ScanSourceDatabasePgClass(Oid tbid, Oid dbid, char *srcpath)
     254              : {
     255              :     RelFileLocator rlocator;
     256              :     BlockNumber nblocks;
     257              :     BlockNumber blkno;
     258              :     Buffer      buf;
     259              :     RelFileNumber relfilenumber;
     260              :     Page        page;
     261          261 :     List       *rlocatorlist = NIL;
     262              :     LockRelId   relid;
     263              :     Snapshot    snapshot;
     264              :     SMgrRelation smgr;
     265              :     BufferAccessStrategy bstrategy;
     266              : 
     267              :     /* Get pg_class relfilenumber. */
     268          261 :     relfilenumber = RelationMapOidToFilenumberForDatabase(srcpath,
     269              :                                                           RelationRelationId);
     270              : 
     271              :     /* Don't read data into shared_buffers without holding a relation lock. */
     272          261 :     relid.dbId = dbid;
     273          261 :     relid.relId = RelationRelationId;
     274          261 :     LockRelationId(&relid, AccessShareLock);
     275              : 
     276              :     /* Prepare a RelFileLocator for the pg_class relation. */
     277          261 :     rlocator.spcOid = tbid;
     278          261 :     rlocator.dbOid = dbid;
     279          261 :     rlocator.relNumber = relfilenumber;
     280              : 
     281          261 :     smgr = smgropen(rlocator, INVALID_PROC_NUMBER);
     282          261 :     nblocks = smgrnblocks(smgr, MAIN_FORKNUM);
     283          261 :     smgrclose(smgr);
     284              : 
     285              :     /* Use a buffer access strategy since this is a bulk read operation. */
     286          261 :     bstrategy = GetAccessStrategy(BAS_BULKREAD);
     287              : 
     288              :     /*
     289              :      * As explained in the function header comments, we need a snapshot that
     290              :      * will see all committed transactions as committed, and our transaction
     291              :      * snapshot - or the active snapshot - might not be new enough for that,
     292              :      * but the return value of GetLatestSnapshot() should work fine.
     293              :      */
     294          261 :     snapshot = RegisterSnapshot(GetLatestSnapshot());
     295              : 
     296              :     /* Process the relation block by block. */
     297         3915 :     for (blkno = 0; blkno < nblocks; blkno++)
     298              :     {
     299         3654 :         CHECK_FOR_INTERRUPTS();
     300              : 
     301         3654 :         buf = ReadBufferWithoutRelcache(rlocator, MAIN_FORKNUM, blkno,
     302              :                                         RBM_NORMAL, bstrategy, true);
     303              : 
     304         3654 :         LockBuffer(buf, BUFFER_LOCK_SHARE);
     305         3654 :         page = BufferGetPage(buf);
     306         3654 :         if (PageIsNew(page) || PageIsEmpty(page))
     307              :         {
     308            0 :             UnlockReleaseBuffer(buf);
     309            0 :             continue;
     310              :         }
     311              : 
     312              :         /* Append relevant pg_class tuples for current page to rlocatorlist. */
     313         3654 :         rlocatorlist = ScanSourceDatabasePgClassPage(page, buf, tbid, dbid,
     314              :                                                      srcpath, rlocatorlist,
     315              :                                                      snapshot);
     316              : 
     317         3654 :         UnlockReleaseBuffer(buf);
     318              :     }
     319          261 :     UnregisterSnapshot(snapshot);
     320              : 
     321              :     /* Release relation lock. */
     322          261 :     UnlockRelationId(&relid, AccessShareLock);
     323              : 
     324          261 :     return rlocatorlist;
     325              : }
     326              : 
     327              : /*
     328              :  * Scan one page of the source database's pg_class relation and add relevant
     329              :  * entries to rlocatorlist. The return value is the updated list.
     330              :  */
     331              : static List *
     332         3654 : ScanSourceDatabasePgClassPage(Page page, Buffer buf, Oid tbid, Oid dbid,
     333              :                               char *srcpath, List *rlocatorlist,
     334              :                               Snapshot snapshot)
     335              : {
     336         3654 :     BlockNumber blkno = BufferGetBlockNumber(buf);
     337              :     OffsetNumber offnum;
     338              :     OffsetNumber maxoff;
     339              :     HeapTupleData tuple;
     340              : 
     341         3654 :     maxoff = PageGetMaxOffsetNumber(page);
     342              : 
     343              :     /* Loop over offsets. */
     344         3654 :     for (offnum = FirstOffsetNumber;
     345       180961 :          offnum <= maxoff;
     346       177307 :          offnum = OffsetNumberNext(offnum))
     347              :     {
     348              :         ItemId      itemid;
     349              : 
     350       177307 :         itemid = PageGetItemId(page, offnum);
     351              : 
     352              :         /* Nothing to do if slot is empty or already dead. */
     353       177307 :         if (!ItemIdIsUsed(itemid) || ItemIdIsDead(itemid) ||
     354       132374 :             ItemIdIsRedirected(itemid))
     355        68162 :             continue;
     356              : 
     357              :         Assert(ItemIdIsNormal(itemid));
     358       109145 :         ItemPointerSet(&(tuple.t_self), blkno, offnum);
     359              : 
     360              :         /* Initialize a HeapTupleData structure. */
     361       109145 :         tuple.t_data = (HeapTupleHeader) PageGetItem(page, itemid);
     362       109145 :         tuple.t_len = ItemIdGetLength(itemid);
     363       109145 :         tuple.t_tableOid = RelationRelationId;
     364              : 
     365              :         /* Skip tuples that are not visible to this snapshot. */
     366       109145 :         if (HeapTupleSatisfiesVisibility(&tuple, snapshot, buf))
     367              :         {
     368              :             CreateDBRelInfo *relinfo;
     369              : 
     370              :             /*
     371              :              * ScanSourceDatabasePgClassTuple is in charge of constructing a
     372              :              * CreateDBRelInfo object for this tuple, but can also decide that
     373              :              * this tuple isn't something we need to copy. If we do need to
     374              :              * copy the relation, add it to the list.
     375              :              */
     376       109126 :             relinfo = ScanSourceDatabasePgClassTuple(&tuple, tbid, dbid,
     377              :                                                      srcpath);
     378       109126 :             if (relinfo != NULL)
     379        58492 :                 rlocatorlist = lappend(rlocatorlist, relinfo);
     380              :         }
     381              :     }
     382              : 
     383         3654 :     return rlocatorlist;
     384              : }
     385              : 
     386              : /*
     387              :  * Decide whether a certain pg_class tuple represents something that
     388              :  * needs to be copied from the source database to the destination database,
     389              :  * and if so, construct a CreateDBRelInfo for it.
     390              :  *
     391              :  * Visibility checks are handled by the caller, so our job here is just
     392              :  * to assess the data stored in the tuple.
     393              :  */
     394              : CreateDBRelInfo *
     395       109126 : ScanSourceDatabasePgClassTuple(HeapTupleData *tuple, Oid tbid, Oid dbid,
     396              :                                char *srcpath)
     397              : {
     398              :     CreateDBRelInfo *relinfo;
     399              :     Form_pg_class classForm;
     400       109126 :     RelFileNumber relfilenumber = InvalidRelFileNumber;
     401              : 
     402       109126 :     classForm = (Form_pg_class) GETSTRUCT(tuple);
     403              : 
     404              :     /*
     405              :      * Return NULL if this object does not need to be copied.
     406              :      *
     407              :      * Shared objects don't need to be copied, because they are shared.
     408              :      * Objects without storage can't be copied, because there's nothing to
     409              :      * copy. Temporary relations don't need to be copied either, because they
     410              :      * are inaccessible outside of the session that created them, which must
     411              :      * be gone already, and couldn't connect to a different database if it
     412              :      * still existed. autovacuum will eventually remove the pg_class entries
     413              :      * as well.
     414              :      */
     415       109126 :     if (classForm->reltablespace == GLOBALTABLESPACE_OID ||
     416        97120 :         !RELKIND_HAS_STORAGE(classForm->relkind) ||
     417        58492 :         classForm->relpersistence == RELPERSISTENCE_TEMP)
     418        50634 :         return NULL;
     419              : 
     420              :     /*
     421              :      * If relfilenumber is valid then directly use it.  Otherwise, consult the
     422              :      * relmap.
     423              :      */
     424        58492 :     if (RelFileNumberIsValid(classForm->relfilenode))
     425        54055 :         relfilenumber = classForm->relfilenode;
     426              :     else
     427         4437 :         relfilenumber = RelationMapOidToFilenumberForDatabase(srcpath,
     428              :                                                               classForm->oid);
     429              : 
     430              :     /* We must have a valid relfilenumber. */
     431        58492 :     if (!RelFileNumberIsValid(relfilenumber))
     432            0 :         elog(ERROR, "relation with OID %u does not have a valid relfilenumber",
     433              :              classForm->oid);
     434              : 
     435              :     /* Prepare a rel info element and add it to the list. */
     436        58492 :     relinfo = palloc_object(CreateDBRelInfo);
     437        58492 :     if (OidIsValid(classForm->reltablespace))
     438            0 :         relinfo->rlocator.spcOid = classForm->reltablespace;
     439              :     else
     440        58492 :         relinfo->rlocator.spcOid = tbid;
     441              : 
     442        58492 :     relinfo->rlocator.dbOid = dbid;
     443        58492 :     relinfo->rlocator.relNumber = relfilenumber;
     444        58492 :     relinfo->reloid = classForm->oid;
     445              : 
     446              :     /* Temporary relations were rejected above. */
     447              :     Assert(classForm->relpersistence != RELPERSISTENCE_TEMP);
     448        58492 :     relinfo->permanent =
     449        58492 :         (classForm->relpersistence == RELPERSISTENCE_PERMANENT) ? true : false;
     450              : 
     451        58492 :     return relinfo;
     452              : }
     453              : 
     454              : /*
     455              :  * Create database directory and write out the PG_VERSION file in the database
     456              :  * path.  If isRedo is true, it's okay for the database directory to exist
     457              :  * already.
     458              :  */
     459              : static void
     460          284 : CreateDirAndVersionFile(char *dbpath, Oid dbid, Oid tsid, bool isRedo)
     461              : {
     462              :     int         fd;
     463              :     int         nbytes;
     464              :     char        versionfile[MAXPGPATH];
     465              :     char        buf[16];
     466              : 
     467              :     /*
     468              :      * Note that we don't have to copy version data from the source database;
     469              :      * there's only one legal value.
     470              :      */
     471          284 :     sprintf(buf, "%s\n", PG_MAJORVERSION);
     472          284 :     nbytes = strlen(PG_MAJORVERSION) + 1;
     473              : 
     474              :     /* Create database directory. */
     475          284 :     if (MakePGDirectory(dbpath) < 0)
     476              :     {
     477              :         /* Failure other than already exists or not in WAL replay? */
     478            8 :         if (errno != EEXIST || !isRedo)
     479            0 :             ereport(ERROR,
     480              :                     (errcode_for_file_access(),
     481              :                      errmsg("could not create directory \"%s\": %m", dbpath)));
     482              :     }
     483              : 
     484              :     /*
     485              :      * Create PG_VERSION file in the database path.  If the file already
     486              :      * exists and we are in WAL replay then try again to open it in write
     487              :      * mode.
     488              :      */
     489          284 :     snprintf(versionfile, sizeof(versionfile), "%s/%s", dbpath, "PG_VERSION");
     490              : 
     491          284 :     fd = OpenTransientFile(versionfile, O_WRONLY | O_CREAT | O_EXCL | PG_BINARY);
     492          284 :     if (fd < 0 && errno == EEXIST && isRedo)
     493            8 :         fd = OpenTransientFile(versionfile, O_WRONLY | O_TRUNC | PG_BINARY);
     494              : 
     495          284 :     if (fd < 0)
     496            0 :         ereport(ERROR,
     497              :                 (errcode_for_file_access(),
     498              :                  errmsg("could not create file \"%s\": %m", versionfile)));
     499              : 
     500              :     /* Write PG_MAJORVERSION in the PG_VERSION file. */
     501          284 :     pgstat_report_wait_start(WAIT_EVENT_VERSION_FILE_WRITE);
     502          284 :     errno = 0;
     503          284 :     if ((int) write(fd, buf, nbytes) != nbytes)
     504              :     {
     505              :         /* If write didn't set errno, assume problem is no disk space. */
     506            0 :         if (errno == 0)
     507            0 :             errno = ENOSPC;
     508            0 :         ereport(ERROR,
     509              :                 (errcode_for_file_access(),
     510              :                  errmsg("could not write to file \"%s\": %m", versionfile)));
     511              :     }
     512          284 :     pgstat_report_wait_end();
     513              : 
     514          284 :     pgstat_report_wait_start(WAIT_EVENT_VERSION_FILE_SYNC);
     515          284 :     if (pg_fsync(fd) != 0)
     516            0 :         ereport(data_sync_elevel(ERROR),
     517              :                 (errcode_for_file_access(),
     518              :                  errmsg("could not fsync file \"%s\": %m", versionfile)));
     519          284 :     fsync_fname(dbpath, true);
     520          284 :     pgstat_report_wait_end();
     521              : 
     522              :     /* Close the version file. */
     523          284 :     CloseTransientFile(fd);
     524              : 
     525              :     /* If we are not in WAL replay then write the WAL. */
     526          284 :     if (!isRedo)
     527              :     {
     528              :         xl_dbase_create_wal_log_rec xlrec;
     529              : 
     530          261 :         START_CRIT_SECTION();
     531              : 
     532          261 :         xlrec.db_id = dbid;
     533          261 :         xlrec.tablespace_id = tsid;
     534              : 
     535          261 :         XLogBeginInsert();
     536          261 :         XLogRegisterData(&xlrec,
     537              :                          sizeof(xl_dbase_create_wal_log_rec));
     538              : 
     539          261 :         (void) XLogInsert(RM_DBASE_ID, XLOG_DBASE_CREATE_WAL_LOG);
     540              : 
     541          261 :         END_CRIT_SECTION();
     542              :     }
     543          284 : }
     544              : 
     545              : /*
     546              :  * Create a new database using the FILE_COPY strategy.
     547              :  *
     548              :  * Copy each tablespace at the filesystem level, and log a single WAL record
     549              :  * for each tablespace copied.  This requires a checkpoint before and after the
     550              :  * copy, which may be expensive, but it does greatly reduce WAL generation
     551              :  * if the copied database is large.
     552              :  */
     553              : static void
     554          138 : CreateDatabaseUsingFileCopy(Oid src_dboid, Oid dst_dboid, Oid src_tsid,
     555              :                             Oid dst_tsid)
     556              : {
     557              :     TableScanDesc scan;
     558              :     Relation    rel;
     559              :     HeapTuple   tuple;
     560              : 
     561              :     /*
     562              :      * Force a checkpoint before starting the copy. This will force all dirty
     563              :      * buffers, including those of unlogged tables, out to disk, to ensure
     564              :      * source database is up-to-date on disk for the copy.
     565              :      * FlushDatabaseBuffers() would suffice for that, but we also want to
     566              :      * process any pending unlink requests. Otherwise, if a checkpoint
     567              :      * happened while we're copying files, a file might be deleted just when
     568              :      * we're about to copy it, causing the lstat() call in copydir() to fail
     569              :      * with ENOENT.
     570              :      *
     571              :      * In binary upgrade mode, we can skip this checkpoint because pg_upgrade
     572              :      * is careful to ensure that template0 is fully written to disk prior to
     573              :      * any CREATE DATABASE commands.
     574              :      */
     575          138 :     if (!IsBinaryUpgrade)
     576          108 :         RequestCheckpoint(CHECKPOINT_FAST | CHECKPOINT_FORCE |
     577              :                           CHECKPOINT_WAIT | CHECKPOINT_FLUSH_UNLOGGED);
     578              : 
     579              :     /*
     580              :      * Iterate through all tablespaces of the template database, and copy each
     581              :      * one to the new database.
     582              :      */
     583          138 :     rel = table_open(TableSpaceRelationId, AccessShareLock);
     584          138 :     scan = table_beginscan_catalog(rel, 0, NULL);
     585          448 :     while ((tuple = heap_getnext(scan, ForwardScanDirection)) != NULL)
     586              :     {
     587          310 :         Form_pg_tablespace spaceform = (Form_pg_tablespace) GETSTRUCT(tuple);
     588          310 :         Oid         srctablespace = spaceform->oid;
     589              :         Oid         dsttablespace;
     590              :         char       *srcpath;
     591              :         char       *dstpath;
     592              :         struct stat st;
     593              : 
     594              :         /* No need to copy global tablespace */
     595          310 :         if (srctablespace == GLOBALTABLESPACE_OID)
     596          172 :             continue;
     597              : 
     598          172 :         srcpath = GetDatabasePath(src_dboid, srctablespace);
     599              : 
     600          310 :         if (stat(srcpath, &st) < 0 || !S_ISDIR(st.st_mode) ||
     601          138 :             directory_is_empty(srcpath))
     602              :         {
     603              :             /* Assume we can ignore it */
     604           34 :             pfree(srcpath);
     605           34 :             continue;
     606              :         }
     607              : 
     608          138 :         if (srctablespace == src_tsid)
     609          138 :             dsttablespace = dst_tsid;
     610              :         else
     611            0 :             dsttablespace = srctablespace;
     612              : 
     613          138 :         dstpath = GetDatabasePath(dst_dboid, dsttablespace);
     614              : 
     615              :         /*
     616              :          * Copy this subdirectory to the new location
     617              :          *
     618              :          * We don't need to copy subdirectories
     619              :          */
     620          138 :         copydir(srcpath, dstpath, false);
     621              : 
     622              :         /* Record the filesystem change in XLOG */
     623              :         {
     624              :             xl_dbase_create_file_copy_rec xlrec;
     625              : 
     626          138 :             xlrec.db_id = dst_dboid;
     627          138 :             xlrec.tablespace_id = dsttablespace;
     628          138 :             xlrec.src_db_id = src_dboid;
     629          138 :             xlrec.src_tablespace_id = srctablespace;
     630              : 
     631          138 :             XLogBeginInsert();
     632          138 :             XLogRegisterData(&xlrec,
     633              :                              sizeof(xl_dbase_create_file_copy_rec));
     634              : 
     635          138 :             (void) XLogInsert(RM_DBASE_ID,
     636              :                               XLOG_DBASE_CREATE_FILE_COPY | XLR_SPECIAL_REL_UPDATE);
     637              :         }
     638          138 :         pfree(srcpath);
     639          138 :         pfree(dstpath);
     640              :     }
     641          138 :     table_endscan(scan);
     642          138 :     table_close(rel, AccessShareLock);
     643              : 
     644              :     /*
     645              :      * We force a checkpoint before committing.  This effectively means that
     646              :      * committed XLOG_DBASE_CREATE_FILE_COPY operations will never need to be
     647              :      * replayed (at least not in ordinary crash recovery; we still have to
     648              :      * make the XLOG entry for the benefit of PITR operations). This avoids
     649              :      * two nasty scenarios:
     650              :      *
     651              :      * #1: At wal_level=minimal, we don't XLOG the contents of newly created
     652              :      * relfilenodes; therefore the drop-and-recreate-whole-directory behavior
     653              :      * of DBASE_CREATE replay would lose such files created in the new
     654              :      * database between our commit and the next checkpoint.
     655              :      *
     656              :      * #2: Since we have to recopy the source database during DBASE_CREATE
     657              :      * replay, we run the risk of copying changes in it that were committed
     658              :      * after the original CREATE DATABASE command but before the system crash
     659              :      * that led to the replay.  This is at least unexpected and at worst could
     660              :      * lead to inconsistencies, eg duplicate table names.
     661              :      *
     662              :      * (Both of these were real bugs in releases 8.0 through 8.0.3.)
     663              :      *
     664              :      * In PITR replay, the first of these isn't an issue, and the second is
     665              :      * only a risk if the CREATE DATABASE and subsequent template database
     666              :      * change both occur while a base backup is being taken. There doesn't
     667              :      * seem to be much we can do about that except document it as a
     668              :      * limitation.
     669              :      *
     670              :      * In binary upgrade mode, we can skip this checkpoint because neither of
     671              :      * these problems applies: we don't ever replay the WAL generated during
     672              :      * pg_upgrade, and we don't support taking base backups during pg_upgrade
     673              :      * (not to mention that we don't concurrently modify template0, either).
     674              :      *
     675              :      * See CreateDatabaseUsingWalLog() for a less cheesy CREATE DATABASE
     676              :      * strategy that avoids these problems.
     677              :      */
     678          138 :     if (!IsBinaryUpgrade)
     679          108 :         RequestCheckpoint(CHECKPOINT_FAST | CHECKPOINT_FORCE |
     680              :                           CHECKPOINT_WAIT);
     681          138 : }
     682              : 
     683              : /*
     684              :  * CREATE DATABASE
     685              :  */
     686              : Oid
     687          420 : createdb(ParseState *pstate, const CreatedbStmt *stmt)
     688              : {
     689              :     Oid         src_dboid;
     690              :     Oid         src_owner;
     691          420 :     int         src_encoding = -1;
     692          420 :     char       *src_collate = NULL;
     693          420 :     char       *src_ctype = NULL;
     694          420 :     char       *src_locale = NULL;
     695          420 :     char       *src_icurules = NULL;
     696          420 :     char        src_locprovider = '\0';
     697          420 :     char       *src_collversion = NULL;
     698              :     bool        src_istemplate;
     699          420 :     bool        src_hasloginevt = false;
     700              :     bool        src_allowconn;
     701          420 :     TransactionId src_frozenxid = InvalidTransactionId;
     702          420 :     MultiXactId src_minmxid = InvalidMultiXactId;
     703              :     Oid         src_deftablespace;
     704              :     volatile Oid dst_deftablespace;
     705              :     Relation    pg_database_rel;
     706              :     HeapTuple   tuple;
     707          420 :     Datum       new_record[Natts_pg_database] = {0};
     708          420 :     bool        new_record_nulls[Natts_pg_database] = {0};
     709          420 :     Oid         dboid = InvalidOid;
     710              :     Oid         datdba;
     711              :     ListCell   *option;
     712          420 :     DefElem    *tablespacenameEl = NULL;
     713          420 :     DefElem    *ownerEl = NULL;
     714          420 :     DefElem    *templateEl = NULL;
     715          420 :     DefElem    *encodingEl = NULL;
     716          420 :     DefElem    *localeEl = NULL;
     717          420 :     DefElem    *builtinlocaleEl = NULL;
     718          420 :     DefElem    *collateEl = NULL;
     719          420 :     DefElem    *ctypeEl = NULL;
     720          420 :     DefElem    *iculocaleEl = NULL;
     721          420 :     DefElem    *icurulesEl = NULL;
     722          420 :     DefElem    *locproviderEl = NULL;
     723          420 :     DefElem    *istemplateEl = NULL;
     724          420 :     DefElem    *allowconnectionsEl = NULL;
     725          420 :     DefElem    *connlimitEl = NULL;
     726          420 :     DefElem    *collversionEl = NULL;
     727          420 :     DefElem    *strategyEl = NULL;
     728          420 :     char       *dbname = stmt->dbname;
     729          420 :     char       *dbowner = NULL;
     730          420 :     const char *dbtemplate = NULL;
     731          420 :     char       *dbcollate = NULL;
     732          420 :     char       *dbctype = NULL;
     733          420 :     const char *dblocale = NULL;
     734          420 :     char       *dbicurules = NULL;
     735          420 :     char        dblocprovider = '\0';
     736              :     char       *canonname;
     737          420 :     int         encoding = -1;
     738          420 :     bool        dbistemplate = false;
     739          420 :     bool        dballowconnections = true;
     740          420 :     int         dbconnlimit = DATCONNLIMIT_UNLIMITED;
     741          420 :     char       *dbcollversion = NULL;
     742              :     int         notherbackends;
     743              :     int         npreparedxacts;
     744          420 :     CreateDBStrategy dbstrategy = CREATEDB_WAL_LOG;
     745              :     createdb_failure_params fparms;
     746              : 
     747              :     /* Report error if name has \n or \r character. */
     748          420 :     if (strpbrk(dbname, "\n\r"))
     749            3 :         ereport(ERROR,
     750              :                 (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
     751              :                  errmsg("database name \"%s\" contains a newline or carriage return character", dbname)));
     752              : 
     753              :     /* Extract options from the statement node tree */
     754         1237 :     foreach(option, stmt->options)
     755              :     {
     756          820 :         DefElem    *defel = (DefElem *) lfirst(option);
     757              : 
     758          820 :         if (strcmp(defel->defname, "tablespace") == 0)
     759              :         {
     760           17 :             if (tablespacenameEl)
     761            0 :                 errorConflictingDefElem(defel, pstate);
     762           17 :             tablespacenameEl = defel;
     763              :         }
     764          803 :         else if (strcmp(defel->defname, "owner") == 0)
     765              :         {
     766            1 :             if (ownerEl)
     767            0 :                 errorConflictingDefElem(defel, pstate);
     768            1 :             ownerEl = defel;
     769              :         }
     770          802 :         else if (strcmp(defel->defname, "template") == 0)
     771              :         {
     772          187 :             if (templateEl)
     773            0 :                 errorConflictingDefElem(defel, pstate);
     774          187 :             templateEl = defel;
     775              :         }
     776          615 :         else if (strcmp(defel->defname, "encoding") == 0)
     777              :         {
     778           54 :             if (encodingEl)
     779            0 :                 errorConflictingDefElem(defel, pstate);
     780           54 :             encodingEl = defel;
     781              :         }
     782          561 :         else if (strcmp(defel->defname, "locale") == 0)
     783              :         {
     784           55 :             if (localeEl)
     785            0 :                 errorConflictingDefElem(defel, pstate);
     786           55 :             localeEl = defel;
     787              :         }
     788          506 :         else if (strcmp(defel->defname, "builtin_locale") == 0)
     789              :         {
     790            8 :             if (builtinlocaleEl)
     791            0 :                 errorConflictingDefElem(defel, pstate);
     792            8 :             builtinlocaleEl = defel;
     793              :         }
     794          498 :         else if (strcmp(defel->defname, "lc_collate") == 0)
     795              :         {
     796            9 :             if (collateEl)
     797            0 :                 errorConflictingDefElem(defel, pstate);
     798            9 :             collateEl = defel;
     799              :         }
     800          489 :         else if (strcmp(defel->defname, "lc_ctype") == 0)
     801              :         {
     802            9 :             if (ctypeEl)
     803            0 :                 errorConflictingDefElem(defel, pstate);
     804            9 :             ctypeEl = defel;
     805              :         }
     806          480 :         else if (strcmp(defel->defname, "icu_locale") == 0)
     807              :         {
     808            5 :             if (iculocaleEl)
     809            0 :                 errorConflictingDefElem(defel, pstate);
     810            5 :             iculocaleEl = defel;
     811              :         }
     812          475 :         else if (strcmp(defel->defname, "icu_rules") == 0)
     813              :         {
     814            1 :             if (icurulesEl)
     815            0 :                 errorConflictingDefElem(defel, pstate);
     816            1 :             icurulesEl = defel;
     817              :         }
     818          474 :         else if (strcmp(defel->defname, "locale_provider") == 0)
     819              :         {
     820           60 :             if (locproviderEl)
     821            0 :                 errorConflictingDefElem(defel, pstate);
     822           60 :             locproviderEl = defel;
     823              :         }
     824          414 :         else if (strcmp(defel->defname, "is_template") == 0)
     825              :         {
     826           51 :             if (istemplateEl)
     827            0 :                 errorConflictingDefElem(defel, pstate);
     828           51 :             istemplateEl = defel;
     829              :         }
     830          363 :         else if (strcmp(defel->defname, "allow_connections") == 0)
     831              :         {
     832           50 :             if (allowconnectionsEl)
     833            0 :                 errorConflictingDefElem(defel, pstate);
     834           50 :             allowconnectionsEl = defel;
     835              :         }
     836          313 :         else if (strcmp(defel->defname, "connection_limit") == 0)
     837              :         {
     838            0 :             if (connlimitEl)
     839            0 :                 errorConflictingDefElem(defel, pstate);
     840            0 :             connlimitEl = defel;
     841              :         }
     842          313 :         else if (strcmp(defel->defname, "collation_version") == 0)
     843              :         {
     844           30 :             if (collversionEl)
     845            0 :                 errorConflictingDefElem(defel, pstate);
     846           30 :             collversionEl = defel;
     847              :         }
     848          283 :         else if (strcmp(defel->defname, "location") == 0)
     849              :         {
     850            0 :             ereport(WARNING,
     851              :                     (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
     852              :                      errmsg("LOCATION is not supported anymore"),
     853              :                      errhint("Consider using tablespaces instead."),
     854              :                      parser_errposition(pstate, defel->location)));
     855              :         }
     856          283 :         else if (strcmp(defel->defname, "oid") == 0)
     857              :         {
     858          133 :             dboid = defGetObjectId(defel);
     859              : 
     860              :             /*
     861              :              * We don't normally permit new databases to be created with
     862              :              * system-assigned OIDs. pg_upgrade tries to preserve database
     863              :              * OIDs, so we can't allow any database to be created with an OID
     864              :              * that might be in use in a freshly-initialized cluster created
     865              :              * by some future version. We assume all such OIDs will be from
     866              :              * the system-managed OID range.
     867              :              *
     868              :              * As an exception, however, we permit any OID to be assigned when
     869              :              * allow_system_table_mods=on (so that initdb can assign system
     870              :              * OIDs to template0 and postgres) or when performing a binary
     871              :              * upgrade (so that pg_upgrade can preserve whatever OIDs it finds
     872              :              * in the source cluster).
     873              :              */
     874          133 :             if (dboid < FirstNormalObjectId &&
     875          116 :                 !allowSystemTableMods && !IsBinaryUpgrade)
     876            0 :                 ereport(ERROR,
     877              :                         (errcode(ERRCODE_INVALID_PARAMETER_VALUE)),
     878              :                         errmsg("OIDs less than %u are reserved for system objects", FirstNormalObjectId));
     879              :         }
     880          150 :         else if (strcmp(defel->defname, "strategy") == 0)
     881              :         {
     882          150 :             if (strategyEl)
     883            0 :                 errorConflictingDefElem(defel, pstate);
     884          150 :             strategyEl = defel;
     885              :         }
     886              :         else
     887            0 :             ereport(ERROR,
     888              :                     (errcode(ERRCODE_SYNTAX_ERROR),
     889              :                      errmsg("option \"%s\" not recognized", defel->defname),
     890              :                      parser_errposition(pstate, defel->location)));
     891              :     }
     892              : 
     893          417 :     if (ownerEl && ownerEl->arg)
     894            1 :         dbowner = defGetString(ownerEl);
     895          417 :     if (templateEl && templateEl->arg)
     896          187 :         dbtemplate = defGetString(templateEl);
     897          417 :     if (encodingEl && encodingEl->arg)
     898              :     {
     899              :         const char *encoding_name;
     900              : 
     901           54 :         if (IsA(encodingEl->arg, Integer))
     902              :         {
     903            0 :             encoding = defGetInt32(encodingEl);
     904            0 :             encoding_name = pg_encoding_to_char(encoding);
     905            0 :             if (strcmp(encoding_name, "") == 0 ||
     906            0 :                 pg_valid_server_encoding(encoding_name) < 0)
     907            0 :                 ereport(ERROR,
     908              :                         (errcode(ERRCODE_UNDEFINED_OBJECT),
     909              :                          errmsg("%d is not a valid encoding code",
     910              :                                 encoding),
     911              :                          parser_errposition(pstate, encodingEl->location)));
     912              :         }
     913              :         else
     914              :         {
     915           54 :             encoding_name = defGetString(encodingEl);
     916           54 :             encoding = pg_valid_server_encoding(encoding_name);
     917           54 :             if (encoding < 0)
     918            0 :                 ereport(ERROR,
     919              :                         (errcode(ERRCODE_UNDEFINED_OBJECT),
     920              :                          errmsg("%s is not a valid encoding name",
     921              :                                 encoding_name),
     922              :                          parser_errposition(pstate, encodingEl->location)));
     923              :         }
     924              :     }
     925          417 :     if (localeEl && localeEl->arg)
     926              :     {
     927           55 :         dbcollate = defGetString(localeEl);
     928           55 :         dbctype = defGetString(localeEl);
     929           55 :         dblocale = defGetString(localeEl);
     930              :     }
     931          417 :     if (builtinlocaleEl && builtinlocaleEl->arg)
     932            8 :         dblocale = defGetString(builtinlocaleEl);
     933          417 :     if (collateEl && collateEl->arg)
     934            9 :         dbcollate = defGetString(collateEl);
     935          417 :     if (ctypeEl && ctypeEl->arg)
     936            9 :         dbctype = defGetString(ctypeEl);
     937          417 :     if (iculocaleEl && iculocaleEl->arg)
     938            5 :         dblocale = defGetString(iculocaleEl);
     939          417 :     if (icurulesEl && icurulesEl->arg)
     940            1 :         dbicurules = defGetString(icurulesEl);
     941          417 :     if (locproviderEl && locproviderEl->arg)
     942              :     {
     943           60 :         char       *locproviderstr = defGetString(locproviderEl);
     944              : 
     945           60 :         if (pg_strcasecmp(locproviderstr, "builtin") == 0)
     946           17 :             dblocprovider = COLLPROVIDER_BUILTIN;
     947           43 :         else if (pg_strcasecmp(locproviderstr, "icu") == 0)
     948            8 :             dblocprovider = COLLPROVIDER_ICU;
     949           35 :         else if (pg_strcasecmp(locproviderstr, "libc") == 0)
     950           34 :             dblocprovider = COLLPROVIDER_LIBC;
     951              :         else
     952            1 :             ereport(ERROR,
     953              :                     (errcode(ERRCODE_INVALID_OBJECT_DEFINITION),
     954              :                      errmsg("unrecognized locale provider: %s",
     955              :                             locproviderstr)));
     956              :     }
     957          416 :     if (istemplateEl && istemplateEl->arg)
     958           51 :         dbistemplate = defGetBoolean(istemplateEl);
     959          416 :     if (allowconnectionsEl && allowconnectionsEl->arg)
     960           50 :         dballowconnections = defGetBoolean(allowconnectionsEl);
     961          416 :     if (connlimitEl && connlimitEl->arg)
     962              :     {
     963            0 :         dbconnlimit = defGetInt32(connlimitEl);
     964            0 :         if (dbconnlimit < DATCONNLIMIT_UNLIMITED)
     965            0 :             ereport(ERROR,
     966              :                     (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
     967              :                      errmsg("invalid connection limit: %d", dbconnlimit)));
     968              :     }
     969          416 :     if (collversionEl)
     970           30 :         dbcollversion = defGetString(collversionEl);
     971              : 
     972              :     /* obtain OID of proposed owner */
     973          416 :     if (dbowner)
     974            1 :         datdba = get_role_oid(dbowner, false);
     975              :     else
     976          415 :         datdba = GetUserId();
     977              : 
     978              :     /*
     979              :      * To create a database, must have createdb privilege and must be able to
     980              :      * become the target role (this does not imply that the target role itself
     981              :      * must have createdb privilege).  The latter provision guards against
     982              :      * "giveaway" attacks.  Note that a superuser will always have both of
     983              :      * these privileges a fortiori.
     984              :      */
     985          416 :     if (!have_createdb_privilege())
     986            3 :         ereport(ERROR,
     987              :                 (errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
     988              :                  errmsg("permission denied to create database")));
     989              : 
     990          413 :     check_can_set_role(GetUserId(), datdba);
     991              : 
     992              :     /*
     993              :      * Lookup database (template) to be cloned, and obtain share lock on it.
     994              :      * ShareLock allows two CREATE DATABASEs to work from the same template
     995              :      * concurrently, while ensuring no one is busy dropping it in parallel
     996              :      * (which would be Very Bad since we'd likely get an incomplete copy
     997              :      * without knowing it).  This also prevents any new connections from being
     998              :      * made to the source until we finish copying it, so we can be sure it
     999              :      * won't change underneath us.
    1000              :      */
    1001          413 :     if (!dbtemplate)
    1002          227 :         dbtemplate = "template1"; /* Default template database name */
    1003              : 
    1004          413 :     if (!get_db_info(dbtemplate, ShareLock,
    1005              :                      &src_dboid, &src_owner, &src_encoding,
    1006              :                      &src_istemplate, &src_allowconn, &src_hasloginevt,
    1007              :                      &src_frozenxid, &src_minmxid, &src_deftablespace,
    1008              :                      &src_collate, &src_ctype, &src_locale, &src_icurules, &src_locprovider,
    1009              :                      &src_collversion))
    1010            0 :         ereport(ERROR,
    1011              :                 (errcode(ERRCODE_UNDEFINED_DATABASE),
    1012              :                  errmsg("template database \"%s\" does not exist",
    1013              :                         dbtemplate)));
    1014              : 
    1015              :     /*
    1016              :      * If the source database was in the process of being dropped, we can't
    1017              :      * use it as a template.
    1018              :      */
    1019          413 :     if (database_is_invalid_oid(src_dboid))
    1020            1 :         ereport(ERROR,
    1021              :                 errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
    1022              :                 errmsg("cannot use invalid database \"%s\" as template", dbtemplate),
    1023              :                 errhint("Use DROP DATABASE to drop invalid databases."));
    1024              : 
    1025              :     /*
    1026              :      * Permission check: to copy a DB that's not marked datistemplate, you
    1027              :      * must be superuser or the owner thereof.
    1028              :      */
    1029          412 :     if (!src_istemplate)
    1030              :     {
    1031           14 :         if (!object_ownercheck(DatabaseRelationId, src_dboid, GetUserId()))
    1032            0 :             ereport(ERROR,
    1033              :                     (errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
    1034              :                      errmsg("permission denied to copy database \"%s\"",
    1035              :                             dbtemplate)));
    1036              :     }
    1037              : 
    1038              :     /* Validate the database creation strategy. */
    1039          412 :     if (strategyEl && strategyEl->arg)
    1040              :     {
    1041              :         char       *strategy;
    1042              : 
    1043          150 :         strategy = defGetString(strategyEl);
    1044          150 :         if (pg_strcasecmp(strategy, "wal_log") == 0)
    1045           11 :             dbstrategy = CREATEDB_WAL_LOG;
    1046          139 :         else if (pg_strcasecmp(strategy, "file_copy") == 0)
    1047          138 :             dbstrategy = CREATEDB_FILE_COPY;
    1048              :         else
    1049            1 :             ereport(ERROR,
    1050              :                     (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
    1051              :                      errmsg("invalid create database strategy \"%s\"", strategy),
    1052              :                      errhint("Valid strategies are \"wal_log\" and \"file_copy\".")));
    1053              :     }
    1054              : 
    1055              :     /* If encoding or locales are defaulted, use source's setting */
    1056          411 :     if (encoding < 0)
    1057          357 :         encoding = src_encoding;
    1058          411 :     if (dbcollate == NULL)
    1059          350 :         dbcollate = src_collate;
    1060          411 :     if (dbctype == NULL)
    1061          350 :         dbctype = src_ctype;
    1062          411 :     if (dblocprovider == '\0')
    1063          352 :         dblocprovider = src_locprovider;
    1064          411 :     if (dblocale == NULL && dblocprovider == src_locprovider)
    1065          348 :         dblocale = src_locale;
    1066          411 :     if (dbicurules == NULL)
    1067          410 :         dbicurules = src_icurules;
    1068              : 
    1069              :     /* Some encodings are client only */
    1070          411 :     if (!PG_VALID_BE_ENCODING(encoding))
    1071            0 :         ereport(ERROR,
    1072              :                 (errcode(ERRCODE_WRONG_OBJECT_TYPE),
    1073              :                  errmsg("invalid server encoding %d", encoding)));
    1074              : 
    1075              :     /* Check that the chosen locales are valid, and get canonical spellings */
    1076          411 :     if (!check_locale(LC_COLLATE, dbcollate, &canonname))
    1077              :     {
    1078            1 :         if (dblocprovider == COLLPROVIDER_BUILTIN)
    1079            0 :             ereport(ERROR,
    1080              :                     (errcode(ERRCODE_WRONG_OBJECT_TYPE),
    1081              :                      errmsg("invalid LC_COLLATE locale name: \"%s\"", dbcollate),
    1082              :                      errhint("If the locale name is specific to the builtin provider, use BUILTIN_LOCALE.")));
    1083            1 :         else if (dblocprovider == COLLPROVIDER_ICU)
    1084            0 :             ereport(ERROR,
    1085              :                     (errcode(ERRCODE_WRONG_OBJECT_TYPE),
    1086              :                      errmsg("invalid LC_COLLATE locale name: \"%s\"", dbcollate),
    1087              :                      errhint("If the locale name is specific to the ICU provider, use ICU_LOCALE.")));
    1088              :         else
    1089            1 :             ereport(ERROR,
    1090              :                     (errcode(ERRCODE_WRONG_OBJECT_TYPE),
    1091              :                      errmsg("invalid LC_COLLATE locale name: \"%s\"", dbcollate)));
    1092              :     }
    1093          410 :     dbcollate = canonname;
    1094          410 :     if (!check_locale(LC_CTYPE, dbctype, &canonname))
    1095              :     {
    1096            1 :         if (dblocprovider == COLLPROVIDER_BUILTIN)
    1097            0 :             ereport(ERROR,
    1098              :                     (errcode(ERRCODE_WRONG_OBJECT_TYPE),
    1099              :                      errmsg("invalid LC_CTYPE locale name: \"%s\"", dbctype),
    1100              :                      errhint("If the locale name is specific to the builtin provider, use BUILTIN_LOCALE.")));
    1101            1 :         else if (dblocprovider == COLLPROVIDER_ICU)
    1102            0 :             ereport(ERROR,
    1103              :                     (errcode(ERRCODE_WRONG_OBJECT_TYPE),
    1104              :                      errmsg("invalid LC_CTYPE locale name: \"%s\"", dbctype),
    1105              :                      errhint("If the locale name is specific to the ICU provider, use ICU_LOCALE.")));
    1106              :         else
    1107            1 :             ereport(ERROR,
    1108              :                     (errcode(ERRCODE_WRONG_OBJECT_TYPE),
    1109              :                      errmsg("invalid LC_CTYPE locale name: \"%s\"", dbctype)));
    1110              :     }
    1111              : 
    1112          409 :     dbctype = canonname;
    1113              : 
    1114          409 :     check_encoding_locale_matches(encoding, dbcollate, dbctype);
    1115              : 
    1116              :     /* validate provider-specific parameters */
    1117          409 :     if (dblocprovider != COLLPROVIDER_BUILTIN)
    1118              :     {
    1119          378 :         if (builtinlocaleEl)
    1120            0 :             ereport(ERROR,
    1121              :                     (errcode(ERRCODE_INVALID_OBJECT_DEFINITION),
    1122              :                      errmsg("BUILTIN_LOCALE cannot be specified unless locale provider is builtin")));
    1123              :     }
    1124              : 
    1125          409 :     if (dblocprovider != COLLPROVIDER_ICU)
    1126              :     {
    1127          394 :         if (iculocaleEl)
    1128            1 :             ereport(ERROR,
    1129              :                     (errcode(ERRCODE_INVALID_OBJECT_DEFINITION),
    1130              :                      errmsg("ICU locale cannot be specified unless locale provider is ICU")));
    1131              : 
    1132          393 :         if (dbicurules)
    1133            1 :             ereport(ERROR,
    1134              :                     (errcode(ERRCODE_INVALID_OBJECT_DEFINITION),
    1135              :                      errmsg("ICU rules cannot be specified unless locale provider is ICU")));
    1136              :     }
    1137              : 
    1138              :     /* validate and canonicalize locale for the provider */
    1139          407 :     if (dblocprovider == COLLPROVIDER_BUILTIN)
    1140              :     {
    1141              :         /*
    1142              :          * This would happen if template0 uses the libc provider but the new
    1143              :          * database uses builtin.
    1144              :          */
    1145           29 :         if (!dblocale)
    1146            1 :             ereport(ERROR,
    1147              :                     (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
    1148              :                      errmsg("LOCALE or BUILTIN_LOCALE must be specified")));
    1149              : 
    1150           28 :         dblocale = builtin_validate_locale(encoding, dblocale);
    1151              :     }
    1152          378 :     else if (dblocprovider == COLLPROVIDER_ICU)
    1153              :     {
    1154           15 :         if (!(is_encoding_supported_by_icu(encoding)))
    1155            1 :             ereport(ERROR,
    1156              :                     (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
    1157              :                      errmsg("encoding \"%s\" is not supported with ICU provider",
    1158              :                             pg_encoding_to_char(encoding))));
    1159              : 
    1160              :         /*
    1161              :          * This would happen if template0 uses the libc provider but the new
    1162              :          * database uses icu.
    1163              :          */
    1164           14 :         if (!dblocale)
    1165            1 :             ereport(ERROR,
    1166              :                     (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
    1167              :                      errmsg("LOCALE or ICU_LOCALE must be specified")));
    1168              : 
    1169              :         /*
    1170              :          * During binary upgrade, or when the locale came from the template
    1171              :          * database, preserve locale string. Otherwise, canonicalize to a
    1172              :          * language tag.
    1173              :          */
    1174           13 :         if (!IsBinaryUpgrade && dblocale != src_locale)
    1175              :         {
    1176            7 :             char       *langtag = icu_language_tag(dblocale,
    1177              :                                                    icu_validation_level);
    1178              : 
    1179            7 :             if (langtag && strcmp(dblocale, langtag) != 0)
    1180              :             {
    1181            3 :                 ereport(NOTICE,
    1182              :                         (errmsg("using standard form \"%s\" for ICU locale \"%s\"",
    1183              :                                 langtag, dblocale)));
    1184              : 
    1185            3 :                 dblocale = langtag;
    1186              :             }
    1187              :         }
    1188              : 
    1189           13 :         icu_validate_locale(dblocale);
    1190              :     }
    1191              : 
    1192              :     /* for libc, locale comes from datcollate and datctype */
    1193          402 :     if (dblocprovider == COLLPROVIDER_LIBC)
    1194          363 :         dblocale = NULL;
    1195              : 
    1196              :     /*
    1197              :      * Check that the new encoding and locale settings match the source
    1198              :      * database.  We insist on this because we simply copy the source data ---
    1199              :      * any non-ASCII data would be wrongly encoded, and any indexes sorted
    1200              :      * according to the source locale would be wrong.
    1201              :      *
    1202              :      * However, we assume that template0 doesn't contain any non-ASCII data
    1203              :      * nor any indexes that depend on collation or ctype, so template0 can be
    1204              :      * used as template for creating a database with any encoding or locale.
    1205              :      */
    1206          402 :     if (strcmp(dbtemplate, "template0") != 0)
    1207              :     {
    1208          242 :         if (encoding != src_encoding)
    1209            0 :             ereport(ERROR,
    1210              :                     (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
    1211              :                      errmsg("new encoding (%s) is incompatible with the encoding of the template database (%s)",
    1212              :                             pg_encoding_to_char(encoding),
    1213              :                             pg_encoding_to_char(src_encoding)),
    1214              :                      errhint("Use the same encoding as in the template database, or use template0 as template.")));
    1215              : 
    1216          242 :         if (strcmp(dbcollate, src_collate) != 0)
    1217            1 :             ereport(ERROR,
    1218              :                     (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
    1219              :                      errmsg("new collation (%s) is incompatible with the collation of the template database (%s)",
    1220              :                             dbcollate, src_collate),
    1221              :                      errhint("Use the same collation as in the template database, or use template0 as template.")));
    1222              : 
    1223          241 :         if (strcmp(dbctype, src_ctype) != 0)
    1224            0 :             ereport(ERROR,
    1225              :                     (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
    1226              :                      errmsg("new LC_CTYPE (%s) is incompatible with the LC_CTYPE of the template database (%s)",
    1227              :                             dbctype, src_ctype),
    1228              :                      errhint("Use the same LC_CTYPE as in the template database, or use template0 as template.")));
    1229              : 
    1230          241 :         if (dblocprovider != src_locprovider)
    1231            0 :             ereport(ERROR,
    1232              :                     (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
    1233              :                      errmsg("new locale provider (%s) does not match locale provider of the template database (%s)",
    1234              :                             collprovider_name(dblocprovider), collprovider_name(src_locprovider)),
    1235              :                      errhint("Use the same locale provider as in the template database, or use template0 as template.")));
    1236              : 
    1237          241 :         if (dblocprovider == COLLPROVIDER_ICU)
    1238              :         {
    1239              :             char       *val1;
    1240              :             char       *val2;
    1241              : 
    1242              :             Assert(dblocale);
    1243              :             Assert(src_locale);
    1244            6 :             if (strcmp(dblocale, src_locale) != 0)
    1245            0 :                 ereport(ERROR,
    1246              :                         (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
    1247              :                          errmsg("new ICU locale (%s) is incompatible with the ICU locale of the template database (%s)",
    1248              :                                 dblocale, src_locale),
    1249              :                          errhint("Use the same ICU locale as in the template database, or use template0 as template.")));
    1250              : 
    1251            6 :             val1 = dbicurules;
    1252            6 :             if (!val1)
    1253            6 :                 val1 = "";
    1254            6 :             val2 = src_icurules;
    1255            6 :             if (!val2)
    1256            6 :                 val2 = "";
    1257            6 :             if (strcmp(val1, val2) != 0)
    1258            0 :                 ereport(ERROR,
    1259              :                         (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
    1260              :                          errmsg("new ICU collation rules (%s) are incompatible with the ICU collation rules of the template database (%s)",
    1261              :                                 val1, val2),
    1262              :                          errhint("Use the same ICU collation rules as in the template database, or use template0 as template.")));
    1263              :         }
    1264              :     }
    1265              : 
    1266              :     /*
    1267              :      * If we got a collation version for the template database, check that it
    1268              :      * matches the actual OS collation version.  Otherwise error; the user
    1269              :      * needs to fix the template database first.  Don't complain if a
    1270              :      * collation version was specified explicitly as a statement option; that
    1271              :      * is used by pg_upgrade to reproduce the old state exactly.
    1272              :      *
    1273              :      * (If the template database has no collation version, then either the
    1274              :      * platform/provider does not support collation versioning, or it's
    1275              :      * template0, for which we stipulate that it does not contain
    1276              :      * collation-using objects.)
    1277              :      */
    1278          401 :     if (src_collversion && !collversionEl)
    1279              :     {
    1280              :         char       *actual_versionstr;
    1281              :         const char *locale;
    1282              : 
    1283          163 :         if (dblocprovider == COLLPROVIDER_LIBC)
    1284          152 :             locale = dbcollate;
    1285              :         else
    1286           11 :             locale = dblocale;
    1287              : 
    1288          163 :         actual_versionstr = get_collation_actual_version(dblocprovider, locale);
    1289          163 :         if (!actual_versionstr)
    1290            0 :             ereport(ERROR,
    1291              :                     (errmsg("template database \"%s\" has a collation version, but no actual collation version could be determined",
    1292              :                             dbtemplate)));
    1293              : 
    1294          163 :         if (strcmp(actual_versionstr, src_collversion) != 0)
    1295            0 :             ereport(ERROR,
    1296              :                     (errmsg("template database \"%s\" has a collation version mismatch",
    1297              :                             dbtemplate),
    1298              :                      errdetail("The template database was created using collation version %s, "
    1299              :                                "but the operating system provides version %s.",
    1300              :                                src_collversion, actual_versionstr),
    1301              :                      errhint("Rebuild all objects in the template database that use the default collation and run "
    1302              :                              "ALTER DATABASE %s REFRESH COLLATION VERSION, "
    1303              :                              "or build PostgreSQL with the right library version.",
    1304              :                              quote_identifier(dbtemplate))));
    1305              :     }
    1306              : 
    1307          401 :     if (dbcollversion == NULL)
    1308          371 :         dbcollversion = src_collversion;
    1309              : 
    1310              :     /*
    1311              :      * Normally, we copy the collation version from the template database.
    1312              :      * This last resort only applies if the template database does not have a
    1313              :      * collation version, which is normally only the case for template0.
    1314              :      */
    1315          401 :     if (dbcollversion == NULL)
    1316              :     {
    1317              :         const char *locale;
    1318              : 
    1319          208 :         if (dblocprovider == COLLPROVIDER_LIBC)
    1320          187 :             locale = dbcollate;
    1321              :         else
    1322           21 :             locale = dblocale;
    1323              : 
    1324          208 :         dbcollversion = get_collation_actual_version(dblocprovider, locale);
    1325              :     }
    1326              : 
    1327              :     /* Resolve default tablespace for new database */
    1328          401 :     if (tablespacenameEl && tablespacenameEl->arg)
    1329           17 :     {
    1330              :         char       *tablespacename;
    1331              :         AclResult   aclresult;
    1332              : 
    1333           17 :         tablespacename = defGetString(tablespacenameEl);
    1334           17 :         dst_deftablespace = get_tablespace_oid(tablespacename, false);
    1335              :         /* check permissions */
    1336           17 :         aclresult = object_aclcheck(TableSpaceRelationId, dst_deftablespace, GetUserId(),
    1337              :                                     ACL_CREATE);
    1338           17 :         if (aclresult != ACLCHECK_OK)
    1339            0 :             aclcheck_error(aclresult, OBJECT_TABLESPACE,
    1340              :                            tablespacename);
    1341              : 
    1342              :         /* pg_global must never be the default tablespace */
    1343           17 :         if (dst_deftablespace == GLOBALTABLESPACE_OID)
    1344            0 :             ereport(ERROR,
    1345              :                     (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
    1346              :                      errmsg("pg_global cannot be used as default tablespace")));
    1347              : 
    1348              :         /*
    1349              :          * If we are trying to change the default tablespace of the template,
    1350              :          * we require that the template not have any files in the new default
    1351              :          * tablespace.  This is necessary because otherwise the copied
    1352              :          * database would contain pg_class rows that refer to its default
    1353              :          * tablespace both explicitly (by OID) and implicitly (as zero), which
    1354              :          * would cause problems.  For example another CREATE DATABASE using
    1355              :          * the copied database as template, and trying to change its default
    1356              :          * tablespace again, would yield outright incorrect results (it would
    1357              :          * improperly move tables to the new default tablespace that should
    1358              :          * stay in the same tablespace).
    1359              :          */
    1360           17 :         if (dst_deftablespace != src_deftablespace)
    1361              :         {
    1362              :             char       *srcpath;
    1363              :             struct stat st;
    1364              : 
    1365           17 :             srcpath = GetDatabasePath(src_dboid, dst_deftablespace);
    1366              : 
    1367           17 :             if (stat(srcpath, &st) == 0 &&
    1368            0 :                 S_ISDIR(st.st_mode) &&
    1369            0 :                 !directory_is_empty(srcpath))
    1370            0 :                 ereport(ERROR,
    1371              :                         (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
    1372              :                          errmsg("cannot assign new default tablespace \"%s\"",
    1373              :                                 tablespacename),
    1374              :                          errdetail("There is a conflict because database \"%s\" already has some tables in this tablespace.",
    1375              :                                    dbtemplate)));
    1376           17 :             pfree(srcpath);
    1377              :         }
    1378              :     }
    1379              :     else
    1380              :     {
    1381              :         /* Use template database's default tablespace */
    1382          384 :         dst_deftablespace = src_deftablespace;
    1383              :         /* Note there is no additional permission check in this path */
    1384              :     }
    1385              : 
    1386              :     /*
    1387              :      * If built with appropriate switch, whine when regression-testing
    1388              :      * conventions for database names are violated.  But don't complain during
    1389              :      * initdb.
    1390              :      */
    1391              : #ifdef ENFORCE_REGRESSION_TEST_NAME_RESTRICTIONS
    1392              :     if (IsUnderPostmaster && strstr(dbname, "regression") == NULL)
    1393              :         elog(WARNING, "databases created by regression test cases should have names including \"regression\"");
    1394              : #endif
    1395              : 
    1396              :     /*
    1397              :      * Check for db name conflict.  This is just to give a more friendly error
    1398              :      * message than "unique index violation".  There's a race condition but
    1399              :      * we're willing to accept the less friendly message in that case.
    1400              :      */
    1401          401 :     if (OidIsValid(get_database_oid(dbname, true)))
    1402            1 :         ereport(ERROR,
    1403              :                 (errcode(ERRCODE_DUPLICATE_DATABASE),
    1404              :                  errmsg("database \"%s\" already exists", dbname)));
    1405              : 
    1406              :     /*
    1407              :      * The source DB can't have any active backends, except this one
    1408              :      * (exception is to allow CREATE DB while connected to template1).
    1409              :      * Otherwise we might copy inconsistent data.
    1410              :      *
    1411              :      * This should be last among the basic error checks, because it involves
    1412              :      * potential waiting; we may as well throw an error first if we're gonna
    1413              :      * throw one.
    1414              :      */
    1415          400 :     if (CountOtherDBBackends(src_dboid, &notherbackends, &npreparedxacts))
    1416            1 :         ereport(ERROR,
    1417              :                 (errcode(ERRCODE_OBJECT_IN_USE),
    1418              :                  errmsg("source database \"%s\" is being accessed by other users",
    1419              :                         dbtemplate),
    1420              :                  errdetail_busy_db(notherbackends, npreparedxacts)));
    1421              : 
    1422              :     /*
    1423              :      * Select an OID for the new database, checking that it doesn't have a
    1424              :      * filename conflict with anything already existing in the tablespace
    1425              :      * directories.
    1426              :      */
    1427          399 :     pg_database_rel = table_open(DatabaseRelationId, RowExclusiveLock);
    1428              : 
    1429              :     /*
    1430              :      * If database OID is configured, check if the OID is already in use or
    1431              :      * data directory already exists.
    1432              :      */
    1433          399 :     if (OidIsValid(dboid))
    1434              :     {
    1435          133 :         char       *existing_dbname = get_database_name(dboid);
    1436              : 
    1437          133 :         if (existing_dbname != NULL)
    1438            0 :             ereport(ERROR,
    1439              :                     (errcode(ERRCODE_INVALID_PARAMETER_VALUE)),
    1440              :                     errmsg("database OID %u is already in use by database \"%s\"",
    1441              :                            dboid, existing_dbname));
    1442              : 
    1443          133 :         if (check_db_file_conflict(dboid))
    1444            0 :             ereport(ERROR,
    1445              :                     (errcode(ERRCODE_INVALID_PARAMETER_VALUE)),
    1446              :                     errmsg("data directory with the specified OID %u already exists", dboid));
    1447              :     }
    1448              :     else
    1449              :     {
    1450              :         /* Select an OID for the new database if is not explicitly configured. */
    1451              :         do
    1452              :         {
    1453          266 :             dboid = GetNewOidWithIndex(pg_database_rel, DatabaseOidIndexId,
    1454              :                                        Anum_pg_database_oid);
    1455          266 :         } while (check_db_file_conflict(dboid));
    1456              :     }
    1457              : 
    1458              :     /*
    1459              :      * Insert a new tuple into pg_database.  This establishes our ownership of
    1460              :      * the new database name (anyone else trying to insert the same name will
    1461              :      * block on the unique index, and fail after we commit).
    1462              :      */
    1463              : 
    1464              :     Assert((dblocprovider != COLLPROVIDER_LIBC && dblocale) ||
    1465              :            (dblocprovider == COLLPROVIDER_LIBC && !dblocale));
    1466              : 
    1467              :     /* Form tuple */
    1468          399 :     new_record[Anum_pg_database_oid - 1] = ObjectIdGetDatum(dboid);
    1469          399 :     new_record[Anum_pg_database_datname - 1] =
    1470          399 :         DirectFunctionCall1(namein, CStringGetDatum(dbname));
    1471          399 :     new_record[Anum_pg_database_datdba - 1] = ObjectIdGetDatum(datdba);
    1472          399 :     new_record[Anum_pg_database_encoding - 1] = Int32GetDatum(encoding);
    1473          399 :     new_record[Anum_pg_database_datlocprovider - 1] = CharGetDatum(dblocprovider);
    1474          399 :     new_record[Anum_pg_database_datistemplate - 1] = BoolGetDatum(dbistemplate);
    1475          399 :     new_record[Anum_pg_database_datallowconn - 1] = BoolGetDatum(dballowconnections);
    1476          399 :     new_record[Anum_pg_database_dathasloginevt - 1] = BoolGetDatum(src_hasloginevt);
    1477          399 :     new_record[Anum_pg_database_datconnlimit - 1] = Int32GetDatum(dbconnlimit);
    1478          399 :     new_record[Anum_pg_database_datfrozenxid - 1] = TransactionIdGetDatum(src_frozenxid);
    1479          399 :     new_record[Anum_pg_database_datminmxid - 1] = TransactionIdGetDatum(src_minmxid);
    1480          399 :     new_record[Anum_pg_database_dattablespace - 1] = ObjectIdGetDatum(dst_deftablespace);
    1481          399 :     new_record[Anum_pg_database_datcollate - 1] = CStringGetTextDatum(dbcollate);
    1482          399 :     new_record[Anum_pg_database_datctype - 1] = CStringGetTextDatum(dbctype);
    1483          399 :     if (dblocale)
    1484           38 :         new_record[Anum_pg_database_datlocale - 1] = CStringGetTextDatum(dblocale);
    1485              :     else
    1486          361 :         new_record_nulls[Anum_pg_database_datlocale - 1] = true;
    1487          399 :     if (dbicurules)
    1488            0 :         new_record[Anum_pg_database_daticurules - 1] = CStringGetTextDatum(dbicurules);
    1489              :     else
    1490          399 :         new_record_nulls[Anum_pg_database_daticurules - 1] = true;
    1491          399 :     if (dbcollversion)
    1492          343 :         new_record[Anum_pg_database_datcollversion - 1] = CStringGetTextDatum(dbcollversion);
    1493              :     else
    1494           56 :         new_record_nulls[Anum_pg_database_datcollversion - 1] = true;
    1495              : 
    1496              :     /*
    1497              :      * We deliberately set datacl to default (NULL), rather than copying it
    1498              :      * from the template database.  Copying it would be a bad idea when the
    1499              :      * owner is not the same as the template's owner.
    1500              :      */
    1501          399 :     new_record_nulls[Anum_pg_database_datacl - 1] = true;
    1502              : 
    1503          399 :     tuple = heap_form_tuple(RelationGetDescr(pg_database_rel),
    1504              :                             new_record, new_record_nulls);
    1505              : 
    1506          399 :     CatalogTupleInsert(pg_database_rel, tuple);
    1507              : 
    1508              :     /*
    1509              :      * Now generate additional catalog entries associated with the new DB
    1510              :      */
    1511              : 
    1512              :     /* Register owner dependency */
    1513          399 :     recordDependencyOnOwner(DatabaseRelationId, dboid, datdba);
    1514              : 
    1515              :     /* Create pg_shdepend entries for objects within database */
    1516          399 :     copyTemplateDependencies(src_dboid, dboid);
    1517              : 
    1518              :     /* Post creation hook for new database */
    1519          399 :     InvokeObjectPostCreateHook(DatabaseRelationId, dboid, 0);
    1520              : 
    1521              :     /*
    1522              :      * If we're going to be reading data for the to-be-created database into
    1523              :      * shared_buffers, take a lock on it. Nobody should know that this
    1524              :      * database exists yet, but it's good to maintain the invariant that an
    1525              :      * AccessExclusiveLock on the database is sufficient to drop all of its
    1526              :      * buffers without worrying about more being read later.
    1527              :      *
    1528              :      * Note that we need to do this before entering the
    1529              :      * PG_ENSURE_ERROR_CLEANUP block below, because createdb_failure_callback
    1530              :      * expects this lock to be held already.
    1531              :      */
    1532          399 :     if (dbstrategy == CREATEDB_WAL_LOG)
    1533          261 :         LockSharedObject(DatabaseRelationId, dboid, 0, AccessShareLock);
    1534              : 
    1535              :     /*
    1536              :      * Once we start copying subdirectories, we need to be able to clean 'em
    1537              :      * up if we fail.  Use an ENSURE block to make sure this happens.  (This
    1538              :      * is not a 100% solution, because of the possibility of failure during
    1539              :      * transaction commit after we leave this routine, but it should handle
    1540              :      * most scenarios.)
    1541              :      */
    1542          399 :     fparms.src_dboid = src_dboid;
    1543          399 :     fparms.dest_dboid = dboid;
    1544          399 :     fparms.strategy = dbstrategy;
    1545              : 
    1546          399 :     PG_ENSURE_ERROR_CLEANUP(createdb_failure_callback,
    1547              :                             PointerGetDatum(&fparms));
    1548              :     {
    1549              :         /*
    1550              :          * If the user has asked to create a database with WAL_LOG strategy
    1551              :          * then call CreateDatabaseUsingWalLog, which will copy the database
    1552              :          * at the block level and it will WAL log each copied block.
    1553              :          * Otherwise, call CreateDatabaseUsingFileCopy that will copy the
    1554              :          * database file by file.
    1555              :          */
    1556          399 :         if (dbstrategy == CREATEDB_WAL_LOG)
    1557          261 :             CreateDatabaseUsingWalLog(src_dboid, dboid, src_deftablespace,
    1558              :                                       dst_deftablespace);
    1559              :         else
    1560          138 :             CreateDatabaseUsingFileCopy(src_dboid, dboid, src_deftablespace,
    1561              :                                         dst_deftablespace);
    1562              : 
    1563              :         /*
    1564              :          * Close pg_database, but keep lock till commit.
    1565              :          */
    1566          397 :         table_close(pg_database_rel, NoLock);
    1567              : 
    1568              :         /*
    1569              :          * Force synchronous commit, thus minimizing the window between
    1570              :          * creation of the database files and committal of the transaction. If
    1571              :          * we crash before committing, we'll have a DB that's taking up disk
    1572              :          * space but is not in pg_database, which is not good.
    1573              :          */
    1574          397 :         ForceSyncCommit();
    1575              :     }
    1576          399 :     PG_END_ENSURE_ERROR_CLEANUP(createdb_failure_callback,
    1577              :                                 PointerGetDatum(&fparms));
    1578              : 
    1579          397 :     return dboid;
    1580              : }
    1581              : 
    1582              : /*
    1583              :  * Check whether chosen encoding matches chosen locale settings.  This
    1584              :  * restriction is necessary because libc's locale-specific code usually
    1585              :  * fails when presented with data in an encoding it's not expecting. We
    1586              :  * allow mismatch in four cases:
    1587              :  *
    1588              :  * 1. locale encoding = SQL_ASCII, which means that the locale is C/POSIX
    1589              :  * which works with any encoding.
    1590              :  *
    1591              :  * 2. locale encoding = -1, which means that we couldn't determine the
    1592              :  * locale's encoding and have to trust the user to get it right.
    1593              :  *
    1594              :  * 3. selected encoding is UTF8 and platform is win32. This is because
    1595              :  * UTF8 is a pseudo codepage that is supported in all locales since it's
    1596              :  * converted to UTF16 before being used.
    1597              :  *
    1598              :  * 4. selected encoding is SQL_ASCII, but only if you're a superuser. This
    1599              :  * is risky but we have historically allowed it --- notably, the
    1600              :  * regression tests require it.
    1601              :  *
    1602              :  * Note: if you change this policy, fix initdb to match.
    1603              :  */
    1604              : void
    1605          423 : check_encoding_locale_matches(int encoding, const char *collate, const char *ctype)
    1606              : {
    1607          423 :     int         ctype_encoding = pg_get_encoding_from_locale(ctype, true);
    1608          423 :     int         collate_encoding = pg_get_encoding_from_locale(collate, true);
    1609              : 
    1610          427 :     if (!(ctype_encoding == encoding ||
    1611            4 :           ctype_encoding == PG_SQL_ASCII ||
    1612              :           ctype_encoding == -1 ||
    1613              : #ifdef WIN32
    1614              :           encoding == PG_UTF8 ||
    1615              : #endif
    1616            4 :           (encoding == PG_SQL_ASCII && superuser())))
    1617            0 :         ereport(ERROR,
    1618              :                 (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
    1619              :                  errmsg("encoding \"%s\" does not match locale \"%s\"",
    1620              :                         pg_encoding_to_char(encoding),
    1621              :                         ctype),
    1622              :                  errdetail("The chosen LC_CTYPE setting requires encoding \"%s\".",
    1623              :                            pg_encoding_to_char(ctype_encoding))));
    1624              : 
    1625          427 :     if (!(collate_encoding == encoding ||
    1626            4 :           collate_encoding == PG_SQL_ASCII ||
    1627              :           collate_encoding == -1 ||
    1628              : #ifdef WIN32
    1629              :           encoding == PG_UTF8 ||
    1630              : #endif
    1631            4 :           (encoding == PG_SQL_ASCII && superuser())))
    1632            0 :         ereport(ERROR,
    1633              :                 (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
    1634              :                  errmsg("encoding \"%s\" does not match locale \"%s\"",
    1635              :                         pg_encoding_to_char(encoding),
    1636              :                         collate),
    1637              :                  errdetail("The chosen LC_COLLATE setting requires encoding \"%s\".",
    1638              :                            pg_encoding_to_char(collate_encoding))));
    1639          423 : }
    1640              : 
    1641              : /* Error cleanup callback for createdb */
    1642              : static void
    1643            2 : createdb_failure_callback(int code, Datum arg)
    1644              : {
    1645            2 :     createdb_failure_params *fparms = (createdb_failure_params *) DatumGetPointer(arg);
    1646              : 
    1647              :     /*
    1648              :      * If we were copying database at block levels then drop pages for the
    1649              :      * destination database that are in the shared buffer cache.  And tell
    1650              :      * checkpointer to forget any pending fsync and unlink requests for files
    1651              :      * in the database.  The reasoning behind doing this is same as explained
    1652              :      * in dropdb function.  But unlike dropdb we don't need to call
    1653              :      * pgstat_drop_database because this database is still not created so
    1654              :      * there should not be any stat for this.
    1655              :      */
    1656            2 :     if (fparms->strategy == CREATEDB_WAL_LOG)
    1657              :     {
    1658            2 :         DropDatabaseBuffers(fparms->dest_dboid);
    1659            2 :         ForgetDatabaseSyncRequests(fparms->dest_dboid);
    1660              : 
    1661              :         /* Release lock on the target database. */
    1662            2 :         UnlockSharedObject(DatabaseRelationId, fparms->dest_dboid, 0,
    1663              :                            AccessShareLock);
    1664              :     }
    1665              : 
    1666              :     /*
    1667              :      * Release lock on source database before doing recursive remove. This is
    1668              :      * not essential but it seems desirable to release the lock as soon as
    1669              :      * possible.
    1670              :      */
    1671            2 :     UnlockSharedObject(DatabaseRelationId, fparms->src_dboid, 0, ShareLock);
    1672              : 
    1673              :     /* Throw away any successfully copied subdirectories */
    1674            2 :     remove_dbtablespaces(fparms->dest_dboid);
    1675            2 : }
    1676              : 
    1677              : 
    1678              : /*
    1679              :  * DROP DATABASE
    1680              :  */
    1681              : void
    1682           66 : dropdb(const char *dbname, bool missing_ok, bool force)
    1683              : {
    1684              :     Oid         db_id;
    1685              :     bool        db_istemplate;
    1686              :     Relation    pgdbrel;
    1687              :     HeapTuple   tup;
    1688              :     ScanKeyData scankey;
    1689              :     void       *inplace_state;
    1690              :     Form_pg_database datform;
    1691              :     int         notherbackends;
    1692              :     int         npreparedxacts;
    1693              :     int         nslots,
    1694              :                 nslots_active;
    1695              :     int         nsubscriptions;
    1696              : 
    1697              :     /*
    1698              :      * Look up the target database's OID, and get exclusive lock on it. We
    1699              :      * need this to ensure that no new backend starts up in the target
    1700              :      * database while we are deleting it (see postinit.c), and that no one is
    1701              :      * using it as a CREATE DATABASE template or trying to delete it for
    1702              :      * themselves.
    1703              :      */
    1704           66 :     pgdbrel = table_open(DatabaseRelationId, RowExclusiveLock);
    1705              : 
    1706           66 :     if (!get_db_info(dbname, AccessExclusiveLock, &db_id, NULL, NULL,
    1707              :                      &db_istemplate, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL))
    1708              :     {
    1709           16 :         if (!missing_ok)
    1710              :         {
    1711            8 :             ereport(ERROR,
    1712              :                     (errcode(ERRCODE_UNDEFINED_DATABASE),
    1713              :                      errmsg("database \"%s\" does not exist", dbname)));
    1714              :         }
    1715              :         else
    1716              :         {
    1717              :             /* Close pg_database, release the lock, since we changed nothing */
    1718            8 :             table_close(pgdbrel, RowExclusiveLock);
    1719            8 :             ereport(NOTICE,
    1720              :                     (errmsg("database \"%s\" does not exist, skipping",
    1721              :                             dbname)));
    1722            8 :             return;
    1723              :         }
    1724              :     }
    1725              : 
    1726              :     /*
    1727              :      * Permission checks
    1728              :      */
    1729           50 :     if (!object_ownercheck(DatabaseRelationId, db_id, GetUserId()))
    1730            0 :         aclcheck_error(ACLCHECK_NOT_OWNER, OBJECT_DATABASE,
    1731              :                        dbname);
    1732              : 
    1733              :     /* DROP hook for the database being removed */
    1734           50 :     InvokeObjectDropHook(DatabaseRelationId, db_id, 0);
    1735              : 
    1736              :     /*
    1737              :      * Disallow dropping a DB that is marked istemplate.  This is just to
    1738              :      * prevent people from accidentally dropping template0 or template1; they
    1739              :      * can do so if they're really determined ...
    1740              :      */
    1741           50 :     if (db_istemplate)
    1742            0 :         ereport(ERROR,
    1743              :                 (errcode(ERRCODE_WRONG_OBJECT_TYPE),
    1744              :                  errmsg("cannot drop a template database")));
    1745              : 
    1746              :     /* Obviously can't drop my own database */
    1747           50 :     if (db_id == MyDatabaseId)
    1748            0 :         ereport(ERROR,
    1749              :                 (errcode(ERRCODE_OBJECT_IN_USE),
    1750              :                  errmsg("cannot drop the currently open database")));
    1751              : 
    1752              :     /*
    1753              :      * Check whether there are active logical slots that refer to the
    1754              :      * to-be-dropped database. The database lock we are holding prevents the
    1755              :      * creation of new slots using the database or existing slots becoming
    1756              :      * active.
    1757              :      */
    1758           50 :     (void) ReplicationSlotsCountDBSlots(db_id, &nslots, &nslots_active);
    1759           50 :     if (nslots_active)
    1760              :     {
    1761            1 :         ereport(ERROR,
    1762              :                 (errcode(ERRCODE_OBJECT_IN_USE),
    1763              :                  errmsg("database \"%s\" is used by an active logical replication slot",
    1764              :                         dbname),
    1765              :                  errdetail_plural("There is %d active slot.",
    1766              :                                   "There are %d active slots.",
    1767              :                                   nslots_active, nslots_active)));
    1768              :     }
    1769              : 
    1770              :     /*
    1771              :      * Check if there are subscriptions defined in the target database.
    1772              :      *
    1773              :      * We can't drop them automatically because they might be holding
    1774              :      * resources in other databases/instances.
    1775              :      */
    1776           49 :     if ((nsubscriptions = CountDBSubscriptions(db_id)) > 0)
    1777            0 :         ereport(ERROR,
    1778              :                 (errcode(ERRCODE_OBJECT_IN_USE),
    1779              :                  errmsg("database \"%s\" is being used by logical replication subscription",
    1780              :                         dbname),
    1781              :                  errdetail_plural("There is %d subscription.",
    1782              :                                   "There are %d subscriptions.",
    1783              :                                   nsubscriptions, nsubscriptions)));
    1784              : 
    1785              : 
    1786              :     /*
    1787              :      * Attempt to terminate all existing connections to the target database if
    1788              :      * the user has requested to do so.
    1789              :      */
    1790           49 :     if (force)
    1791            1 :         TerminateOtherDBBackends(db_id);
    1792              : 
    1793              :     /*
    1794              :      * Check for other backends in the target database.  (Because we hold the
    1795              :      * database lock, no new ones can start after this.)
    1796              :      *
    1797              :      * As in CREATE DATABASE, check this after other error conditions.
    1798              :      */
    1799           49 :     if (CountOtherDBBackends(db_id, &notherbackends, &npreparedxacts))
    1800            0 :         ereport(ERROR,
    1801              :                 (errcode(ERRCODE_OBJECT_IN_USE),
    1802              :                  errmsg("database \"%s\" is being accessed by other users",
    1803              :                         dbname),
    1804              :                  errdetail_busy_db(notherbackends, npreparedxacts)));
    1805              : 
    1806              :     /*
    1807              :      * Delete any comments or security labels associated with the database.
    1808              :      */
    1809           49 :     DeleteSharedComments(db_id, DatabaseRelationId);
    1810           49 :     DeleteSharedSecurityLabel(db_id, DatabaseRelationId);
    1811              : 
    1812              :     /*
    1813              :      * Remove settings associated with this database
    1814              :      */
    1815           49 :     DropSetting(db_id, InvalidOid);
    1816              : 
    1817              :     /*
    1818              :      * Remove shared dependency references for the database.
    1819              :      */
    1820           49 :     dropDatabaseDependencies(db_id);
    1821              : 
    1822              :     /*
    1823              :      * Tell the cumulative stats system to forget it immediately, too.
    1824              :      */
    1825           49 :     pgstat_drop_database(db_id);
    1826              : 
    1827              :     /*
    1828              :      * Except for the deletion of the catalog row, subsequent actions are not
    1829              :      * transactional (consider DropDatabaseBuffers() discarding modified
    1830              :      * buffers). But we might crash or get interrupted below. To prevent
    1831              :      * accesses to a database with invalid contents, mark the database as
    1832              :      * invalid using an in-place update.
    1833              :      *
    1834              :      * We need to flush the WAL before continuing, to guarantee the
    1835              :      * modification is durable before performing irreversible filesystem
    1836              :      * operations.
    1837              :      */
    1838           49 :     ScanKeyInit(&scankey,
    1839              :                 Anum_pg_database_datname,
    1840              :                 BTEqualStrategyNumber, F_NAMEEQ,
    1841              :                 CStringGetDatum(dbname));
    1842           49 :     systable_inplace_update_begin(pgdbrel, DatabaseNameIndexId, true,
    1843              :                                   NULL, 1, &scankey, &tup, &inplace_state);
    1844           49 :     if (!HeapTupleIsValid(tup))
    1845            0 :         elog(ERROR, "cache lookup failed for database %u", db_id);
    1846           49 :     datform = (Form_pg_database) GETSTRUCT(tup);
    1847           49 :     datform->datconnlimit = DATCONNLIMIT_INVALID_DB;
    1848           49 :     systable_inplace_update_finish(inplace_state, tup);
    1849           49 :     XLogFlush(XactLastRecEnd);
    1850              : 
    1851              :     /*
    1852              :      * Also delete the tuple - transactionally. If this transaction commits,
    1853              :      * the row will be gone, but if we fail, dropdb() can be invoked again.
    1854              :      */
    1855           49 :     CatalogTupleDelete(pgdbrel, &tup->t_self);
    1856           49 :     heap_freetuple(tup);
    1857              : 
    1858              :     /*
    1859              :      * Drop db-specific replication slots.
    1860              :      */
    1861           49 :     ReplicationSlotsDropDBSlots(db_id);
    1862              : 
    1863              :     /*
    1864              :      * Drop pages for this database that are in the shared buffer cache. This
    1865              :      * is important to ensure that no remaining backend tries to write out a
    1866              :      * dirty buffer to the dead database later...
    1867              :      */
    1868           49 :     DropDatabaseBuffers(db_id);
    1869              : 
    1870              :     /*
    1871              :      * Tell checkpointer to forget any pending fsync and unlink requests for
    1872              :      * files in the database; else the fsyncs will fail at next checkpoint, or
    1873              :      * worse, it will delete files that belong to a newly created database
    1874              :      * with the same OID.
    1875              :      */
    1876           49 :     ForgetDatabaseSyncRequests(db_id);
    1877              : 
    1878              :     /*
    1879              :      * Force a checkpoint to make sure the checkpointer has received the
    1880              :      * message sent by ForgetDatabaseSyncRequests.
    1881              :      */
    1882           49 :     RequestCheckpoint(CHECKPOINT_FAST | CHECKPOINT_FORCE | CHECKPOINT_WAIT);
    1883              : 
    1884              :     /* Close all smgr fds in all backends. */
    1885           49 :     WaitForProcSignalBarrier(EmitProcSignalBarrier(PROCSIGNAL_BARRIER_SMGRRELEASE));
    1886              : 
    1887              :     /*
    1888              :      * Remove all tablespace subdirs belonging to the database.
    1889              :      */
    1890           49 :     remove_dbtablespaces(db_id);
    1891              : 
    1892              :     /*
    1893              :      * Close pg_database, but keep lock till commit.
    1894              :      */
    1895           48 :     table_close(pgdbrel, NoLock);
    1896              : 
    1897              :     /*
    1898              :      * Force synchronous commit, thus minimizing the window between removal of
    1899              :      * the database files and committal of the transaction. If we crash before
    1900              :      * committing, we'll have a DB that's gone on disk but still there
    1901              :      * according to pg_database, which is not good.
    1902              :      */
    1903           48 :     ForceSyncCommit();
    1904              : }
    1905              : 
    1906              : 
    1907              : /*
    1908              :  * Rename database
    1909              :  */
    1910              : ObjectAddress
    1911            7 : RenameDatabase(const char *oldname, const char *newname)
    1912              : {
    1913              :     Oid         db_id;
    1914              :     HeapTuple   newtup;
    1915              :     ItemPointerData otid;
    1916              :     Relation    rel;
    1917              :     int         notherbackends;
    1918              :     int         npreparedxacts;
    1919              :     ObjectAddress address;
    1920              : 
    1921              :     /* Report error if name has \n or \r character. */
    1922            7 :     if (strpbrk(newname, "\n\r"))
    1923            0 :         ereport(ERROR,
    1924              :                 (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
    1925              :                  errmsg("database name \"%s\" contains a newline or carriage return character", newname)));
    1926              : 
    1927              :     /*
    1928              :      * Look up the target database's OID, and get exclusive lock on it. We
    1929              :      * need this for the same reasons as DROP DATABASE.
    1930              :      */
    1931            7 :     rel = table_open(DatabaseRelationId, RowExclusiveLock);
    1932              : 
    1933            7 :     if (!get_db_info(oldname, AccessExclusiveLock, &db_id, NULL, NULL, NULL,
    1934              :                      NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL))
    1935            0 :         ereport(ERROR,
    1936              :                 (errcode(ERRCODE_UNDEFINED_DATABASE),
    1937              :                  errmsg("database \"%s\" does not exist", oldname)));
    1938              : 
    1939              :     /* must be owner */
    1940            7 :     if (!object_ownercheck(DatabaseRelationId, db_id, GetUserId()))
    1941            0 :         aclcheck_error(ACLCHECK_NOT_OWNER, OBJECT_DATABASE,
    1942              :                        oldname);
    1943              : 
    1944              :     /* must have createdb rights */
    1945            7 :     if (!have_createdb_privilege())
    1946            0 :         ereport(ERROR,
    1947              :                 (errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
    1948              :                  errmsg("permission denied to rename database")));
    1949              : 
    1950              :     /*
    1951              :      * If built with appropriate switch, whine when regression-testing
    1952              :      * conventions for database names are violated.
    1953              :      */
    1954              : #ifdef ENFORCE_REGRESSION_TEST_NAME_RESTRICTIONS
    1955              :     if (strstr(newname, "regression") == NULL)
    1956              :         elog(WARNING, "databases created by regression test cases should have names including \"regression\"");
    1957              : #endif
    1958              : 
    1959              :     /*
    1960              :      * Make sure the new name doesn't exist.  See notes for same error in
    1961              :      * CREATE DATABASE.
    1962              :      */
    1963            7 :     if (OidIsValid(get_database_oid(newname, true)))
    1964            0 :         ereport(ERROR,
    1965              :                 (errcode(ERRCODE_DUPLICATE_DATABASE),
    1966              :                  errmsg("database \"%s\" already exists", newname)));
    1967              : 
    1968              :     /*
    1969              :      * XXX Client applications probably store the current database somewhere,
    1970              :      * so renaming it could cause confusion.  On the other hand, there may not
    1971              :      * be an actual problem besides a little confusion, so think about this
    1972              :      * and decide.
    1973              :      */
    1974            7 :     if (db_id == MyDatabaseId)
    1975            0 :         ereport(ERROR,
    1976              :                 (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
    1977              :                  errmsg("current database cannot be renamed")));
    1978              : 
    1979              :     /*
    1980              :      * Make sure the database does not have active sessions.  This is the same
    1981              :      * concern as above, but applied to other sessions.
    1982              :      *
    1983              :      * As in CREATE DATABASE, check this after other error conditions.
    1984              :      */
    1985            7 :     if (CountOtherDBBackends(db_id, &notherbackends, &npreparedxacts))
    1986            0 :         ereport(ERROR,
    1987              :                 (errcode(ERRCODE_OBJECT_IN_USE),
    1988              :                  errmsg("database \"%s\" is being accessed by other users",
    1989              :                         oldname),
    1990              :                  errdetail_busy_db(notherbackends, npreparedxacts)));
    1991              : 
    1992              :     /* rename */
    1993            7 :     newtup = SearchSysCacheLockedCopy1(DATABASEOID, ObjectIdGetDatum(db_id));
    1994            7 :     if (!HeapTupleIsValid(newtup))
    1995            0 :         elog(ERROR, "cache lookup failed for database %u", db_id);
    1996            7 :     otid = newtup->t_self;
    1997            7 :     namestrcpy(&(((Form_pg_database) GETSTRUCT(newtup))->datname), newname);
    1998            7 :     CatalogTupleUpdate(rel, &otid, newtup);
    1999            7 :     UnlockTuple(rel, &otid, InplaceUpdateTupleLock);
    2000              : 
    2001            7 :     InvokeObjectPostAlterHook(DatabaseRelationId, db_id, 0);
    2002              : 
    2003            7 :     ObjectAddressSet(address, DatabaseRelationId, db_id);
    2004              : 
    2005              :     /*
    2006              :      * Close pg_database, but keep lock till commit.
    2007              :      */
    2008            7 :     table_close(rel, NoLock);
    2009              : 
    2010            7 :     return address;
    2011              : }
    2012              : 
    2013              : 
    2014              : /*
    2015              :  * ALTER DATABASE SET TABLESPACE
    2016              :  */
    2017              : static void
    2018           12 : movedb(const char *dbname, const char *tblspcname)
    2019              : {
    2020              :     Oid         db_id;
    2021              :     Relation    pgdbrel;
    2022              :     int         notherbackends;
    2023              :     int         npreparedxacts;
    2024              :     HeapTuple   oldtuple,
    2025              :                 newtuple;
    2026              :     Oid         src_tblspcoid,
    2027              :                 dst_tblspcoid;
    2028              :     ScanKeyData scankey;
    2029              :     SysScanDesc sysscan;
    2030              :     AclResult   aclresult;
    2031              :     char       *src_dbpath;
    2032              :     char       *dst_dbpath;
    2033              :     DIR        *dstdir;
    2034              :     struct dirent *xlde;
    2035              :     movedb_failure_params fparms;
    2036              : 
    2037              :     /*
    2038              :      * Look up the target database's OID, and get exclusive lock on it. We
    2039              :      * need this to ensure that no new backend starts up in the database while
    2040              :      * we are moving it, and that no one is using it as a CREATE DATABASE
    2041              :      * template or trying to delete it.
    2042              :      */
    2043           12 :     pgdbrel = table_open(DatabaseRelationId, RowExclusiveLock);
    2044              : 
    2045           12 :     if (!get_db_info(dbname, AccessExclusiveLock, &db_id, NULL, NULL, NULL,
    2046              :                      NULL, NULL, NULL, NULL, &src_tblspcoid, NULL, NULL, NULL, NULL, NULL, NULL))
    2047            0 :         ereport(ERROR,
    2048              :                 (errcode(ERRCODE_UNDEFINED_DATABASE),
    2049              :                  errmsg("database \"%s\" does not exist", dbname)));
    2050              : 
    2051              :     /*
    2052              :      * We actually need a session lock, so that the lock will persist across
    2053              :      * the commit/restart below.  (We could almost get away with letting the
    2054              :      * lock be released at commit, except that someone could try to move
    2055              :      * relations of the DB back into the old directory while we rmtree() it.)
    2056              :      */
    2057           12 :     LockSharedObjectForSession(DatabaseRelationId, db_id, 0,
    2058              :                                AccessExclusiveLock);
    2059              : 
    2060              :     /*
    2061              :      * Permission checks
    2062              :      */
    2063           12 :     if (!object_ownercheck(DatabaseRelationId, db_id, GetUserId()))
    2064            0 :         aclcheck_error(ACLCHECK_NOT_OWNER, OBJECT_DATABASE,
    2065              :                        dbname);
    2066              : 
    2067              :     /*
    2068              :      * Obviously can't move the tables of my own database
    2069              :      */
    2070           12 :     if (db_id == MyDatabaseId)
    2071            0 :         ereport(ERROR,
    2072              :                 (errcode(ERRCODE_OBJECT_IN_USE),
    2073              :                  errmsg("cannot change the tablespace of the currently open database")));
    2074              : 
    2075              :     /*
    2076              :      * Get tablespace's oid
    2077              :      */
    2078           12 :     dst_tblspcoid = get_tablespace_oid(tblspcname, false);
    2079              : 
    2080              :     /*
    2081              :      * Permission checks
    2082              :      */
    2083           12 :     aclresult = object_aclcheck(TableSpaceRelationId, dst_tblspcoid, GetUserId(),
    2084              :                                 ACL_CREATE);
    2085           12 :     if (aclresult != ACLCHECK_OK)
    2086            0 :         aclcheck_error(aclresult, OBJECT_TABLESPACE,
    2087              :                        tblspcname);
    2088              : 
    2089              :     /*
    2090              :      * pg_global must never be the default tablespace
    2091              :      */
    2092           12 :     if (dst_tblspcoid == GLOBALTABLESPACE_OID)
    2093            0 :         ereport(ERROR,
    2094              :                 (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
    2095              :                  errmsg("pg_global cannot be used as default tablespace")));
    2096              : 
    2097              :     /*
    2098              :      * No-op if same tablespace
    2099              :      */
    2100           12 :     if (src_tblspcoid == dst_tblspcoid)
    2101              :     {
    2102            0 :         table_close(pgdbrel, NoLock);
    2103            0 :         UnlockSharedObjectForSession(DatabaseRelationId, db_id, 0,
    2104              :                                      AccessExclusiveLock);
    2105            0 :         return;
    2106              :     }
    2107              : 
    2108              :     /*
    2109              :      * Check for other backends in the target database.  (Because we hold the
    2110              :      * database lock, no new ones can start after this.)
    2111              :      *
    2112              :      * As in CREATE DATABASE, check this after other error conditions.
    2113              :      */
    2114           12 :     if (CountOtherDBBackends(db_id, &notherbackends, &npreparedxacts))
    2115            0 :         ereport(ERROR,
    2116              :                 (errcode(ERRCODE_OBJECT_IN_USE),
    2117              :                  errmsg("database \"%s\" is being accessed by other users",
    2118              :                         dbname),
    2119              :                  errdetail_busy_db(notherbackends, npreparedxacts)));
    2120              : 
    2121              :     /*
    2122              :      * Get old and new database paths
    2123              :      */
    2124           12 :     src_dbpath = GetDatabasePath(db_id, src_tblspcoid);
    2125           12 :     dst_dbpath = GetDatabasePath(db_id, dst_tblspcoid);
    2126              : 
    2127              :     /*
    2128              :      * Force a checkpoint before proceeding. This will force all dirty
    2129              :      * buffers, including those of unlogged tables, out to disk, to ensure
    2130              :      * source database is up-to-date on disk for the copy.
    2131              :      * FlushDatabaseBuffers() would suffice for that, but we also want to
    2132              :      * process any pending unlink requests. Otherwise, the check for existing
    2133              :      * files in the target directory might fail unnecessarily, not to mention
    2134              :      * that the copy might fail due to source files getting deleted under it.
    2135              :      * On Windows, this also ensures that background procs don't hold any open
    2136              :      * files, which would cause rmdir() to fail.
    2137              :      */
    2138           12 :     RequestCheckpoint(CHECKPOINT_FAST | CHECKPOINT_FORCE | CHECKPOINT_WAIT
    2139              :                       | CHECKPOINT_FLUSH_UNLOGGED);
    2140              : 
    2141              :     /* Close all smgr fds in all backends. */
    2142           12 :     WaitForProcSignalBarrier(EmitProcSignalBarrier(PROCSIGNAL_BARRIER_SMGRRELEASE));
    2143              : 
    2144              :     /*
    2145              :      * Now drop all buffers holding data of the target database; they should
    2146              :      * no longer be dirty so DropDatabaseBuffers is safe.
    2147              :      *
    2148              :      * It might seem that we could just let these buffers age out of shared
    2149              :      * buffers naturally, since they should not get referenced anymore.  The
    2150              :      * problem with that is that if the user later moves the database back to
    2151              :      * its original tablespace, any still-surviving buffers would appear to
    2152              :      * contain valid data again --- but they'd be missing any changes made in
    2153              :      * the database while it was in the new tablespace.  In any case, freeing
    2154              :      * buffers that should never be used again seems worth the cycles.
    2155              :      *
    2156              :      * Note: it'd be sufficient to get rid of buffers matching db_id and
    2157              :      * src_tblspcoid, but bufmgr.c presently provides no API for that.
    2158              :      */
    2159           12 :     DropDatabaseBuffers(db_id);
    2160              : 
    2161              :     /*
    2162              :      * Check for existence of files in the target directory, i.e., objects of
    2163              :      * this database that are already in the target tablespace.  We can't
    2164              :      * allow the move in such a case, because we would need to change those
    2165              :      * relations' pg_class.reltablespace entries to zero, and we don't have
    2166              :      * access to the DB's pg_class to do so.
    2167              :      */
    2168           12 :     dstdir = AllocateDir(dst_dbpath);
    2169           12 :     if (dstdir != NULL)
    2170              :     {
    2171            0 :         while ((xlde = ReadDir(dstdir, dst_dbpath)) != NULL)
    2172              :         {
    2173            0 :             if (strcmp(xlde->d_name, ".") == 0 ||
    2174            0 :                 strcmp(xlde->d_name, "..") == 0)
    2175            0 :                 continue;
    2176              : 
    2177            0 :             ereport(ERROR,
    2178              :                     (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
    2179              :                      errmsg("some relations of database \"%s\" are already in tablespace \"%s\"",
    2180              :                             dbname, tblspcname),
    2181              :                      errhint("You must move them back to the database's default tablespace before using this command.")));
    2182              :         }
    2183              : 
    2184            0 :         FreeDir(dstdir);
    2185              : 
    2186              :         /*
    2187              :          * The directory exists but is empty. We must remove it before using
    2188              :          * the copydir function.
    2189              :          */
    2190            0 :         if (rmdir(dst_dbpath) != 0)
    2191            0 :             elog(ERROR, "could not remove directory \"%s\": %m",
    2192              :                  dst_dbpath);
    2193              :     }
    2194              : 
    2195              :     /*
    2196              :      * Use an ENSURE block to make sure we remove the debris if the copy fails
    2197              :      * (eg, due to out-of-disk-space).  This is not a 100% solution, because
    2198              :      * of the possibility of failure during transaction commit, but it should
    2199              :      * handle most scenarios.
    2200              :      */
    2201           12 :     fparms.dest_dboid = db_id;
    2202           12 :     fparms.dest_tsoid = dst_tblspcoid;
    2203           12 :     PG_ENSURE_ERROR_CLEANUP(movedb_failure_callback,
    2204              :                             PointerGetDatum(&fparms));
    2205              :     {
    2206           12 :         Datum       new_record[Natts_pg_database] = {0};
    2207           12 :         bool        new_record_nulls[Natts_pg_database] = {0};
    2208           12 :         bool        new_record_repl[Natts_pg_database] = {0};
    2209              : 
    2210              :         /*
    2211              :          * Copy files from the old tablespace to the new one
    2212              :          */
    2213           12 :         copydir(src_dbpath, dst_dbpath, false);
    2214              : 
    2215              :         /*
    2216              :          * Record the filesystem change in XLOG
    2217              :          */
    2218              :         {
    2219              :             xl_dbase_create_file_copy_rec xlrec;
    2220              : 
    2221           12 :             xlrec.db_id = db_id;
    2222           12 :             xlrec.tablespace_id = dst_tblspcoid;
    2223           12 :             xlrec.src_db_id = db_id;
    2224           12 :             xlrec.src_tablespace_id = src_tblspcoid;
    2225              : 
    2226           12 :             XLogBeginInsert();
    2227           12 :             XLogRegisterData(&xlrec,
    2228              :                              sizeof(xl_dbase_create_file_copy_rec));
    2229              : 
    2230           12 :             (void) XLogInsert(RM_DBASE_ID,
    2231              :                               XLOG_DBASE_CREATE_FILE_COPY | XLR_SPECIAL_REL_UPDATE);
    2232              :         }
    2233              : 
    2234              :         /*
    2235              :          * Update the database's pg_database tuple
    2236              :          */
    2237           12 :         ScanKeyInit(&scankey,
    2238              :                     Anum_pg_database_datname,
    2239              :                     BTEqualStrategyNumber, F_NAMEEQ,
    2240              :                     CStringGetDatum(dbname));
    2241           12 :         sysscan = systable_beginscan(pgdbrel, DatabaseNameIndexId, true,
    2242              :                                      NULL, 1, &scankey);
    2243           12 :         oldtuple = systable_getnext(sysscan);
    2244           12 :         if (!HeapTupleIsValid(oldtuple))    /* shouldn't happen... */
    2245            0 :             ereport(ERROR,
    2246              :                     (errcode(ERRCODE_UNDEFINED_DATABASE),
    2247              :                      errmsg("database \"%s\" does not exist", dbname)));
    2248           12 :         LockTuple(pgdbrel, &oldtuple->t_self, InplaceUpdateTupleLock);
    2249              : 
    2250           12 :         new_record[Anum_pg_database_dattablespace - 1] = ObjectIdGetDatum(dst_tblspcoid);
    2251           12 :         new_record_repl[Anum_pg_database_dattablespace - 1] = true;
    2252              : 
    2253           12 :         newtuple = heap_modify_tuple(oldtuple, RelationGetDescr(pgdbrel),
    2254              :                                      new_record,
    2255              :                                      new_record_nulls, new_record_repl);
    2256           12 :         CatalogTupleUpdate(pgdbrel, &oldtuple->t_self, newtuple);
    2257           12 :         UnlockTuple(pgdbrel, &oldtuple->t_self, InplaceUpdateTupleLock);
    2258              : 
    2259           12 :         InvokeObjectPostAlterHook(DatabaseRelationId, db_id, 0);
    2260              : 
    2261           12 :         systable_endscan(sysscan);
    2262              : 
    2263              :         /*
    2264              :          * Force another checkpoint here.  As in CREATE DATABASE, this is to
    2265              :          * ensure that we don't have to replay a committed
    2266              :          * XLOG_DBASE_CREATE_FILE_COPY operation, which would cause us to lose
    2267              :          * any unlogged operations done in the new DB tablespace before the
    2268              :          * next checkpoint.
    2269              :          */
    2270           12 :         RequestCheckpoint(CHECKPOINT_FAST | CHECKPOINT_FORCE | CHECKPOINT_WAIT);
    2271              : 
    2272              :         /*
    2273              :          * Force synchronous commit, thus minimizing the window between
    2274              :          * copying the database files and committal of the transaction. If we
    2275              :          * crash before committing, we'll leave an orphaned set of files on
    2276              :          * disk, which is not fatal but not good either.
    2277              :          */
    2278           12 :         ForceSyncCommit();
    2279              : 
    2280              :         /*
    2281              :          * Close pg_database, but keep lock till commit.
    2282              :          */
    2283           12 :         table_close(pgdbrel, NoLock);
    2284              :     }
    2285           12 :     PG_END_ENSURE_ERROR_CLEANUP(movedb_failure_callback,
    2286              :                                 PointerGetDatum(&fparms));
    2287              : 
    2288              :     /*
    2289              :      * Commit the transaction so that the pg_database update is committed. If
    2290              :      * we crash while removing files, the database won't be corrupt, we'll
    2291              :      * just leave some orphaned files in the old directory.
    2292              :      *
    2293              :      * (This is OK because we know we aren't inside a transaction block.)
    2294              :      *
    2295              :      * XXX would it be safe/better to do this inside the ensure block?  Not
    2296              :      * convinced it's a good idea; consider elog just after the transaction
    2297              :      * really commits.
    2298              :      */
    2299           12 :     PopActiveSnapshot();
    2300           12 :     CommitTransactionCommand();
    2301              : 
    2302              :     /* Start new transaction for the remaining work; don't need a snapshot */
    2303           12 :     StartTransactionCommand();
    2304              : 
    2305              :     /*
    2306              :      * Remove files from the old tablespace
    2307              :      */
    2308           12 :     if (!rmtree(src_dbpath, true))
    2309            0 :         ereport(WARNING,
    2310              :                 (errmsg("some useless files may be left behind in old database directory \"%s\"",
    2311              :                         src_dbpath)));
    2312              : 
    2313              :     /*
    2314              :      * Record the filesystem change in XLOG
    2315              :      */
    2316              :     {
    2317              :         xl_dbase_drop_rec xlrec;
    2318              : 
    2319           12 :         xlrec.db_id = db_id;
    2320           12 :         xlrec.ntablespaces = 1;
    2321              : 
    2322           12 :         XLogBeginInsert();
    2323           12 :         XLogRegisterData(&xlrec, sizeof(xl_dbase_drop_rec));
    2324           12 :         XLogRegisterData(&src_tblspcoid, sizeof(Oid));
    2325              : 
    2326           12 :         (void) XLogInsert(RM_DBASE_ID,
    2327              :                           XLOG_DBASE_DROP | XLR_SPECIAL_REL_UPDATE);
    2328              :     }
    2329              : 
    2330              :     /* Now it's safe to release the database lock */
    2331           12 :     UnlockSharedObjectForSession(DatabaseRelationId, db_id, 0,
    2332              :                                  AccessExclusiveLock);
    2333              : 
    2334           12 :     pfree(src_dbpath);
    2335           12 :     pfree(dst_dbpath);
    2336              : }
    2337              : 
    2338              : /* Error cleanup callback for movedb */
    2339              : static void
    2340            0 : movedb_failure_callback(int code, Datum arg)
    2341              : {
    2342            0 :     movedb_failure_params *fparms = (movedb_failure_params *) DatumGetPointer(arg);
    2343              :     char       *dstpath;
    2344              : 
    2345              :     /* Get rid of anything we managed to copy to the target directory */
    2346            0 :     dstpath = GetDatabasePath(fparms->dest_dboid, fparms->dest_tsoid);
    2347              : 
    2348            0 :     (void) rmtree(dstpath, true);
    2349              : 
    2350            0 :     pfree(dstpath);
    2351            0 : }
    2352              : 
    2353              : /*
    2354              :  * Process options and call dropdb function.
    2355              :  */
    2356              : void
    2357           66 : DropDatabase(ParseState *pstate, DropdbStmt *stmt)
    2358              : {
    2359           66 :     bool        force = false;
    2360              :     ListCell   *lc;
    2361              : 
    2362           79 :     foreach(lc, stmt->options)
    2363              :     {
    2364           13 :         DefElem    *opt = (DefElem *) lfirst(lc);
    2365              : 
    2366           13 :         if (strcmp(opt->defname, "force") == 0)
    2367           13 :             force = true;
    2368              :         else
    2369            0 :             ereport(ERROR,
    2370              :                     (errcode(ERRCODE_SYNTAX_ERROR),
    2371              :                      errmsg("unrecognized %s option \"%s\"",
    2372              :                             "DROP DATABASE", opt->defname),
    2373              :                      parser_errposition(pstate, opt->location)));
    2374              :     }
    2375              : 
    2376           66 :     dropdb(stmt->dbname, stmt->missing_ok, force);
    2377           56 : }
    2378              : 
    2379              : /*
    2380              :  * ALTER DATABASE name ...
    2381              :  */
    2382              : Oid
    2383           46 : AlterDatabase(ParseState *pstate, AlterDatabaseStmt *stmt, bool isTopLevel)
    2384              : {
    2385              :     Relation    rel;
    2386              :     Oid         dboid;
    2387              :     HeapTuple   tuple,
    2388              :                 newtuple;
    2389              :     Form_pg_database datform;
    2390              :     ScanKeyData scankey;
    2391              :     SysScanDesc scan;
    2392              :     ListCell   *option;
    2393           46 :     bool        dbistemplate = false;
    2394           46 :     bool        dballowconnections = true;
    2395           46 :     int         dbconnlimit = DATCONNLIMIT_UNLIMITED;
    2396           46 :     DefElem    *distemplate = NULL;
    2397           46 :     DefElem    *dallowconnections = NULL;
    2398           46 :     DefElem    *dconnlimit = NULL;
    2399           46 :     DefElem    *dtablespace = NULL;
    2400           46 :     Datum       new_record[Natts_pg_database] = {0};
    2401           46 :     bool        new_record_nulls[Natts_pg_database] = {0};
    2402           46 :     bool        new_record_repl[Natts_pg_database] = {0};
    2403              : 
    2404              :     /* Extract options from the statement node tree */
    2405           92 :     foreach(option, stmt->options)
    2406              :     {
    2407           46 :         DefElem    *defel = (DefElem *) lfirst(option);
    2408              : 
    2409           46 :         if (strcmp(defel->defname, "is_template") == 0)
    2410              :         {
    2411           11 :             if (distemplate)
    2412            0 :                 errorConflictingDefElem(defel, pstate);
    2413           11 :             distemplate = defel;
    2414              :         }
    2415           35 :         else if (strcmp(defel->defname, "allow_connections") == 0)
    2416              :         {
    2417           19 :             if (dallowconnections)
    2418            0 :                 errorConflictingDefElem(defel, pstate);
    2419           19 :             dallowconnections = defel;
    2420              :         }
    2421           16 :         else if (strcmp(defel->defname, "connection_limit") == 0)
    2422              :         {
    2423            4 :             if (dconnlimit)
    2424            0 :                 errorConflictingDefElem(defel, pstate);
    2425            4 :             dconnlimit = defel;
    2426              :         }
    2427           12 :         else if (strcmp(defel->defname, "tablespace") == 0)
    2428              :         {
    2429           12 :             if (dtablespace)
    2430            0 :                 errorConflictingDefElem(defel, pstate);
    2431           12 :             dtablespace = defel;
    2432              :         }
    2433              :         else
    2434            0 :             ereport(ERROR,
    2435              :                     (errcode(ERRCODE_SYNTAX_ERROR),
    2436              :                      errmsg("option \"%s\" not recognized", defel->defname),
    2437              :                      parser_errposition(pstate, defel->location)));
    2438              :     }
    2439              : 
    2440           46 :     if (dtablespace)
    2441              :     {
    2442              :         /*
    2443              :          * While the SET TABLESPACE syntax doesn't allow any other options,
    2444              :          * somebody could write "WITH TABLESPACE ...".  Forbid any other
    2445              :          * options from being specified in that case.
    2446              :          */
    2447           12 :         if (list_length(stmt->options) != 1)
    2448            0 :             ereport(ERROR,
    2449              :                     (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
    2450              :                      errmsg("option \"%s\" cannot be specified with other options",
    2451              :                             dtablespace->defname),
    2452              :                      parser_errposition(pstate, dtablespace->location)));
    2453              :         /* this case isn't allowed within a transaction block */
    2454           12 :         PreventInTransactionBlock(isTopLevel, "ALTER DATABASE SET TABLESPACE");
    2455           12 :         movedb(stmt->dbname, defGetString(dtablespace));
    2456           12 :         return InvalidOid;
    2457              :     }
    2458              : 
    2459           34 :     if (distemplate && distemplate->arg)
    2460           11 :         dbistemplate = defGetBoolean(distemplate);
    2461           34 :     if (dallowconnections && dallowconnections->arg)
    2462           19 :         dballowconnections = defGetBoolean(dallowconnections);
    2463           34 :     if (dconnlimit && dconnlimit->arg)
    2464              :     {
    2465            4 :         dbconnlimit = defGetInt32(dconnlimit);
    2466            4 :         if (dbconnlimit < DATCONNLIMIT_UNLIMITED)
    2467            0 :             ereport(ERROR,
    2468              :                     (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
    2469              :                      errmsg("invalid connection limit: %d", dbconnlimit)));
    2470              :     }
    2471              : 
    2472              :     /*
    2473              :      * Get the old tuple.  We don't need a lock on the database per se,
    2474              :      * because we're not going to do anything that would mess up incoming
    2475              :      * connections.
    2476              :      */
    2477           34 :     rel = table_open(DatabaseRelationId, RowExclusiveLock);
    2478           34 :     ScanKeyInit(&scankey,
    2479              :                 Anum_pg_database_datname,
    2480              :                 BTEqualStrategyNumber, F_NAMEEQ,
    2481           34 :                 CStringGetDatum(stmt->dbname));
    2482           34 :     scan = systable_beginscan(rel, DatabaseNameIndexId, true,
    2483              :                               NULL, 1, &scankey);
    2484           34 :     tuple = systable_getnext(scan);
    2485           34 :     if (!HeapTupleIsValid(tuple))
    2486            0 :         ereport(ERROR,
    2487              :                 (errcode(ERRCODE_UNDEFINED_DATABASE),
    2488              :                  errmsg("database \"%s\" does not exist", stmt->dbname)));
    2489           34 :     LockTuple(rel, &tuple->t_self, InplaceUpdateTupleLock);
    2490              : 
    2491           34 :     datform = (Form_pg_database) GETSTRUCT(tuple);
    2492           34 :     dboid = datform->oid;
    2493              : 
    2494           34 :     if (database_is_invalid_form(datform))
    2495              :     {
    2496            1 :         ereport(FATAL,
    2497              :                 errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
    2498              :                 errmsg("cannot alter invalid database \"%s\"", stmt->dbname),
    2499              :                 errhint("Use DROP DATABASE to drop invalid databases."));
    2500              :     }
    2501              : 
    2502           33 :     if (!object_ownercheck(DatabaseRelationId, dboid, GetUserId()))
    2503            0 :         aclcheck_error(ACLCHECK_NOT_OWNER, OBJECT_DATABASE,
    2504            0 :                        stmt->dbname);
    2505              : 
    2506              :     /*
    2507              :      * In order to avoid getting locked out and having to go through
    2508              :      * standalone mode, we refuse to disallow connections to the database
    2509              :      * we're currently connected to.  Lockout can still happen with concurrent
    2510              :      * sessions but the likeliness of that is not high enough to worry about.
    2511              :      */
    2512           33 :     if (!dballowconnections && dboid == MyDatabaseId)
    2513            0 :         ereport(ERROR,
    2514              :                 (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
    2515              :                  errmsg("cannot disallow connections for current database")));
    2516              : 
    2517              :     /*
    2518              :      * Build an updated tuple, perusing the information just obtained
    2519              :      */
    2520           33 :     if (distemplate)
    2521              :     {
    2522           11 :         new_record[Anum_pg_database_datistemplate - 1] = BoolGetDatum(dbistemplate);
    2523           11 :         new_record_repl[Anum_pg_database_datistemplate - 1] = true;
    2524              :     }
    2525           33 :     if (dallowconnections)
    2526              :     {
    2527           19 :         new_record[Anum_pg_database_datallowconn - 1] = BoolGetDatum(dballowconnections);
    2528           19 :         new_record_repl[Anum_pg_database_datallowconn - 1] = true;
    2529              :     }
    2530           33 :     if (dconnlimit)
    2531              :     {
    2532            3 :         new_record[Anum_pg_database_datconnlimit - 1] = Int32GetDatum(dbconnlimit);
    2533            3 :         new_record_repl[Anum_pg_database_datconnlimit - 1] = true;
    2534              :     }
    2535              : 
    2536           33 :     newtuple = heap_modify_tuple(tuple, RelationGetDescr(rel), new_record,
    2537              :                                  new_record_nulls, new_record_repl);
    2538           33 :     CatalogTupleUpdate(rel, &tuple->t_self, newtuple);
    2539           33 :     UnlockTuple(rel, &tuple->t_self, InplaceUpdateTupleLock);
    2540              : 
    2541           33 :     InvokeObjectPostAlterHook(DatabaseRelationId, dboid, 0);
    2542              : 
    2543           33 :     systable_endscan(scan);
    2544              : 
    2545              :     /* Close pg_database, but keep lock till commit */
    2546           33 :     table_close(rel, NoLock);
    2547              : 
    2548           33 :     return dboid;
    2549              : }
    2550              : 
    2551              : 
    2552              : /*
    2553              :  * ALTER DATABASE name REFRESH COLLATION VERSION
    2554              :  */
    2555              : ObjectAddress
    2556            3 : AlterDatabaseRefreshColl(AlterDatabaseRefreshCollStmt *stmt)
    2557              : {
    2558              :     Relation    rel;
    2559              :     ScanKeyData scankey;
    2560              :     SysScanDesc scan;
    2561              :     Oid         db_id;
    2562              :     HeapTuple   tuple;
    2563              :     Form_pg_database datForm;
    2564              :     ObjectAddress address;
    2565              :     Datum       datum;
    2566              :     bool        isnull;
    2567              :     char       *oldversion;
    2568              :     char       *newversion;
    2569              : 
    2570            3 :     rel = table_open(DatabaseRelationId, RowExclusiveLock);
    2571            3 :     ScanKeyInit(&scankey,
    2572              :                 Anum_pg_database_datname,
    2573              :                 BTEqualStrategyNumber, F_NAMEEQ,
    2574            3 :                 CStringGetDatum(stmt->dbname));
    2575            3 :     scan = systable_beginscan(rel, DatabaseNameIndexId, true,
    2576              :                               NULL, 1, &scankey);
    2577            3 :     tuple = systable_getnext(scan);
    2578            3 :     if (!HeapTupleIsValid(tuple))
    2579            0 :         ereport(ERROR,
    2580              :                 (errcode(ERRCODE_UNDEFINED_DATABASE),
    2581              :                  errmsg("database \"%s\" does not exist", stmt->dbname)));
    2582              : 
    2583            3 :     datForm = (Form_pg_database) GETSTRUCT(tuple);
    2584            3 :     db_id = datForm->oid;
    2585              : 
    2586            3 :     if (!object_ownercheck(DatabaseRelationId, db_id, GetUserId()))
    2587            0 :         aclcheck_error(ACLCHECK_NOT_OWNER, OBJECT_DATABASE,
    2588            0 :                        stmt->dbname);
    2589            3 :     LockTuple(rel, &tuple->t_self, InplaceUpdateTupleLock);
    2590              : 
    2591            3 :     datum = heap_getattr(tuple, Anum_pg_database_datcollversion, RelationGetDescr(rel), &isnull);
    2592            3 :     oldversion = isnull ? NULL : TextDatumGetCString(datum);
    2593              : 
    2594            3 :     if (datForm->datlocprovider == COLLPROVIDER_LIBC)
    2595              :     {
    2596            2 :         datum = heap_getattr(tuple, Anum_pg_database_datcollate, RelationGetDescr(rel), &isnull);
    2597            2 :         if (isnull)
    2598            0 :             elog(ERROR, "unexpected null in pg_database");
    2599              :     }
    2600              :     else
    2601              :     {
    2602            1 :         datum = heap_getattr(tuple, Anum_pg_database_datlocale, RelationGetDescr(rel), &isnull);
    2603            1 :         if (isnull)
    2604            0 :             elog(ERROR, "unexpected null in pg_database");
    2605              :     }
    2606              : 
    2607            3 :     newversion = get_collation_actual_version(datForm->datlocprovider,
    2608            3 :                                               TextDatumGetCString(datum));
    2609              : 
    2610              :     /* cannot change from NULL to non-NULL or vice versa */
    2611            3 :     if ((!oldversion && newversion) || (oldversion && !newversion))
    2612            0 :         elog(ERROR, "invalid collation version change");
    2613            3 :     else if (oldversion && newversion && strcmp(newversion, oldversion) != 0)
    2614            0 :     {
    2615            0 :         bool        nulls[Natts_pg_database] = {0};
    2616            0 :         bool        replaces[Natts_pg_database] = {0};
    2617            0 :         Datum       values[Natts_pg_database] = {0};
    2618              :         HeapTuple   newtuple;
    2619              : 
    2620            0 :         ereport(NOTICE,
    2621              :                 (errmsg("changing version from %s to %s",
    2622              :                         oldversion, newversion)));
    2623              : 
    2624            0 :         values[Anum_pg_database_datcollversion - 1] = CStringGetTextDatum(newversion);
    2625            0 :         replaces[Anum_pg_database_datcollversion - 1] = true;
    2626              : 
    2627            0 :         newtuple = heap_modify_tuple(tuple, RelationGetDescr(rel),
    2628              :                                      values, nulls, replaces);
    2629            0 :         CatalogTupleUpdate(rel, &tuple->t_self, newtuple);
    2630            0 :         heap_freetuple(newtuple);
    2631              :     }
    2632              :     else
    2633            3 :         ereport(NOTICE,
    2634              :                 (errmsg("version has not changed")));
    2635            3 :     UnlockTuple(rel, &tuple->t_self, InplaceUpdateTupleLock);
    2636              : 
    2637            3 :     InvokeObjectPostAlterHook(DatabaseRelationId, db_id, 0);
    2638              : 
    2639            3 :     ObjectAddressSet(address, DatabaseRelationId, db_id);
    2640              : 
    2641            3 :     systable_endscan(scan);
    2642              : 
    2643            3 :     table_close(rel, NoLock);
    2644              : 
    2645            3 :     return address;
    2646              : }
    2647              : 
    2648              : 
    2649              : /*
    2650              :  * ALTER DATABASE name SET ...
    2651              :  */
    2652              : Oid
    2653          639 : AlterDatabaseSet(AlterDatabaseSetStmt *stmt)
    2654              : {
    2655          639 :     Oid         datid = get_database_oid(stmt->dbname, false);
    2656              : 
    2657              :     /*
    2658              :      * Obtain a lock on the database and make sure it didn't go away in the
    2659              :      * meantime.
    2660              :      */
    2661          639 :     shdepLockAndCheckObject(DatabaseRelationId, datid);
    2662              : 
    2663          639 :     if (!object_ownercheck(DatabaseRelationId, datid, GetUserId()))
    2664            0 :         aclcheck_error(ACLCHECK_NOT_OWNER, OBJECT_DATABASE,
    2665            0 :                        stmt->dbname);
    2666              : 
    2667          639 :     AlterSetting(datid, InvalidOid, stmt->setstmt);
    2668              : 
    2669          637 :     UnlockSharedObject(DatabaseRelationId, datid, 0, AccessShareLock);
    2670              : 
    2671          637 :     return datid;
    2672              : }
    2673              : 
    2674              : 
    2675              : /*
    2676              :  * ALTER DATABASE name OWNER TO newowner
    2677              :  */
    2678              : ObjectAddress
    2679           45 : AlterDatabaseOwner(const char *dbname, Oid newOwnerId)
    2680              : {
    2681              :     Oid         db_id;
    2682              :     HeapTuple   tuple;
    2683              :     Relation    rel;
    2684              :     ScanKeyData scankey;
    2685              :     SysScanDesc scan;
    2686              :     Form_pg_database datForm;
    2687              :     ObjectAddress address;
    2688              : 
    2689              :     /*
    2690              :      * Get the old tuple.  We don't need a lock on the database per se,
    2691              :      * because we're not going to do anything that would mess up incoming
    2692              :      * connections.
    2693              :      */
    2694           45 :     rel = table_open(DatabaseRelationId, RowExclusiveLock);
    2695           45 :     ScanKeyInit(&scankey,
    2696              :                 Anum_pg_database_datname,
    2697              :                 BTEqualStrategyNumber, F_NAMEEQ,
    2698              :                 CStringGetDatum(dbname));
    2699           45 :     scan = systable_beginscan(rel, DatabaseNameIndexId, true,
    2700              :                               NULL, 1, &scankey);
    2701           45 :     tuple = systable_getnext(scan);
    2702           45 :     if (!HeapTupleIsValid(tuple))
    2703            0 :         ereport(ERROR,
    2704              :                 (errcode(ERRCODE_UNDEFINED_DATABASE),
    2705              :                  errmsg("database \"%s\" does not exist", dbname)));
    2706              : 
    2707           45 :     datForm = (Form_pg_database) GETSTRUCT(tuple);
    2708           45 :     db_id = datForm->oid;
    2709              : 
    2710              :     /*
    2711              :      * If the new owner is the same as the existing owner, consider the
    2712              :      * command to have succeeded.  This is to be consistent with other
    2713              :      * objects.
    2714              :      */
    2715           45 :     if (datForm->datdba != newOwnerId)
    2716              :     {
    2717              :         Datum       repl_val[Natts_pg_database];
    2718           15 :         bool        repl_null[Natts_pg_database] = {0};
    2719           15 :         bool        repl_repl[Natts_pg_database] = {0};
    2720              :         Acl        *newAcl;
    2721              :         Datum       aclDatum;
    2722              :         bool        isNull;
    2723              :         HeapTuple   newtuple;
    2724              : 
    2725              :         /* Otherwise, must be owner of the existing object */
    2726           15 :         if (!object_ownercheck(DatabaseRelationId, db_id, GetUserId()))
    2727            0 :             aclcheck_error(ACLCHECK_NOT_OWNER, OBJECT_DATABASE,
    2728              :                            dbname);
    2729              : 
    2730              :         /* Must be able to become new owner */
    2731           15 :         check_can_set_role(GetUserId(), newOwnerId);
    2732              : 
    2733              :         /*
    2734              :          * must have createdb rights
    2735              :          *
    2736              :          * NOTE: This is different from other alter-owner checks in that the
    2737              :          * current user is checked for createdb privileges instead of the
    2738              :          * destination owner.  This is consistent with the CREATE case for
    2739              :          * databases.  Because superusers will always have this right, we need
    2740              :          * no special case for them.
    2741              :          */
    2742           15 :         if (!have_createdb_privilege())
    2743            0 :             ereport(ERROR,
    2744              :                     (errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
    2745              :                      errmsg("permission denied to change owner of database")));
    2746              : 
    2747           15 :         LockTuple(rel, &tuple->t_self, InplaceUpdateTupleLock);
    2748              : 
    2749           15 :         repl_repl[Anum_pg_database_datdba - 1] = true;
    2750           15 :         repl_val[Anum_pg_database_datdba - 1] = ObjectIdGetDatum(newOwnerId);
    2751              : 
    2752              :         /*
    2753              :          * Determine the modified ACL for the new owner.  This is only
    2754              :          * necessary when the ACL is non-null.
    2755              :          */
    2756           15 :         aclDatum = heap_getattr(tuple,
    2757              :                                 Anum_pg_database_datacl,
    2758              :                                 RelationGetDescr(rel),
    2759              :                                 &isNull);
    2760           15 :         if (!isNull)
    2761              :         {
    2762            0 :             newAcl = aclnewowner(DatumGetAclP(aclDatum),
    2763              :                                  datForm->datdba, newOwnerId);
    2764            0 :             repl_repl[Anum_pg_database_datacl - 1] = true;
    2765            0 :             repl_val[Anum_pg_database_datacl - 1] = PointerGetDatum(newAcl);
    2766              :         }
    2767              : 
    2768           15 :         newtuple = heap_modify_tuple(tuple, RelationGetDescr(rel), repl_val, repl_null, repl_repl);
    2769           15 :         CatalogTupleUpdate(rel, &newtuple->t_self, newtuple);
    2770           15 :         UnlockTuple(rel, &tuple->t_self, InplaceUpdateTupleLock);
    2771              : 
    2772           15 :         heap_freetuple(newtuple);
    2773              : 
    2774              :         /* Update owner dependency reference */
    2775           15 :         changeDependencyOnOwner(DatabaseRelationId, db_id, newOwnerId);
    2776              :     }
    2777              : 
    2778           45 :     InvokeObjectPostAlterHook(DatabaseRelationId, db_id, 0);
    2779              : 
    2780           45 :     ObjectAddressSet(address, DatabaseRelationId, db_id);
    2781              : 
    2782           45 :     systable_endscan(scan);
    2783              : 
    2784              :     /* Close pg_database, but keep lock till commit */
    2785           45 :     table_close(rel, NoLock);
    2786              : 
    2787           45 :     return address;
    2788              : }
    2789              : 
    2790              : 
    2791              : Datum
    2792           49 : pg_database_collation_actual_version(PG_FUNCTION_ARGS)
    2793              : {
    2794           49 :     Oid         dbid = PG_GETARG_OID(0);
    2795              :     HeapTuple   tp;
    2796              :     char        datlocprovider;
    2797              :     Datum       datum;
    2798              :     char       *version;
    2799              : 
    2800           49 :     tp = SearchSysCache1(DATABASEOID, ObjectIdGetDatum(dbid));
    2801           49 :     if (!HeapTupleIsValid(tp))
    2802            0 :         ereport(ERROR,
    2803              :                 (errcode(ERRCODE_UNDEFINED_OBJECT),
    2804              :                  errmsg("database with OID %u does not exist", dbid)));
    2805              : 
    2806           49 :     datlocprovider = ((Form_pg_database) GETSTRUCT(tp))->datlocprovider;
    2807              : 
    2808           49 :     if (datlocprovider == COLLPROVIDER_LIBC)
    2809           42 :         datum = SysCacheGetAttrNotNull(DATABASEOID, tp, Anum_pg_database_datcollate);
    2810              :     else
    2811            7 :         datum = SysCacheGetAttrNotNull(DATABASEOID, tp, Anum_pg_database_datlocale);
    2812              : 
    2813           49 :     version = get_collation_actual_version(datlocprovider,
    2814           49 :                                            TextDatumGetCString(datum));
    2815              : 
    2816           49 :     ReleaseSysCache(tp);
    2817              : 
    2818           49 :     if (version)
    2819           35 :         PG_RETURN_TEXT_P(cstring_to_text(version));
    2820              :     else
    2821           14 :         PG_RETURN_NULL();
    2822              : }
    2823              : 
    2824              : 
    2825              : /*
    2826              :  * Helper functions
    2827              :  */
    2828              : 
    2829              : /*
    2830              :  * Look up info about the database named "name".  If the database exists,
    2831              :  * obtain the specified lock type on it, fill in any of the remaining
    2832              :  * parameters that aren't NULL, and return true.  If no such database,
    2833              :  * return false.
    2834              :  */
    2835              : static bool
    2836          498 : get_db_info(const char *name, LOCKMODE lockmode,
    2837              :             Oid *dbIdP, Oid *ownerIdP,
    2838              :             int *encodingP, bool *dbIsTemplateP, bool *dbAllowConnP, bool *dbHasLoginEvtP,
    2839              :             TransactionId *dbFrozenXidP, MultiXactId *dbMinMultiP,
    2840              :             Oid *dbTablespace, char **dbCollate, char **dbCtype, char **dbLocale,
    2841              :             char **dbIcurules,
    2842              :             char *dbLocProvider,
    2843              :             char **dbCollversion)
    2844              : {
    2845          498 :     bool        result = false;
    2846              :     Relation    relation;
    2847              : 
    2848              :     Assert(name);
    2849              : 
    2850              :     /* Caller may wish to grab a better lock on pg_database beforehand... */
    2851          498 :     relation = table_open(DatabaseRelationId, AccessShareLock);
    2852              : 
    2853              :     /*
    2854              :      * Loop covers the rare case where the database is renamed before we can
    2855              :      * lock it.  We try again just in case we can find a new one of the same
    2856              :      * name.
    2857              :      */
    2858              :     for (;;)
    2859            0 :     {
    2860              :         ScanKeyData scanKey;
    2861              :         SysScanDesc scan;
    2862              :         HeapTuple   tuple;
    2863              :         Oid         dbOid;
    2864              : 
    2865              :         /*
    2866              :          * there's no syscache for database-indexed-by-name, so must do it the
    2867              :          * hard way
    2868              :          */
    2869          498 :         ScanKeyInit(&scanKey,
    2870              :                     Anum_pg_database_datname,
    2871              :                     BTEqualStrategyNumber, F_NAMEEQ,
    2872              :                     CStringGetDatum(name));
    2873              : 
    2874          498 :         scan = systable_beginscan(relation, DatabaseNameIndexId, true,
    2875              :                                   NULL, 1, &scanKey);
    2876              : 
    2877          498 :         tuple = systable_getnext(scan);
    2878              : 
    2879          498 :         if (!HeapTupleIsValid(tuple))
    2880              :         {
    2881              :             /* definitely no database of that name */
    2882           16 :             systable_endscan(scan);
    2883           16 :             break;
    2884              :         }
    2885              : 
    2886          482 :         dbOid = ((Form_pg_database) GETSTRUCT(tuple))->oid;
    2887              : 
    2888          482 :         systable_endscan(scan);
    2889              : 
    2890              :         /*
    2891              :          * Now that we have a database OID, we can try to lock the DB.
    2892              :          */
    2893          482 :         if (lockmode != NoLock)
    2894          482 :             LockSharedObject(DatabaseRelationId, dbOid, 0, lockmode);
    2895              : 
    2896              :         /*
    2897              :          * And now, re-fetch the tuple by OID.  If it's still there and still
    2898              :          * the same name, we win; else, drop the lock and loop back to try
    2899              :          * again.
    2900              :          */
    2901          482 :         tuple = SearchSysCache1(DATABASEOID, ObjectIdGetDatum(dbOid));
    2902          482 :         if (HeapTupleIsValid(tuple))
    2903              :         {
    2904          482 :             Form_pg_database dbform = (Form_pg_database) GETSTRUCT(tuple);
    2905              : 
    2906          482 :             if (strcmp(name, NameStr(dbform->datname)) == 0)
    2907              :             {
    2908              :                 Datum       datum;
    2909              :                 bool        isnull;
    2910              : 
    2911              :                 /* oid of the database */
    2912          482 :                 if (dbIdP)
    2913          482 :                     *dbIdP = dbOid;
    2914              :                 /* oid of the owner */
    2915          482 :                 if (ownerIdP)
    2916          413 :                     *ownerIdP = dbform->datdba;
    2917              :                 /* character encoding */
    2918          482 :                 if (encodingP)
    2919          413 :                     *encodingP = dbform->encoding;
    2920              :                 /* allowed as template? */
    2921          482 :                 if (dbIsTemplateP)
    2922          463 :                     *dbIsTemplateP = dbform->datistemplate;
    2923              :                 /* Has on login event trigger? */
    2924          482 :                 if (dbHasLoginEvtP)
    2925          413 :                     *dbHasLoginEvtP = dbform->dathasloginevt;
    2926              :                 /* allowing connections? */
    2927          482 :                 if (dbAllowConnP)
    2928          413 :                     *dbAllowConnP = dbform->datallowconn;
    2929              :                 /* limit of frozen XIDs */
    2930          482 :                 if (dbFrozenXidP)
    2931          413 :                     *dbFrozenXidP = dbform->datfrozenxid;
    2932              :                 /* minimum MultiXactId */
    2933          482 :                 if (dbMinMultiP)
    2934          413 :                     *dbMinMultiP = dbform->datminmxid;
    2935              :                 /* default tablespace for this database */
    2936          482 :                 if (dbTablespace)
    2937          425 :                     *dbTablespace = dbform->dattablespace;
    2938              :                 /* default locale settings for this database */
    2939          482 :                 if (dbLocProvider)
    2940          413 :                     *dbLocProvider = dbform->datlocprovider;
    2941          482 :                 if (dbCollate)
    2942              :                 {
    2943          413 :                     datum = SysCacheGetAttrNotNull(DATABASEOID, tuple, Anum_pg_database_datcollate);
    2944          413 :                     *dbCollate = TextDatumGetCString(datum);
    2945              :                 }
    2946          482 :                 if (dbCtype)
    2947              :                 {
    2948          413 :                     datum = SysCacheGetAttrNotNull(DATABASEOID, tuple, Anum_pg_database_datctype);
    2949          413 :                     *dbCtype = TextDatumGetCString(datum);
    2950              :                 }
    2951          482 :                 if (dbLocale)
    2952              :                 {
    2953          413 :                     datum = SysCacheGetAttr(DATABASEOID, tuple, Anum_pg_database_datlocale, &isnull);
    2954          413 :                     if (isnull)
    2955          384 :                         *dbLocale = NULL;
    2956              :                     else
    2957           29 :                         *dbLocale = TextDatumGetCString(datum);
    2958              :                 }
    2959          482 :                 if (dbIcurules)
    2960              :                 {
    2961          413 :                     datum = SysCacheGetAttr(DATABASEOID, tuple, Anum_pg_database_daticurules, &isnull);
    2962          413 :                     if (isnull)
    2963          413 :                         *dbIcurules = NULL;
    2964              :                     else
    2965            0 :                         *dbIcurules = TextDatumGetCString(datum);
    2966              :                 }
    2967          482 :                 if (dbCollversion)
    2968              :                 {
    2969          413 :                     datum = SysCacheGetAttr(DATABASEOID, tuple, Anum_pg_database_datcollversion, &isnull);
    2970          413 :                     if (isnull)
    2971          245 :                         *dbCollversion = NULL;
    2972              :                     else
    2973          168 :                         *dbCollversion = TextDatumGetCString(datum);
    2974              :                 }
    2975          482 :                 ReleaseSysCache(tuple);
    2976          482 :                 result = true;
    2977          482 :                 break;
    2978              :             }
    2979              :             /* can only get here if it was just renamed */
    2980            0 :             ReleaseSysCache(tuple);
    2981              :         }
    2982              : 
    2983            0 :         if (lockmode != NoLock)
    2984            0 :             UnlockSharedObject(DatabaseRelationId, dbOid, 0, lockmode);
    2985              :     }
    2986              : 
    2987          498 :     table_close(relation, AccessShareLock);
    2988              : 
    2989          498 :     return result;
    2990              : }
    2991              : 
    2992              : /* Check if current user has createdb privileges */
    2993              : bool
    2994          453 : have_createdb_privilege(void)
    2995              : {
    2996          453 :     bool        result = false;
    2997              :     HeapTuple   utup;
    2998              : 
    2999              :     /* Superusers can always do everything */
    3000          453 :     if (superuser())
    3001          435 :         return true;
    3002              : 
    3003           18 :     utup = SearchSysCache1(AUTHOID, ObjectIdGetDatum(GetUserId()));
    3004           18 :     if (HeapTupleIsValid(utup))
    3005              :     {
    3006           18 :         result = ((Form_pg_authid) GETSTRUCT(utup))->rolcreatedb;
    3007           18 :         ReleaseSysCache(utup);
    3008              :     }
    3009           18 :     return result;
    3010              : }
    3011              : 
    3012              : /*
    3013              :  * Remove tablespace directories
    3014              :  *
    3015              :  * We don't know what tablespaces db_id is using, so iterate through all
    3016              :  * tablespaces removing <tablespace>/db_id
    3017              :  */
    3018              : static void
    3019           51 : remove_dbtablespaces(Oid db_id)
    3020              : {
    3021              :     Relation    rel;
    3022              :     TableScanDesc scan;
    3023              :     HeapTuple   tuple;
    3024           51 :     List       *ltblspc = NIL;
    3025              :     ListCell   *cell;
    3026              :     int         ntblspc;
    3027              :     int         i;
    3028              :     Oid        *tablespace_ids;
    3029              : 
    3030           51 :     rel = table_open(TableSpaceRelationId, AccessShareLock);
    3031           50 :     scan = table_beginscan_catalog(rel, 0, NULL);
    3032          184 :     while ((tuple = heap_getnext(scan, ForwardScanDirection)) != NULL)
    3033              :     {
    3034          134 :         Form_pg_tablespace spcform = (Form_pg_tablespace) GETSTRUCT(tuple);
    3035          134 :         Oid         dsttablespace = spcform->oid;
    3036              :         char       *dstpath;
    3037              :         struct stat st;
    3038              : 
    3039              :         /* Don't mess with the global tablespace */
    3040          134 :         if (dsttablespace == GLOBALTABLESPACE_OID)
    3041           84 :             continue;
    3042              : 
    3043           84 :         dstpath = GetDatabasePath(db_id, dsttablespace);
    3044              : 
    3045           84 :         if (lstat(dstpath, &st) < 0 || !S_ISDIR(st.st_mode))
    3046              :         {
    3047              :             /* Assume we can ignore it */
    3048           34 :             pfree(dstpath);
    3049           34 :             continue;
    3050              :         }
    3051              : 
    3052           50 :         if (!rmtree(dstpath, true))
    3053            0 :             ereport(WARNING,
    3054              :                     (errmsg("some useless files may be left behind in old database directory \"%s\"",
    3055              :                             dstpath)));
    3056              : 
    3057           50 :         ltblspc = lappend_oid(ltblspc, dsttablespace);
    3058           50 :         pfree(dstpath);
    3059              :     }
    3060              : 
    3061           50 :     ntblspc = list_length(ltblspc);
    3062           50 :     if (ntblspc == 0)
    3063              :     {
    3064            0 :         table_endscan(scan);
    3065            0 :         table_close(rel, AccessShareLock);
    3066            0 :         return;
    3067              :     }
    3068              : 
    3069           50 :     tablespace_ids = (Oid *) palloc(ntblspc * sizeof(Oid));
    3070           50 :     i = 0;
    3071          100 :     foreach(cell, ltblspc)
    3072           50 :         tablespace_ids[i++] = lfirst_oid(cell);
    3073              : 
    3074              :     /* Record the filesystem change in XLOG */
    3075              :     {
    3076              :         xl_dbase_drop_rec xlrec;
    3077              : 
    3078           50 :         xlrec.db_id = db_id;
    3079           50 :         xlrec.ntablespaces = ntblspc;
    3080              : 
    3081           50 :         XLogBeginInsert();
    3082           50 :         XLogRegisterData(&xlrec, MinSizeOfDbaseDropRec);
    3083           50 :         XLogRegisterData(tablespace_ids, ntblspc * sizeof(Oid));
    3084              : 
    3085           50 :         (void) XLogInsert(RM_DBASE_ID,
    3086              :                           XLOG_DBASE_DROP | XLR_SPECIAL_REL_UPDATE);
    3087              :     }
    3088              : 
    3089           50 :     list_free(ltblspc);
    3090           50 :     pfree(tablespace_ids);
    3091              : 
    3092           50 :     table_endscan(scan);
    3093           50 :     table_close(rel, AccessShareLock);
    3094              : }
    3095              : 
    3096              : /*
    3097              :  * Check for existing files that conflict with a proposed new DB OID;
    3098              :  * return true if there are any
    3099              :  *
    3100              :  * If there were a subdirectory in any tablespace matching the proposed new
    3101              :  * OID, we'd get a create failure due to the duplicate name ... and then we'd
    3102              :  * try to remove that already-existing subdirectory during the cleanup in
    3103              :  * remove_dbtablespaces.  Nuking existing files seems like a bad idea, so
    3104              :  * instead we make this extra check before settling on the OID of the new
    3105              :  * database.  This exactly parallels what GetNewRelFileNumber() does for table
    3106              :  * relfilenumber values.
    3107              :  */
    3108              : static bool
    3109          399 : check_db_file_conflict(Oid db_id)
    3110              : {
    3111          399 :     bool        result = false;
    3112              :     Relation    rel;
    3113              :     TableScanDesc scan;
    3114              :     HeapTuple   tuple;
    3115              : 
    3116          399 :     rel = table_open(TableSpaceRelationId, AccessShareLock);
    3117          399 :     scan = table_beginscan_catalog(rel, 0, NULL);
    3118         1269 :     while ((tuple = heap_getnext(scan, ForwardScanDirection)) != NULL)
    3119              :     {
    3120          870 :         Form_pg_tablespace spcform = (Form_pg_tablespace) GETSTRUCT(tuple);
    3121          870 :         Oid         dsttablespace = spcform->oid;
    3122              :         char       *dstpath;
    3123              :         struct stat st;
    3124              : 
    3125              :         /* Don't mess with the global tablespace */
    3126          870 :         if (dsttablespace == GLOBALTABLESPACE_OID)
    3127          399 :             continue;
    3128              : 
    3129          471 :         dstpath = GetDatabasePath(db_id, dsttablespace);
    3130              : 
    3131          471 :         if (lstat(dstpath, &st) == 0)
    3132              :         {
    3133              :             /* Found a conflicting file (or directory, whatever) */
    3134            0 :             pfree(dstpath);
    3135            0 :             result = true;
    3136            0 :             break;
    3137              :         }
    3138              : 
    3139          471 :         pfree(dstpath);
    3140              :     }
    3141              : 
    3142          399 :     table_endscan(scan);
    3143          399 :     table_close(rel, AccessShareLock);
    3144              : 
    3145          399 :     return result;
    3146              : }
    3147              : 
    3148              : /*
    3149              :  * Issue a suitable errdetail message for a busy database
    3150              :  */
    3151              : static int
    3152            1 : errdetail_busy_db(int notherbackends, int npreparedxacts)
    3153              : {
    3154            1 :     if (notherbackends > 0 && npreparedxacts > 0)
    3155              : 
    3156              :         /*
    3157              :          * We don't deal with singular versus plural here, since gettext
    3158              :          * doesn't support multiple plurals in one string.
    3159              :          */
    3160            0 :         errdetail("There are %d other session(s) and %d prepared transaction(s) using the database.",
    3161              :                   notherbackends, npreparedxacts);
    3162            1 :     else if (notherbackends > 0)
    3163            1 :         errdetail_plural("There is %d other session using the database.",
    3164              :                          "There are %d other sessions using the database.",
    3165              :                          notherbackends,
    3166              :                          notherbackends);
    3167              :     else
    3168            0 :         errdetail_plural("There is %d prepared transaction using the database.",
    3169              :                          "There are %d prepared transactions using the database.",
    3170              :                          npreparedxacts,
    3171              :                          npreparedxacts);
    3172            1 :     return 0;                   /* just to keep ereport macro happy */
    3173              : }
    3174              : 
    3175              : /*
    3176              :  * get_database_oid - given a database name, look up the OID
    3177              :  *
    3178              :  * If missing_ok is false, throw an error if database name not found.  If
    3179              :  * true, just return InvalidOid.
    3180              :  */
    3181              : Oid
    3182         1575 : get_database_oid(const char *dbname, bool missing_ok)
    3183              : {
    3184              :     Relation    pg_database;
    3185              :     ScanKeyData entry[1];
    3186              :     SysScanDesc scan;
    3187              :     HeapTuple   dbtuple;
    3188              :     Oid         oid;
    3189              : 
    3190              :     /*
    3191              :      * There's no syscache for pg_database indexed by name, so we must look
    3192              :      * the hard way.
    3193              :      */
    3194         1575 :     pg_database = table_open(DatabaseRelationId, AccessShareLock);
    3195         1575 :     ScanKeyInit(&entry[0],
    3196              :                 Anum_pg_database_datname,
    3197              :                 BTEqualStrategyNumber, F_NAMEEQ,
    3198              :                 CStringGetDatum(dbname));
    3199         1575 :     scan = systable_beginscan(pg_database, DatabaseNameIndexId, true,
    3200              :                               NULL, 1, entry);
    3201              : 
    3202         1575 :     dbtuple = systable_getnext(scan);
    3203              : 
    3204              :     /* We assume that there can be at most one matching tuple */
    3205         1575 :     if (HeapTupleIsValid(dbtuple))
    3206         1150 :         oid = ((Form_pg_database) GETSTRUCT(dbtuple))->oid;
    3207              :     else
    3208          425 :         oid = InvalidOid;
    3209              : 
    3210         1575 :     systable_endscan(scan);
    3211         1575 :     table_close(pg_database, AccessShareLock);
    3212              : 
    3213         1575 :     if (!OidIsValid(oid) && !missing_ok)
    3214            3 :         ereport(ERROR,
    3215              :                 (errcode(ERRCODE_UNDEFINED_DATABASE),
    3216              :                  errmsg("database \"%s\" does not exist",
    3217              :                         dbname)));
    3218              : 
    3219         1572 :     return oid;
    3220              : }
    3221              : 
    3222              : 
    3223              : /*
    3224              :  * While dropping a database the pg_database row is marked invalid, but the
    3225              :  * catalog contents still exist. Connections to such a database are not
    3226              :  * allowed.
    3227              :  */
    3228              : bool
    3229        28276 : database_is_invalid_form(Form_pg_database datform)
    3230              : {
    3231        28276 :     return datform->datconnlimit == DATCONNLIMIT_INVALID_DB;
    3232              : }
    3233              : 
    3234              : 
    3235              : /*
    3236              :  * Convenience wrapper around database_is_invalid_form()
    3237              :  */
    3238              : bool
    3239          413 : database_is_invalid_oid(Oid dboid)
    3240              : {
    3241              :     HeapTuple   dbtup;
    3242              :     Form_pg_database dbform;
    3243              :     bool        invalid;
    3244              : 
    3245          413 :     dbtup = SearchSysCache1(DATABASEOID, ObjectIdGetDatum(dboid));
    3246          413 :     if (!HeapTupleIsValid(dbtup))
    3247            0 :         elog(ERROR, "cache lookup failed for database %u", dboid);
    3248          413 :     dbform = (Form_pg_database) GETSTRUCT(dbtup);
    3249              : 
    3250          413 :     invalid = database_is_invalid_form(dbform);
    3251              : 
    3252          413 :     ReleaseSysCache(dbtup);
    3253              : 
    3254          413 :     return invalid;
    3255              : }
    3256              : 
    3257              : 
    3258              : /*
    3259              :  * recovery_create_dbdir()
    3260              :  *
    3261              :  * During recovery, there's a case where we validly need to recover a missing
    3262              :  * tablespace directory so that recovery can continue.  This happens when
    3263              :  * recovery wants to create a database but the holding tablespace has been
    3264              :  * removed before the server stopped.  Since we expect that the directory will
    3265              :  * be gone before reaching recovery consistency, and we have no knowledge about
    3266              :  * the tablespace other than its OID here, we create a real directory under
    3267              :  * pg_tblspc here instead of restoring the symlink.
    3268              :  *
    3269              :  * If only_tblspc is true, then the requested directory must be in pg_tblspc/
    3270              :  */
    3271              : static void
    3272           23 : recovery_create_dbdir(char *path, bool only_tblspc)
    3273              : {
    3274              :     struct stat st;
    3275              : 
    3276              :     Assert(RecoveryInProgress());
    3277              : 
    3278           23 :     if (stat(path, &st) == 0)
    3279           23 :         return;
    3280              : 
    3281            0 :     if (only_tblspc && strstr(path, PG_TBLSPC_DIR_SLASH) == NULL)
    3282            0 :         elog(PANIC, "requested to created invalid directory: %s", path);
    3283              : 
    3284            0 :     if (reachedConsistency && !allow_in_place_tablespaces)
    3285            0 :         ereport(PANIC,
    3286              :                 errmsg("missing directory \"%s\"", path));
    3287              : 
    3288            0 :     elog(reachedConsistency ? WARNING : DEBUG1,
    3289              :          "creating missing directory: %s", path);
    3290              : 
    3291            0 :     if (pg_mkdir_p(path, pg_dir_create_mode) != 0)
    3292            0 :         ereport(PANIC,
    3293              :                 errmsg("could not create missing directory \"%s\": %m", path));
    3294              : }
    3295              : 
    3296              : 
    3297              : /*
    3298              :  * DATABASE resource manager's routines
    3299              :  */
    3300              : void
    3301           42 : dbase_redo(XLogReaderState *record)
    3302              : {
    3303           42 :     uint8       info = XLogRecGetInfo(record) & ~XLR_INFO_MASK;
    3304              : 
    3305              :     /* Backup blocks are not used in dbase records */
    3306              :     Assert(!XLogRecHasAnyBlockRefs(record));
    3307              : 
    3308           42 :     if (info == XLOG_DBASE_CREATE_FILE_COPY)
    3309              :     {
    3310            5 :         xl_dbase_create_file_copy_rec *xlrec =
    3311            5 :             (xl_dbase_create_file_copy_rec *) XLogRecGetData(record);
    3312              :         char       *src_path;
    3313              :         char       *dst_path;
    3314              :         char       *parent_path;
    3315              :         struct stat st;
    3316              : 
    3317            5 :         src_path = GetDatabasePath(xlrec->src_db_id, xlrec->src_tablespace_id);
    3318            5 :         dst_path = GetDatabasePath(xlrec->db_id, xlrec->tablespace_id);
    3319              : 
    3320              :         /*
    3321              :          * Our theory for replaying a CREATE is to forcibly drop the target
    3322              :          * subdirectory if present, then re-copy the source data. This may be
    3323              :          * more work than needed, but it is simple to implement.
    3324              :          */
    3325            5 :         if (stat(dst_path, &st) == 0 && S_ISDIR(st.st_mode))
    3326              :         {
    3327            0 :             if (!rmtree(dst_path, true))
    3328              :                 /* If this failed, copydir() below is going to error. */
    3329            0 :                 ereport(WARNING,
    3330              :                         (errmsg("some useless files may be left behind in old database directory \"%s\"",
    3331              :                                 dst_path)));
    3332              :         }
    3333              : 
    3334              :         /*
    3335              :          * If the parent of the target path doesn't exist, create it now. This
    3336              :          * enables us to create the target underneath later.
    3337              :          */
    3338            5 :         parent_path = pstrdup(dst_path);
    3339            5 :         get_parent_directory(parent_path);
    3340            5 :         if (stat(parent_path, &st) < 0)
    3341              :         {
    3342            0 :             if (errno != ENOENT)
    3343            0 :                 ereport(FATAL,
    3344              :                         errmsg("could not stat directory \"%s\": %m",
    3345              :                                dst_path));
    3346              : 
    3347              :             /* create the parent directory if needed and valid */
    3348            0 :             recovery_create_dbdir(parent_path, true);
    3349              :         }
    3350            5 :         pfree(parent_path);
    3351              : 
    3352              :         /*
    3353              :          * There's a case where the copy source directory is missing for the
    3354              :          * same reason above.  Create the empty source directory so that
    3355              :          * copydir below doesn't fail.  The directory will be dropped soon by
    3356              :          * recovery.
    3357              :          */
    3358            5 :         if (stat(src_path, &st) < 0 && errno == ENOENT)
    3359            0 :             recovery_create_dbdir(src_path, false);
    3360              : 
    3361              :         /*
    3362              :          * Force dirty buffers out to disk, to ensure source database is
    3363              :          * up-to-date for the copy.
    3364              :          */
    3365            5 :         FlushDatabaseBuffers(xlrec->src_db_id);
    3366              : 
    3367              :         /* Close all smgr fds in all backends. */
    3368            5 :         WaitForProcSignalBarrier(EmitProcSignalBarrier(PROCSIGNAL_BARRIER_SMGRRELEASE));
    3369              : 
    3370              :         /*
    3371              :          * Copy this subdirectory to the new location
    3372              :          *
    3373              :          * We don't need to copy subdirectories
    3374              :          */
    3375            5 :         copydir(src_path, dst_path, false);
    3376              : 
    3377            5 :         pfree(src_path);
    3378            5 :         pfree(dst_path);
    3379              :     }
    3380           37 :     else if (info == XLOG_DBASE_CREATE_WAL_LOG)
    3381              :     {
    3382           23 :         xl_dbase_create_wal_log_rec *xlrec =
    3383           23 :             (xl_dbase_create_wal_log_rec *) XLogRecGetData(record);
    3384              :         char       *dbpath;
    3385              :         char       *parent_path;
    3386              : 
    3387           23 :         dbpath = GetDatabasePath(xlrec->db_id, xlrec->tablespace_id);
    3388              : 
    3389              :         /* create the parent directory if needed and valid */
    3390           23 :         parent_path = pstrdup(dbpath);
    3391           23 :         get_parent_directory(parent_path);
    3392           23 :         recovery_create_dbdir(parent_path, true);
    3393           23 :         pfree(parent_path);
    3394              : 
    3395              :         /* Create the database directory with the version file. */
    3396           23 :         CreateDirAndVersionFile(dbpath, xlrec->db_id, xlrec->tablespace_id,
    3397              :                                 true);
    3398           23 :         pfree(dbpath);
    3399              :     }
    3400           14 :     else if (info == XLOG_DBASE_DROP)
    3401              :     {
    3402           14 :         xl_dbase_drop_rec *xlrec = (xl_dbase_drop_rec *) XLogRecGetData(record);
    3403              :         char       *dst_path;
    3404              :         int         i;
    3405              : 
    3406           14 :         if (InHotStandby)
    3407              :         {
    3408              :             /*
    3409              :              * Lock database while we resolve conflicts to ensure that
    3410              :              * InitPostgres() cannot fully re-execute concurrently. This
    3411              :              * avoids backends re-connecting automatically to same database,
    3412              :              * which can happen in some cases.
    3413              :              *
    3414              :              * This will lock out walsenders trying to connect to db-specific
    3415              :              * slots for logical decoding too, so it's safe for us to drop
    3416              :              * slots.
    3417              :              */
    3418           14 :             LockSharedObjectForSession(DatabaseRelationId, xlrec->db_id, 0, AccessExclusiveLock);
    3419           14 :             ResolveRecoveryConflictWithDatabase(xlrec->db_id);
    3420              :         }
    3421              : 
    3422              :         /* Drop any database-specific replication slots */
    3423           14 :         ReplicationSlotsDropDBSlots(xlrec->db_id);
    3424              : 
    3425              :         /* Drop pages for this database that are in the shared buffer cache */
    3426           14 :         DropDatabaseBuffers(xlrec->db_id);
    3427              : 
    3428              :         /* Also, clean out any fsync requests that might be pending in md.c */
    3429           14 :         ForgetDatabaseSyncRequests(xlrec->db_id);
    3430              : 
    3431              :         /* Clean out the xlog relcache too */
    3432           14 :         XLogDropDatabase(xlrec->db_id);
    3433              : 
    3434              :         /* Close all smgr fds in all backends. */
    3435           14 :         WaitForProcSignalBarrier(EmitProcSignalBarrier(PROCSIGNAL_BARRIER_SMGRRELEASE));
    3436              : 
    3437           28 :         for (i = 0; i < xlrec->ntablespaces; i++)
    3438              :         {
    3439           14 :             dst_path = GetDatabasePath(xlrec->db_id, xlrec->tablespace_ids[i]);
    3440              : 
    3441              :             /* And remove the physical files */
    3442           14 :             if (!rmtree(dst_path, true))
    3443            0 :                 ereport(WARNING,
    3444              :                         (errmsg("some useless files may be left behind in old database directory \"%s\"",
    3445              :                                 dst_path)));
    3446           14 :             pfree(dst_path);
    3447              :         }
    3448              : 
    3449           14 :         if (InHotStandby)
    3450              :         {
    3451              :             /*
    3452              :              * Release locks prior to commit. XXX There is a race condition
    3453              :              * here that may allow backends to reconnect, but the window for
    3454              :              * this is small because the gap between here and commit is mostly
    3455              :              * fairly small and it is unlikely that people will be dropping
    3456              :              * databases that we are trying to connect to anyway.
    3457              :              */
    3458           14 :             UnlockSharedObjectForSession(DatabaseRelationId, xlrec->db_id, 0, AccessExclusiveLock);
    3459              :         }
    3460              :     }
    3461              :     else
    3462            0 :         elog(PANIC, "dbase_redo: unknown op code %u", info);
    3463           42 : }
        

Generated by: LCOV version 2.0-1