LCOV - code coverage report
Current view: top level - src/backend/commands - dbcommands.c (source / functions) Coverage Total Hit
Test: PostgreSQL 19devel Lines: 84.4 % 1113 939
Test Date: 2026-04-20 15:16:29 Functions: 96.6 % 29 28
Legend: Lines:     hit not hit

            Line data    Source code
       1              : /*-------------------------------------------------------------------------
       2              :  *
       3              :  * dbcommands.c
       4              :  *      Database management commands (create/drop database).
       5              :  *
       6              :  * Note: database creation/destruction commands use exclusive locks on
       7              :  * the database objects (as expressed by LockSharedObject()) to avoid
       8              :  * stepping on each others' toes.  Formerly we used table-level locks
       9              :  * on pg_database, but that's too coarse-grained.
      10              :  *
      11              :  * Portions Copyright (c) 1996-2026, PostgreSQL Global Development Group
      12              :  * Portions Copyright (c) 1994, Regents of the University of California
      13              :  *
      14              :  *
      15              :  * IDENTIFICATION
      16              :  *    src/backend/commands/dbcommands.c
      17              :  *
      18              :  *-------------------------------------------------------------------------
      19              :  */
      20              : #include "postgres.h"
      21              : 
      22              : #include <fcntl.h>
      23              : #include <unistd.h>
      24              : #include <sys/stat.h>
      25              : 
      26              : #include "access/genam.h"
      27              : #include "access/heapam.h"
      28              : #include "access/htup_details.h"
      29              : #include "access/multixact.h"
      30              : #include "access/tableam.h"
      31              : #include "access/xact.h"
      32              : #include "access/xloginsert.h"
      33              : #include "access/xlogrecovery.h"
      34              : #include "access/xlogutils.h"
      35              : #include "catalog/catalog.h"
      36              : #include "catalog/dependency.h"
      37              : #include "catalog/indexing.h"
      38              : #include "catalog/objectaccess.h"
      39              : #include "catalog/pg_authid.h"
      40              : #include "catalog/pg_collation.h"
      41              : #include "catalog/pg_database.h"
      42              : #include "catalog/pg_db_role_setting.h"
      43              : #include "catalog/pg_subscription.h"
      44              : #include "catalog/pg_tablespace.h"
      45              : #include "commands/comment.h"
      46              : #include "commands/dbcommands.h"
      47              : #include "commands/dbcommands_xlog.h"
      48              : #include "commands/defrem.h"
      49              : #include "commands/seclabel.h"
      50              : #include "commands/tablespace.h"
      51              : #include "common/file_perm.h"
      52              : #include "mb/pg_wchar.h"
      53              : #include "miscadmin.h"
      54              : #include "pgstat.h"
      55              : #include "postmaster/bgwriter.h"
      56              : #include "replication/slot.h"
      57              : #include "storage/copydir.h"
      58              : #include "storage/fd.h"
      59              : #include "storage/ipc.h"
      60              : #include "storage/lmgr.h"
      61              : #include "storage/md.h"
      62              : #include "storage/procarray.h"
      63              : #include "storage/procsignal.h"
      64              : #include "storage/smgr.h"
      65              : #include "utils/acl.h"
      66              : #include "utils/builtins.h"
      67              : #include "utils/fmgroids.h"
      68              : #include "utils/lsyscache.h"
      69              : #include "utils/pg_locale.h"
      70              : #include "utils/relmapper.h"
      71              : #include "utils/snapmgr.h"
      72              : #include "utils/syscache.h"
      73              : #include "utils/wait_event.h"
      74              : 
      75              : /*
      76              :  * Create database strategy.
      77              :  *
      78              :  * CREATEDB_WAL_LOG will copy the database at the block level and WAL log each
      79              :  * copied block.
      80              :  *
      81              :  * CREATEDB_FILE_COPY will simply perform a file system level copy of the
      82              :  * database and log a single record for each tablespace copied. To make this
      83              :  * safe, it also triggers checkpoints before and after the operation.
      84              :  */
      85              : typedef enum CreateDBStrategy
      86              : {
      87              :     CREATEDB_WAL_LOG,
      88              :     CREATEDB_FILE_COPY,
      89              : } CreateDBStrategy;
      90              : 
      91              : typedef struct
      92              : {
      93              :     Oid         src_dboid;      /* source (template) DB */
      94              :     Oid         dest_dboid;     /* DB we are trying to create */
      95              :     CreateDBStrategy strategy;  /* create db strategy */
      96              : } createdb_failure_params;
      97              : 
      98              : typedef struct
      99              : {
     100              :     Oid         dest_dboid;     /* DB we are trying to move */
     101              :     Oid         dest_tsoid;     /* tablespace we are trying to move to */
     102              : } movedb_failure_params;
     103              : 
     104              : /*
     105              :  * Information about a relation to be copied when creating a database.
     106              :  */
     107              : typedef struct CreateDBRelInfo
     108              : {
     109              :     RelFileLocator rlocator;    /* physical relation identifier */
     110              :     Oid         reloid;         /* relation oid */
     111              :     bool        permanent;      /* relation is permanent or unlogged */
     112              : } CreateDBRelInfo;
     113              : 
     114              : 
     115              : /* non-export function prototypes */
     116              : static void createdb_failure_callback(int code, Datum arg);
     117              : static void movedb(const char *dbname, const char *tblspcname);
     118              : static void movedb_failure_callback(int code, Datum arg);
     119              : static bool get_db_info(const char *name, LOCKMODE lockmode,
     120              :                         Oid *dbIdP, Oid *ownerIdP,
     121              :                         int *encodingP, bool *dbIsTemplateP, bool *dbAllowConnP, bool *dbHasLoginEvtP,
     122              :                         TransactionId *dbFrozenXidP, MultiXactId *dbMinMultiP,
     123              :                         Oid *dbTablespace, char **dbCollate, char **dbCtype, char **dbLocale,
     124              :                         char **dbIcurules,
     125              :                         char *dbLocProvider,
     126              :                         char **dbCollversion);
     127              : static void remove_dbtablespaces(Oid db_id);
     128              : static bool check_db_file_conflict(Oid db_id);
     129              : static int  errdetail_busy_db(int notherbackends, int npreparedxacts);
     130              : static void CreateDatabaseUsingWalLog(Oid src_dboid, Oid dst_dboid, Oid src_tsid,
     131              :                                       Oid dst_tsid);
     132              : static List *ScanSourceDatabasePgClass(Oid tbid, Oid dbid, char *srcpath);
     133              : static List *ScanSourceDatabasePgClassPage(Page page, Buffer buf, Oid tbid,
     134              :                                            Oid dbid, char *srcpath,
     135              :                                            List *rlocatorlist, Snapshot snapshot);
     136              : static CreateDBRelInfo *ScanSourceDatabasePgClassTuple(HeapTupleData *tuple,
     137              :                                                        Oid tbid, Oid dbid,
     138              :                                                        char *srcpath);
     139              : static void CreateDirAndVersionFile(char *dbpath, Oid dbid, Oid tsid,
     140              :                                     bool isRedo);
     141              : static void CreateDatabaseUsingFileCopy(Oid src_dboid, Oid dst_dboid,
     142              :                                         Oid src_tsid, Oid dst_tsid);
     143              : static void recovery_create_dbdir(char *path, bool only_tblspc);
     144              : 
     145              : /*
     146              :  * Create a new database using the WAL_LOG strategy.
     147              :  *
     148              :  * Each copied block is separately written to the write-ahead log.
     149              :  */
     150              : static void
     151          280 : CreateDatabaseUsingWalLog(Oid src_dboid, Oid dst_dboid,
     152              :                           Oid src_tsid, Oid dst_tsid)
     153              : {
     154              :     char       *srcpath;
     155              :     char       *dstpath;
     156          280 :     List       *rlocatorlist = NULL;
     157              :     ListCell   *cell;
     158              :     LockRelId   srcrelid;
     159              :     LockRelId   dstrelid;
     160              :     RelFileLocator srcrlocator;
     161              :     RelFileLocator dstrlocator;
     162              :     CreateDBRelInfo *relinfo;
     163              : 
     164              :     /* Get source and destination database paths. */
     165          280 :     srcpath = GetDatabasePath(src_dboid, src_tsid);
     166          280 :     dstpath = GetDatabasePath(dst_dboid, dst_tsid);
     167              : 
     168              :     /* Create database directory and write PG_VERSION file. */
     169          280 :     CreateDirAndVersionFile(dstpath, dst_dboid, dst_tsid, false);
     170              : 
     171              :     /* Copy relmap file from source database to the destination database. */
     172          280 :     RelationMapCopy(dst_dboid, dst_tsid, srcpath, dstpath);
     173              : 
     174              :     /* Get list of relfilelocators to copy from the source database. */
     175          280 :     rlocatorlist = ScanSourceDatabasePgClass(src_tsid, src_dboid, srcpath);
     176              :     Assert(rlocatorlist != NIL);
     177              : 
     178              :     /*
     179              :      * Database IDs will be the same for all relations so set them before
     180              :      * entering the loop.
     181              :      */
     182          280 :     srcrelid.dbId = src_dboid;
     183          280 :     dstrelid.dbId = dst_dboid;
     184              : 
     185              :     /* Loop over our list of relfilelocators and copy each one. */
     186        68138 :     foreach(cell, rlocatorlist)
     187              :     {
     188        67860 :         relinfo = lfirst(cell);
     189        67860 :         srcrlocator = relinfo->rlocator;
     190              : 
     191              :         /*
     192              :          * If the relation is from the source db's default tablespace then we
     193              :          * need to create it in the destination db's default tablespace.
     194              :          * Otherwise, we need to create in the same tablespace as it is in the
     195              :          * source database.
     196              :          */
     197        67860 :         if (srcrlocator.spcOid == src_tsid)
     198        67860 :             dstrlocator.spcOid = dst_tsid;
     199              :         else
     200            0 :             dstrlocator.spcOid = srcrlocator.spcOid;
     201              : 
     202        67860 :         dstrlocator.dbOid = dst_dboid;
     203        67860 :         dstrlocator.relNumber = srcrlocator.relNumber;
     204              : 
     205              :         /*
     206              :          * Acquire locks on source and target relations before copying.
     207              :          *
     208              :          * We typically do not read relation data into shared_buffers without
     209              :          * holding a relation lock. It's unclear what could go wrong if we
     210              :          * skipped it in this case, because nobody can be modifying either the
     211              :          * source or destination database at this point, and we have locks on
     212              :          * both databases, too, but let's take the conservative route.
     213              :          */
     214        67860 :         dstrelid.relId = srcrelid.relId = relinfo->reloid;
     215        67860 :         LockRelationId(&srcrelid, AccessShareLock);
     216        67860 :         LockRelationId(&dstrelid, AccessShareLock);
     217              : 
     218              :         /* Copy relation storage from source to the destination. */
     219        67860 :         CreateAndCopyRelationData(srcrlocator, dstrlocator, relinfo->permanent);
     220              : 
     221              :         /* Release the relation locks. */
     222        67858 :         UnlockRelationId(&srcrelid, AccessShareLock);
     223        67858 :         UnlockRelationId(&dstrelid, AccessShareLock);
     224              :     }
     225              : 
     226          278 :     pfree(srcpath);
     227          278 :     pfree(dstpath);
     228          278 :     list_free_deep(rlocatorlist);
     229          278 : }
     230              : 
     231              : /*
     232              :  * Scan the pg_class table in the source database to identify the relations
     233              :  * that need to be copied to the destination database.
     234              :  *
     235              :  * This is an exception to the usual rule that cross-database access is
     236              :  * not possible. We can make it work here because we know that there are no
     237              :  * connections to the source database and (since there can't be prepared
     238              :  * transactions touching that database) no in-doubt tuples either. This
     239              :  * means that we don't need to worry about pruning removing anything from
     240              :  * under us, and we don't need to be too picky about our snapshot either.
     241              :  * As long as it sees all previously-committed XIDs as committed and all
     242              :  * aborted XIDs as aborted, we should be fine: nothing else is possible
     243              :  * here.
     244              :  *
     245              :  * We can't rely on the relcache for anything here, because that only knows
     246              :  * about the database to which we are connected, and can't handle access to
     247              :  * other databases. That also means we can't rely on the heap scan
     248              :  * infrastructure, which would be a bad idea anyway since it might try
     249              :  * to do things like HOT pruning which we definitely can't do safely in
     250              :  * a database to which we're not even connected.
     251              :  */
     252              : static List *
     253          280 : ScanSourceDatabasePgClass(Oid tbid, Oid dbid, char *srcpath)
     254              : {
     255              :     RelFileLocator rlocator;
     256              :     BlockNumber nblocks;
     257              :     BlockNumber blkno;
     258              :     Buffer      buf;
     259              :     RelFileNumber relfilenumber;
     260              :     Page        page;
     261          280 :     List       *rlocatorlist = NIL;
     262              :     LockRelId   relid;
     263              :     Snapshot    snapshot;
     264              :     SMgrRelation smgr;
     265              :     BufferAccessStrategy bstrategy;
     266              : 
     267              :     /* Get pg_class relfilenumber. */
     268          280 :     relfilenumber = RelationMapOidToFilenumberForDatabase(srcpath,
     269              :                                                           RelationRelationId);
     270              : 
     271              :     /* Don't read data into shared_buffers without holding a relation lock. */
     272          280 :     relid.dbId = dbid;
     273          280 :     relid.relId = RelationRelationId;
     274          280 :     LockRelationId(&relid, AccessShareLock);
     275              : 
     276              :     /* Prepare a RelFileLocator for the pg_class relation. */
     277          280 :     rlocator.spcOid = tbid;
     278          280 :     rlocator.dbOid = dbid;
     279          280 :     rlocator.relNumber = relfilenumber;
     280              : 
     281          280 :     smgr = smgropen(rlocator, INVALID_PROC_NUMBER);
     282          280 :     nblocks = smgrnblocks(smgr, MAIN_FORKNUM);
     283          280 :     smgrclose(smgr);
     284              : 
     285              :     /* Use a buffer access strategy since this is a bulk read operation. */
     286          280 :     bstrategy = GetAccessStrategy(BAS_BULKREAD);
     287              : 
     288              :     /*
     289              :      * As explained in the function header comments, we need a snapshot that
     290              :      * will see all committed transactions as committed, and our transaction
     291              :      * snapshot - or the active snapshot - might not be new enough for that,
     292              :      * but the return value of GetLatestSnapshot() should work fine.
     293              :      */
     294          280 :     snapshot = RegisterSnapshot(GetLatestSnapshot());
     295              : 
     296              :     /* Process the relation block by block. */
     297         4480 :     for (blkno = 0; blkno < nblocks; blkno++)
     298              :     {
     299         4200 :         CHECK_FOR_INTERRUPTS();
     300              : 
     301         4200 :         buf = ReadBufferWithoutRelcache(rlocator, MAIN_FORKNUM, blkno,
     302              :                                         RBM_NORMAL, bstrategy, true);
     303              : 
     304         4200 :         LockBuffer(buf, BUFFER_LOCK_SHARE);
     305         4200 :         page = BufferGetPage(buf);
     306         4200 :         if (PageIsNew(page) || PageIsEmpty(page))
     307              :         {
     308            0 :             UnlockReleaseBuffer(buf);
     309            0 :             continue;
     310              :         }
     311              : 
     312              :         /* Append relevant pg_class tuples for current page to rlocatorlist. */
     313         4200 :         rlocatorlist = ScanSourceDatabasePgClassPage(page, buf, tbid, dbid,
     314              :                                                      srcpath, rlocatorlist,
     315              :                                                      snapshot);
     316              : 
     317         4200 :         UnlockReleaseBuffer(buf);
     318              :     }
     319          280 :     UnregisterSnapshot(snapshot);
     320              : 
     321              :     /* Release relation lock. */
     322          280 :     UnlockRelationId(&relid, AccessShareLock);
     323              : 
     324          280 :     return rlocatorlist;
     325              : }
     326              : 
     327              : /*
     328              :  * Scan one page of the source database's pg_class relation and add relevant
     329              :  * entries to rlocatorlist. The return value is the updated list.
     330              :  */
     331              : static List *
     332         4200 : ScanSourceDatabasePgClassPage(Page page, Buffer buf, Oid tbid, Oid dbid,
     333              :                               char *srcpath, List *rlocatorlist,
     334              :                               Snapshot snapshot)
     335              : {
     336         4200 :     BlockNumber blkno = BufferGetBlockNumber(buf);
     337              :     OffsetNumber offnum;
     338              :     OffsetNumber maxoff;
     339              :     HeapTupleData tuple;
     340              : 
     341         4200 :     maxoff = PageGetMaxOffsetNumber(page);
     342              : 
     343              :     /* Loop over offsets. */
     344         4200 :     for (offnum = FirstOffsetNumber;
     345       199466 :          offnum <= maxoff;
     346       195266 :          offnum = OffsetNumberNext(offnum))
     347              :     {
     348              :         ItemId      itemid;
     349              : 
     350       195266 :         itemid = PageGetItemId(page, offnum);
     351              : 
     352              :         /* Nothing to do if slot is empty or already dead. */
     353       195266 :         if (!ItemIdIsUsed(itemid) || ItemIdIsDead(itemid) ||
     354       156571 :             ItemIdIsRedirected(itemid))
     355        68655 :             continue;
     356              : 
     357              :         Assert(ItemIdIsNormal(itemid));
     358       126611 :         ItemPointerSet(&(tuple.t_self), blkno, offnum);
     359              : 
     360              :         /* Initialize a HeapTupleData structure. */
     361       126611 :         tuple.t_data = (HeapTupleHeader) PageGetItem(page, itemid);
     362       126611 :         tuple.t_len = ItemIdGetLength(itemid);
     363       126611 :         tuple.t_tableOid = RelationRelationId;
     364              : 
     365              :         /* Skip tuples that are not visible to this snapshot. */
     366       126611 :         if (HeapTupleSatisfiesVisibility(&tuple, snapshot, buf))
     367              :         {
     368              :             CreateDBRelInfo *relinfo;
     369              : 
     370              :             /*
     371              :              * ScanSourceDatabasePgClassTuple is in charge of constructing a
     372              :              * CreateDBRelInfo object for this tuple, but can also decide that
     373              :              * this tuple isn't something we need to copy. If we do need to
     374              :              * copy the relation, add it to the list.
     375              :              */
     376       126588 :             relinfo = ScanSourceDatabasePgClassTuple(&tuple, tbid, dbid,
     377              :                                                      srcpath);
     378       126588 :             if (relinfo != NULL)
     379        68348 :                 rlocatorlist = lappend(rlocatorlist, relinfo);
     380              :         }
     381              :     }
     382              : 
     383         4200 :     return rlocatorlist;
     384              : }
     385              : 
     386              : /*
     387              :  * Decide whether a certain pg_class tuple represents something that
     388              :  * needs to be copied from the source database to the destination database,
     389              :  * and if so, construct a CreateDBRelInfo for it.
     390              :  *
     391              :  * Visibility checks are handled by the caller, so our job here is just
     392              :  * to assess the data stored in the tuple.
     393              :  */
     394              : CreateDBRelInfo *
     395       126588 : ScanSourceDatabasePgClassTuple(HeapTupleData *tuple, Oid tbid, Oid dbid,
     396              :                                char *srcpath)
     397              : {
     398              :     CreateDBRelInfo *relinfo;
     399              :     Form_pg_class classForm;
     400       126588 :     RelFileNumber relfilenumber = InvalidRelFileNumber;
     401              : 
     402       126588 :     classForm = (Form_pg_class) GETSTRUCT(tuple);
     403              : 
     404              :     /*
     405              :      * Return NULL if this object does not need to be copied.
     406              :      *
     407              :      * Shared objects don't need to be copied, because they are shared.
     408              :      * Objects without storage can't be copied, because there's nothing to
     409              :      * copy. Temporary relations don't need to be copied either, because they
     410              :      * are inaccessible outside of the session that created them, which must
     411              :      * be gone already, and couldn't connect to a different database if it
     412              :      * still existed. autovacuum will eventually remove the pg_class entries
     413              :      * as well.
     414              :      */
     415       126588 :     if (classForm->reltablespace == GLOBALTABLESPACE_OID ||
     416       113708 :         !RELKIND_HAS_STORAGE(classForm->relkind) ||
     417        68348 :         classForm->relpersistence == RELPERSISTENCE_TEMP)
     418        58240 :         return NULL;
     419              : 
     420              :     /*
     421              :      * If relfilenumber is valid then directly use it.  Otherwise, consult the
     422              :      * relmap.
     423              :      */
     424        68348 :     if (RelFileNumberIsValid(classForm->relfilenode))
     425        63588 :         relfilenumber = classForm->relfilenode;
     426              :     else
     427         4760 :         relfilenumber = RelationMapOidToFilenumberForDatabase(srcpath,
     428              :                                                               classForm->oid);
     429              : 
     430              :     /* We must have a valid relfilenumber. */
     431        68348 :     if (!RelFileNumberIsValid(relfilenumber))
     432            0 :         elog(ERROR, "relation with OID %u does not have a valid relfilenumber",
     433              :              classForm->oid);
     434              : 
     435              :     /* Prepare a rel info element and add it to the list. */
     436        68348 :     relinfo = palloc_object(CreateDBRelInfo);
     437        68348 :     if (OidIsValid(classForm->reltablespace))
     438            0 :         relinfo->rlocator.spcOid = classForm->reltablespace;
     439              :     else
     440        68348 :         relinfo->rlocator.spcOid = tbid;
     441              : 
     442        68348 :     relinfo->rlocator.dbOid = dbid;
     443        68348 :     relinfo->rlocator.relNumber = relfilenumber;
     444        68348 :     relinfo->reloid = classForm->oid;
     445              : 
     446              :     /* Temporary relations were rejected above. */
     447              :     Assert(classForm->relpersistence != RELPERSISTENCE_TEMP);
     448        68348 :     relinfo->permanent =
     449        68348 :         (classForm->relpersistence == RELPERSISTENCE_PERMANENT) ? true : false;
     450              : 
     451        68348 :     return relinfo;
     452              : }
     453              : 
     454              : /*
     455              :  * Create database directory and write out the PG_VERSION file in the database
     456              :  * path.  If isRedo is true, it's okay for the database directory to exist
     457              :  * already.
     458              :  */
     459              : static void
     460          305 : CreateDirAndVersionFile(char *dbpath, Oid dbid, Oid tsid, bool isRedo)
     461              : {
     462              :     int         fd;
     463              :     int         nbytes;
     464              :     char        versionfile[MAXPGPATH];
     465              :     char        buf[16];
     466              : 
     467              :     /*
     468              :      * Note that we don't have to copy version data from the source database;
     469              :      * there's only one legal value.
     470              :      */
     471          305 :     sprintf(buf, "%s\n", PG_MAJORVERSION);
     472          305 :     nbytes = strlen(PG_MAJORVERSION) + 1;
     473              : 
     474              :     /* Create database directory. */
     475          305 :     if (MakePGDirectory(dbpath) < 0)
     476              :     {
     477              :         /* Failure other than already exists or not in WAL replay? */
     478            8 :         if (errno != EEXIST || !isRedo)
     479            0 :             ereport(ERROR,
     480              :                     (errcode_for_file_access(),
     481              :                      errmsg("could not create directory \"%s\": %m", dbpath)));
     482              :     }
     483              : 
     484              :     /*
     485              :      * Create PG_VERSION file in the database path.  If the file already
     486              :      * exists and we are in WAL replay then try again to open it in write
     487              :      * mode.
     488              :      */
     489          305 :     snprintf(versionfile, sizeof(versionfile), "%s/%s", dbpath, "PG_VERSION");
     490              : 
     491          305 :     fd = OpenTransientFile(versionfile, O_WRONLY | O_CREAT | O_EXCL | PG_BINARY);
     492          305 :     if (fd < 0 && errno == EEXIST && isRedo)
     493            8 :         fd = OpenTransientFile(versionfile, O_WRONLY | O_TRUNC | PG_BINARY);
     494              : 
     495          305 :     if (fd < 0)
     496            0 :         ereport(ERROR,
     497              :                 (errcode_for_file_access(),
     498              :                  errmsg("could not create file \"%s\": %m", versionfile)));
     499              : 
     500              :     /* Write PG_MAJORVERSION in the PG_VERSION file. */
     501          305 :     pgstat_report_wait_start(WAIT_EVENT_VERSION_FILE_WRITE);
     502          305 :     errno = 0;
     503          305 :     if ((int) write(fd, buf, nbytes) != nbytes)
     504              :     {
     505              :         /* If write didn't set errno, assume problem is no disk space. */
     506            0 :         if (errno == 0)
     507            0 :             errno = ENOSPC;
     508            0 :         ereport(ERROR,
     509              :                 (errcode_for_file_access(),
     510              :                  errmsg("could not write to file \"%s\": %m", versionfile)));
     511              :     }
     512          305 :     pgstat_report_wait_end();
     513              : 
     514          305 :     pgstat_report_wait_start(WAIT_EVENT_VERSION_FILE_SYNC);
     515          305 :     if (pg_fsync(fd) != 0)
     516            0 :         ereport(data_sync_elevel(ERROR),
     517              :                 (errcode_for_file_access(),
     518              :                  errmsg("could not fsync file \"%s\": %m", versionfile)));
     519          305 :     fsync_fname(dbpath, true);
     520          305 :     pgstat_report_wait_end();
     521              : 
     522              :     /* Close the version file. */
     523          305 :     CloseTransientFile(fd);
     524              : 
     525              :     /* If we are not in WAL replay then write the WAL. */
     526          305 :     if (!isRedo)
     527              :     {
     528              :         xl_dbase_create_wal_log_rec xlrec;
     529              : 
     530          280 :         START_CRIT_SECTION();
     531              : 
     532          280 :         xlrec.db_id = dbid;
     533          280 :         xlrec.tablespace_id = tsid;
     534              : 
     535          280 :         XLogBeginInsert();
     536          280 :         XLogRegisterData(&xlrec,
     537              :                          sizeof(xl_dbase_create_wal_log_rec));
     538              : 
     539          280 :         (void) XLogInsert(RM_DBASE_ID, XLOG_DBASE_CREATE_WAL_LOG);
     540              : 
     541          280 :         END_CRIT_SECTION();
     542              :     }
     543          305 : }
     544              : 
     545              : /*
     546              :  * Create a new database using the FILE_COPY strategy.
     547              :  *
     548              :  * Copy the source database's directory in each tablespace at the filesystem
     549              :  * level (src_tsid is remapped to dst_tsid), logging one WAL record per copy.
     550              :  * This requires a checkpoint before and after the copy, which may be
     551              :  * expensive, but it greatly reduces WAL generation if the database is large.
     552              :  */
     553              : static void
     554          152 : CreateDatabaseUsingFileCopy(Oid src_dboid, Oid dst_dboid, Oid src_tsid,
     555              :                             Oid dst_tsid)
     556              : {
     557              :     TableScanDesc scan;
     558              :     Relation    rel;
     559              :     HeapTuple   tuple;
     560              : 
     561              :     /*
     562              :      * Force a checkpoint before starting the copy. This will force all dirty
     563              :      * buffers, including those of unlogged tables, out to disk, to ensure
     564              :      * source database is up-to-date on disk for the copy.
     565              :      * FlushDatabaseBuffers() would suffice for that, but we also want to
     566              :      * process any pending unlink requests. Otherwise, if a checkpoint
     567              :      * happened while we're copying files, a file might be deleted just when
     568              :      * we're about to copy it, causing the lstat() call in copydir() to fail
     569              :      * with ENOENT.
     570              :      *
     571              :      * In binary upgrade mode, we can skip this checkpoint because pg_upgrade
     572              :      * is careful to ensure that template0 is fully written to disk prior to
     573              :      * any CREATE DATABASE commands.
     574              :      */
     575          152 :     if (!IsBinaryUpgrade)
     576          120 :         RequestCheckpoint(CHECKPOINT_FAST | CHECKPOINT_FORCE |
     577              :                           CHECKPOINT_WAIT | CHECKPOINT_FLUSH_UNLOGGED);
     578              : 
     579              :     /*
     580              :      * Iterate through all tablespaces, and copy the template database's
     581              :      * directory in each one (if it has a non-empty one) to the new database.
     582              :      */
     583          152 :     rel = table_open(TableSpaceRelationId, AccessShareLock);
     584          152 :     scan = table_beginscan_catalog(rel, 0, NULL);
     585          490 :     while ((tuple = heap_getnext(scan, ForwardScanDirection)) != NULL)
     586              :     {
     587          338 :         Form_pg_tablespace spaceform = (Form_pg_tablespace) GETSTRUCT(tuple);
     588          338 :         Oid         srctablespace = spaceform->oid;
     589              :         Oid         dsttablespace;
     590              :         char       *srcpath;
     591              :         char       *dstpath;
     592              :         struct stat st;
     593              : 
     594              :         /* No need to copy global tablespace */
     595          338 :         if (srctablespace == GLOBALTABLESPACE_OID)
     596          186 :             continue;
     597              : 
     598          186 :         srcpath = GetDatabasePath(src_dboid, srctablespace);
     599              : 
     600          338 :         if (stat(srcpath, &st) < 0 || !S_ISDIR(st.st_mode) ||
     601          152 :             directory_is_empty(srcpath))
     602              :         {
     603              :             /* Missing, not a directory, or empty: assume we can ignore it */
     604           34 :             pfree(srcpath);
     605           34 :             continue;
     606              :         }
     607              : 
     608          152 :         if (srctablespace == src_tsid)
     609          152 :             dsttablespace = dst_tsid;
     610              :         else
     611            0 :             dsttablespace = srctablespace;
     612              : 
     613          152 :         dstpath = GetDatabasePath(dst_dboid, dsttablespace);
     614              : 
     615              :         /*
     616              :          * Copy this database directory to its new location.
     617              :          *
     618              :          * We don't need to copy subdirectories (hence, presumably, the "false").
     619              :          */
     620          152 :         copydir(srcpath, dstpath, false);
     621              : 
     622              :         /* Record the filesystem change in XLOG, one record per directory copied */
     623              :         {
     624              :             xl_dbase_create_file_copy_rec xlrec;
     625              : 
     626          152 :             xlrec.db_id = dst_dboid;
     627          152 :             xlrec.tablespace_id = dsttablespace;
     628          152 :             xlrec.src_db_id = src_dboid;
     629          152 :             xlrec.src_tablespace_id = srctablespace;
     630              : 
     631          152 :             XLogBeginInsert();
     632          152 :             XLogRegisterData(&xlrec,
     633              :                              sizeof(xl_dbase_create_file_copy_rec));
     634              : 
     635          152 :             (void) XLogInsert(RM_DBASE_ID,
     636              :                               XLOG_DBASE_CREATE_FILE_COPY | XLR_SPECIAL_REL_UPDATE);
     637              :         }
     638          152 :         pfree(srcpath);
     639          152 :         pfree(dstpath);
     640              :     }
     641          152 :     table_endscan(scan);
     642          152 :     table_close(rel, AccessShareLock);
     643              : 
     644              :     /*
     645              :      * We force a checkpoint before committing.  This effectively means that
     646              :      * committed XLOG_DBASE_CREATE_FILE_COPY operations will never need to be
     647              :      * replayed (at least not in ordinary crash recovery; we still have to
     648              :      * make the XLOG entry for the benefit of PITR operations). This avoids
     649              :      * two nasty scenarios:
     650              :      *
     651              :      * #1: At wal_level=minimal, we don't XLOG the contents of newly created
     652              :      * relfilenodes; therefore the drop-and-recreate-whole-directory behavior
     653              :      * of XLOG_DBASE_CREATE_FILE_COPY replay would lose such files created in
     654              :      * the new database between our commit and the next checkpoint.
     655              :      *
     656              :      * #2: Since XLOG_DBASE_CREATE_FILE_COPY replay has to recopy the source
     657              :      * database, we run the risk of copying changes in it that were committed
     658              :      * after the original CREATE DATABASE command but before the system crash
     659              :      * that led to the replay.  This is at least unexpected and at worst could
     660              :      * lead to inconsistencies, eg duplicate table names.
     661              :      *
     662              :      * (Both of these were real bugs in releases 8.0 through 8.0.3.)
     663              :      *
     664              :      * In PITR replay, the first of these isn't an issue, and the second is
     665              :      * only a risk if the CREATE DATABASE and subsequent template database
     666              :      * change both occur while a base backup is being taken. There doesn't
     667              :      * seem to be much we can do about that except document it as a
     668              :      * limitation.
     669              :      *
     670              :      * In binary upgrade mode, we can skip this checkpoint because neither of
     671              :      * these problems applies: we don't ever replay the WAL generated during
     672              :      * pg_upgrade, and we don't support taking base backups during pg_upgrade
     673              :      * (not to mention that we don't concurrently modify template0, either).
     674              :      *
     675              :      * See CreateDatabaseUsingWalLog() for a less cheesy CREATE DATABASE
     676              :      * strategy that avoids these problems.
     677              :      */
     678          152 :     if (!IsBinaryUpgrade)
     679          120 :         RequestCheckpoint(CHECKPOINT_FAST | CHECKPOINT_FORCE |
     680              :                           CHECKPOINT_WAIT);
     681          152 : }
     682              : 
     683              : /*
     684              :  * CREATE DATABASE
     685              :  */
     686              : Oid
     687          454 : createdb(ParseState *pstate, const CreatedbStmt *stmt)
     688              : {
     689              :     Oid         src_dboid;
     690              :     Oid         src_owner;
     691          454 :     int         src_encoding = -1;
     692          454 :     char       *src_collate = NULL;
     693          454 :     char       *src_ctype = NULL;
     694          454 :     char       *src_locale = NULL;
     695          454 :     char       *src_icurules = NULL;
     696          454 :     char        src_locprovider = '\0';
     697          454 :     char       *src_collversion = NULL;
     698              :     bool        src_istemplate;
     699          454 :     bool        src_hasloginevt = false;
     700              :     bool        src_allowconn;
     701          454 :     TransactionId src_frozenxid = InvalidTransactionId;
     702          454 :     MultiXactId src_minmxid = InvalidMultiXactId;
     703              :     Oid         src_deftablespace;
     704              :     volatile Oid dst_deftablespace;
     705              :     Relation    pg_database_rel;
     706              :     HeapTuple   tuple;
     707          454 :     Datum       new_record[Natts_pg_database] = {0};
     708          454 :     bool        new_record_nulls[Natts_pg_database] = {0};
     709          454 :     Oid         dboid = InvalidOid;
     710              :     Oid         datdba;
     711              :     ListCell   *option;
     712          454 :     DefElem    *tablespacenameEl = NULL;
     713          454 :     DefElem    *ownerEl = NULL;
     714          454 :     DefElem    *templateEl = NULL;
     715          454 :     DefElem    *encodingEl = NULL;
     716          454 :     DefElem    *localeEl = NULL;
     717          454 :     DefElem    *builtinlocaleEl = NULL;
     718          454 :     DefElem    *collateEl = NULL;
     719          454 :     DefElem    *ctypeEl = NULL;
     720          454 :     DefElem    *iculocaleEl = NULL;
     721          454 :     DefElem    *icurulesEl = NULL;
     722          454 :     DefElem    *locproviderEl = NULL;
     723          454 :     DefElem    *istemplateEl = NULL;
     724          454 :     DefElem    *allowconnectionsEl = NULL;
     725          454 :     DefElem    *connlimitEl = NULL;
     726          454 :     DefElem    *collversionEl = NULL;
     727          454 :     DefElem    *strategyEl = NULL;
     728          454 :     char       *dbname = stmt->dbname;
     729          454 :     char       *dbowner = NULL;
     730          454 :     const char *dbtemplate = NULL;
     731          454 :     char       *dbcollate = NULL;
     732          454 :     char       *dbctype = NULL;
     733          454 :     const char *dblocale = NULL;
     734          454 :     char       *dbicurules = NULL;
     735          454 :     char        dblocprovider = '\0';
     736              :     char       *canonname;
     737          454 :     int         encoding = -1;
     738          454 :     bool        dbistemplate = false;
     739          454 :     bool        dballowconnections = true;
     740          454 :     int         dbconnlimit = DATCONNLIMIT_UNLIMITED;
     741          454 :     char       *dbcollversion = NULL;
     742              :     int         notherbackends;
     743              :     int         npreparedxacts;
     744          454 :     CreateDBStrategy dbstrategy = CREATEDB_WAL_LOG;
     745              :     createdb_failure_params fparms;
     746              : 
     747              :     /* Report error if name has \n or \r character. */
     748          454 :     if (strpbrk(dbname, "\n\r"))
     749            3 :         ereport(ERROR,
     750              :                 (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
     751              :                  errmsg("database name \"%s\" contains a newline or carriage return character", dbname)));
     752              : 
     753              :     /* Extract options from the statement node tree */
     754         1349 :     foreach(option, stmt->options)
     755              :     {
     756          898 :         DefElem    *defel = (DefElem *) lfirst(option);
     757              : 
     758          898 :         if (strcmp(defel->defname, "tablespace") == 0)
     759              :         {
     760           17 :             if (tablespacenameEl)
     761            0 :                 errorConflictingDefElem(defel, pstate);
     762           17 :             tablespacenameEl = defel;
     763              :         }
     764          881 :         else if (strcmp(defel->defname, "owner") == 0)
     765              :         {
     766            5 :             if (ownerEl)
     767            0 :                 errorConflictingDefElem(defel, pstate);
     768            5 :             ownerEl = defel;
     769              :         }
     770          876 :         else if (strcmp(defel->defname, "template") == 0)
     771              :         {
     772          198 :             if (templateEl)
     773            0 :                 errorConflictingDefElem(defel, pstate);
     774          198 :             templateEl = defel;
     775              :         }
     776          678 :         else if (strcmp(defel->defname, "encoding") == 0)
     777              :         {
     778           61 :             if (encodingEl)
     779            0 :                 errorConflictingDefElem(defel, pstate);
     780           61 :             encodingEl = defel;
     781              :         }
     782          617 :         else if (strcmp(defel->defname, "locale") == 0)
     783              :         {
     784           57 :             if (localeEl)
     785            0 :                 errorConflictingDefElem(defel, pstate);
     786           57 :             localeEl = defel;
     787              :         }
     788          560 :         else if (strcmp(defel->defname, "builtin_locale") == 0)
     789              :         {
     790            8 :             if (builtinlocaleEl)
     791            0 :                 errorConflictingDefElem(defel, pstate);
     792            8 :             builtinlocaleEl = defel;
     793              :         }
     794          552 :         else if (strcmp(defel->defname, "lc_collate") == 0)
     795              :         {
     796           14 :             if (collateEl)
     797            0 :                 errorConflictingDefElem(defel, pstate);
     798           14 :             collateEl = defel;
     799              :         }
     800          538 :         else if (strcmp(defel->defname, "lc_ctype") == 0)
     801              :         {
     802           14 :             if (ctypeEl)
     803            0 :                 errorConflictingDefElem(defel, pstate);
     804           14 :             ctypeEl = defel;
     805              :         }
     806          524 :         else if (strcmp(defel->defname, "icu_locale") == 0)
     807              :         {
     808            5 :             if (iculocaleEl)
     809            0 :                 errorConflictingDefElem(defel, pstate);
     810            5 :             iculocaleEl = defel;
     811              :         }
     812          519 :         else if (strcmp(defel->defname, "icu_rules") == 0)
     813              :         {
     814            1 :             if (icurulesEl)
     815            0 :                 errorConflictingDefElem(defel, pstate);
     816            1 :             icurulesEl = defel;
     817              :         }
     818          518 :         else if (strcmp(defel->defname, "locale_provider") == 0)
     819              :         {
     820           62 :             if (locproviderEl)
     821            0 :                 errorConflictingDefElem(defel, pstate);
     822           62 :             locproviderEl = defel;
     823              :         }
     824          456 :         else if (strcmp(defel->defname, "is_template") == 0)
     825              :         {
     826           57 :             if (istemplateEl)
     827            0 :                 errorConflictingDefElem(defel, pstate);
     828           57 :             istemplateEl = defel;
     829              :         }
     830          399 :         else if (strcmp(defel->defname, "allow_connections") == 0)
     831              :         {
     832           56 :             if (allowconnectionsEl)
     833            0 :                 errorConflictingDefElem(defel, pstate);
     834           56 :             allowconnectionsEl = defel;
     835              :         }
     836          343 :         else if (strcmp(defel->defname, "connection_limit") == 0)
     837              :         {
     838            0 :             if (connlimitEl)
     839            0 :                 errorConflictingDefElem(defel, pstate);
     840            0 :             connlimitEl = defel;
     841              :         }
     842          343 :         else if (strcmp(defel->defname, "collation_version") == 0)
     843              :         {
     844           32 :             if (collversionEl)
     845            0 :                 errorConflictingDefElem(defel, pstate);
     846           32 :             collversionEl = defel;
     847              :         }
     848          311 :         else if (strcmp(defel->defname, "location") == 0)
     849              :         {
     850            0 :             ereport(WARNING,
     851              :                     (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
     852              :                      errmsg("LOCATION is not supported anymore"),
     853              :                      errhint("Consider using tablespaces instead."),
     854              :                      parser_errposition(pstate, defel->location)));
     855              :         }
     856          311 :         else if (strcmp(defel->defname, "oid") == 0)
     857              :         {
     858          147 :             dboid = defGetObjectId(defel);
     859              : 
     860              :             /*
     861              :              * We don't normally permit new databases to be created with
     862              :              * system-assigned OIDs. pg_upgrade tries to preserve database
     863              :              * OIDs, so we can't allow any database to be created with an OID
     864              :              * that might be in use in a freshly-initialized cluster created
     865              :              * by some future version. We assume all such OIDs will be from
     866              :              * the system-managed OID range.
     867              :              *
     868              :              * As an exception, however, we permit any OID to be assigned when
     869              :              * allow_system_table_mods=on (so that initdb can assign system
     870              :              * OIDs to template0 and postgres) or when performing a binary
     871              :              * upgrade (so that pg_upgrade can preserve whatever OIDs it finds
     872              :              * in the source cluster).
     873              :              */
     874          147 :             if (dboid < FirstNormalObjectId &&
     875          130 :                 !allowSystemTableMods && !IsBinaryUpgrade)
     876            0 :                 ereport(ERROR,
     877              :                         (errcode(ERRCODE_INVALID_PARAMETER_VALUE)),
     878              :                         errmsg("OIDs less than %u are reserved for system objects", FirstNormalObjectId));
     879              :         }
     880          164 :         else if (strcmp(defel->defname, "strategy") == 0)
     881              :         {
     882          164 :             if (strategyEl)
     883            0 :                 errorConflictingDefElem(defel, pstate);
     884          164 :             strategyEl = defel;
     885              :         }
     886              :         else
     887            0 :             ereport(ERROR,
     888              :                     (errcode(ERRCODE_SYNTAX_ERROR),
     889              :                      errmsg("option \"%s\" not recognized", defel->defname),
     890              :                      parser_errposition(pstate, defel->location)));
     891              :     }
     892              : 
     893          451 :     if (ownerEl && ownerEl->arg)
     894            5 :         dbowner = defGetString(ownerEl);
     895          451 :     if (templateEl && templateEl->arg)
     896          198 :         dbtemplate = defGetString(templateEl);
     897          451 :     if (encodingEl && encodingEl->arg)
     898              :     {
     899              :         const char *encoding_name;
     900              : 
     901           61 :         if (IsA(encodingEl->arg, Integer))
     902              :         {
     903            0 :             encoding = defGetInt32(encodingEl);
     904            0 :             encoding_name = pg_encoding_to_char(encoding);
     905            0 :             if (strcmp(encoding_name, "") == 0 ||
     906            0 :                 pg_valid_server_encoding(encoding_name) < 0)
     907            0 :                 ereport(ERROR,
     908              :                         (errcode(ERRCODE_UNDEFINED_OBJECT),
     909              :                          errmsg("%d is not a valid encoding code",
     910              :                                 encoding),
     911              :                          parser_errposition(pstate, encodingEl->location)));
     912              :         }
     913              :         else
     914              :         {
     915           61 :             encoding_name = defGetString(encodingEl);
     916           61 :             encoding = pg_valid_server_encoding(encoding_name);
     917           61 :             if (encoding < 0)
     918            0 :                 ereport(ERROR,
     919              :                         (errcode(ERRCODE_UNDEFINED_OBJECT),
     920              :                          errmsg("%s is not a valid encoding name",
     921              :                                 encoding_name),
     922              :                          parser_errposition(pstate, encodingEl->location)));
     923              :         }
     924              :     }
     925          451 :     if (localeEl && localeEl->arg)
     926              :     {
     927           57 :         dbcollate = defGetString(localeEl);
     928           57 :         dbctype = defGetString(localeEl);
     929           57 :         dblocale = defGetString(localeEl);
     930              :     }
     931          451 :     if (builtinlocaleEl && builtinlocaleEl->arg)
     932            8 :         dblocale = defGetString(builtinlocaleEl);
     933          451 :     if (collateEl && collateEl->arg)
     934           14 :         dbcollate = defGetString(collateEl);
     935          451 :     if (ctypeEl && ctypeEl->arg)
     936           14 :         dbctype = defGetString(ctypeEl);
     937          451 :     if (iculocaleEl && iculocaleEl->arg)
     938            5 :         dblocale = defGetString(iculocaleEl);
     939          451 :     if (icurulesEl && icurulesEl->arg)
     940            1 :         dbicurules = defGetString(icurulesEl);
     941          451 :     if (locproviderEl && locproviderEl->arg)
     942              :     {
     943           62 :         char       *locproviderstr = defGetString(locproviderEl);
     944              : 
     945           62 :         if (pg_strcasecmp(locproviderstr, "builtin") == 0)
     946           17 :             dblocprovider = COLLPROVIDER_BUILTIN;
     947           45 :         else if (pg_strcasecmp(locproviderstr, "icu") == 0)
     948            8 :             dblocprovider = COLLPROVIDER_ICU;
     949           37 :         else if (pg_strcasecmp(locproviderstr, "libc") == 0)
     950           36 :             dblocprovider = COLLPROVIDER_LIBC;
     951              :         else
     952            1 :             ereport(ERROR,
     953              :                     (errcode(ERRCODE_INVALID_OBJECT_DEFINITION),
     954              :                      errmsg("unrecognized locale provider: %s",
     955              :                             locproviderstr)));
     956              :     }
     957          450 :     if (istemplateEl && istemplateEl->arg)
     958           57 :         dbistemplate = defGetBoolean(istemplateEl);
     959          450 :     if (allowconnectionsEl && allowconnectionsEl->arg)
     960           56 :         dballowconnections = defGetBoolean(allowconnectionsEl);
     961          450 :     if (connlimitEl && connlimitEl->arg)
     962              :     {
     963            0 :         dbconnlimit = defGetInt32(connlimitEl);
     964            0 :         if (dbconnlimit < DATCONNLIMIT_UNLIMITED)
     965            0 :             ereport(ERROR,
     966              :                     (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
     967              :                      errmsg("invalid connection limit: %d", dbconnlimit)));
     968              :     }
     969          450 :     if (collversionEl)
     970           32 :         dbcollversion = defGetString(collversionEl);
     971              : 
     972              :     /* obtain OID of proposed owner */
     973          450 :     if (dbowner)
     974            5 :         datdba = get_role_oid(dbowner, false);
     975              :     else
     976          445 :         datdba = GetUserId();
     977              : 
     978              :     /*
     979              :      * To create a database, must have createdb privilege and must be able to
     980              :      * become the target role (this does not imply that the target role itself
     981              :      * must have createdb privilege).  The latter provision guards against
     982              :      * "giveaway" attacks.  Note that a superuser will always have both of
     983              :      * these privileges a fortiori.
     984              :      */
     985          450 :     if (!have_createdb_privilege())
     986            4 :         ereport(ERROR,
     987              :                 (errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
     988              :                  errmsg("permission denied to create database")));
     989              : 
     990          446 :     check_can_set_role(GetUserId(), datdba);
     991              : 
     992              :     /*
     993              :      * Lookup database (template) to be cloned, and obtain share lock on it.
     994              :      * ShareLock allows two CREATE DATABASEs to work from the same template
     995              :      * concurrently, while ensuring no one is busy dropping it in parallel
     996              :      * (which would be Very Bad since we'd likely get an incomplete copy
     997              :      * without knowing it).  This also prevents any new connections from being
     998              :      * made to the source until we finish copying it, so we can be sure it
     999              :      * won't change underneath us.
    1000              :      */
    1001          446 :     if (!dbtemplate)
    1002          249 :         dbtemplate = "template1"; /* Default template database name */
    1003              : 
    1004          446 :     if (!get_db_info(dbtemplate, ShareLock,
    1005              :                      &src_dboid, &src_owner, &src_encoding,
    1006              :                      &src_istemplate, &src_allowconn, &src_hasloginevt,
    1007              :                      &src_frozenxid, &src_minmxid, &src_deftablespace,
    1008              :                      &src_collate, &src_ctype, &src_locale, &src_icurules, &src_locprovider,
    1009              :                      &src_collversion))
    1010            0 :         ereport(ERROR,
    1011              :                 (errcode(ERRCODE_UNDEFINED_DATABASE),
    1012              :                  errmsg("template database \"%s\" does not exist",
    1013              :                         dbtemplate)));
    1014              : 
    1015              :     /*
    1016              :      * If the source database was in the process of being dropped, we can't
    1017              :      * use it as a template.
    1018              :      */
    1019          446 :     if (database_is_invalid_oid(src_dboid))
    1020            1 :         ereport(ERROR,
    1021              :                 errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
    1022              :                 errmsg("cannot use invalid database \"%s\" as template", dbtemplate),
    1023              :                 errhint("Use DROP DATABASE to drop invalid databases."));
    1024              : 
    1025              :     /*
    1026              :      * Permission check: to copy a DB that's not marked datistemplate, you
    1027              :      * must be superuser or the owner thereof.
    1028              :      */
    1029          445 :     if (!src_istemplate)
    1030              :     {
    1031           14 :         if (!object_ownercheck(DatabaseRelationId, src_dboid, GetUserId()))
    1032            0 :             ereport(ERROR,
    1033              :                     (errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
    1034              :                      errmsg("permission denied to copy database \"%s\"",
    1035              :                             dbtemplate)));
    1036              :     }
    1037              : 
    1038              :     /* Validate the database creation strategy. */
    1039          445 :     if (strategyEl && strategyEl->arg)
    1040              :     {
    1041              :         char       *strategy;
    1042              : 
    1043          164 :         strategy = defGetString(strategyEl);
    1044          164 :         if (pg_strcasecmp(strategy, "wal_log") == 0)
    1045           11 :             dbstrategy = CREATEDB_WAL_LOG;
    1046          153 :         else if (pg_strcasecmp(strategy, "file_copy") == 0)
    1047              :         {
    1048          152 :             if (DataChecksumsInProgressOn())
    1049            0 :                 ereport(ERROR,
    1050              :                         errcode(ERRCODE_INVALID_PARAMETER_VALUE),
    1051              :                         errmsg("create database strategy \"%s\" not allowed when data checksums are being enabled",
    1052              :                                strategy));
    1053          152 :             dbstrategy = CREATEDB_FILE_COPY;
    1054              :         }
    1055              :         else
    1056            1 :             ereport(ERROR,
    1057              :                     (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
    1058              :                      errmsg("invalid create database strategy \"%s\"", strategy),
    1059              :                      errhint("Valid strategies are \"wal_log\" and \"file_copy\".")));
    1060              :     }
    1061              : 
    1062              :     /* If encoding or locales are defaulted, use source's setting */
    1063          444 :     if (encoding < 0)
    1064          383 :         encoding = src_encoding;
    1065          444 :     if (dbcollate == NULL)
    1066          376 :         dbcollate = src_collate;
    1067          444 :     if (dbctype == NULL)
    1068          376 :         dbctype = src_ctype;
    1069          444 :     if (dblocprovider == '\0')
    1070          383 :         dblocprovider = src_locprovider;
    1071          444 :     if (dblocale == NULL && dblocprovider == src_locprovider)
    1072          379 :         dblocale = src_locale;
    1073          444 :     if (dbicurules == NULL)
    1074          443 :         dbicurules = src_icurules;
    1075              : 
    1076              :     /* Some encodings are client only */
    1077          444 :     if (!PG_VALID_BE_ENCODING(encoding))
    1078            0 :         ereport(ERROR,
    1079              :                 (errcode(ERRCODE_WRONG_OBJECT_TYPE),
    1080              :                  errmsg("invalid server encoding %d", encoding)));
    1081              : 
    1082              :     /* Check that the chosen locales are valid, and get canonical spellings */
    1083          444 :     if (!check_locale(LC_COLLATE, dbcollate, &canonname))
    1084              :     {
    1085            1 :         if (dblocprovider == COLLPROVIDER_BUILTIN)
    1086            0 :             ereport(ERROR,
    1087              :                     (errcode(ERRCODE_WRONG_OBJECT_TYPE),
    1088              :                      errmsg("invalid LC_COLLATE locale name: \"%s\"", dbcollate),
    1089              :                      errhint("If the locale name is specific to the builtin provider, use BUILTIN_LOCALE.")));
    1090            1 :         else if (dblocprovider == COLLPROVIDER_ICU)
    1091            0 :             ereport(ERROR,
    1092              :                     (errcode(ERRCODE_WRONG_OBJECT_TYPE),
    1093              :                      errmsg("invalid LC_COLLATE locale name: \"%s\"", dbcollate),
    1094              :                      errhint("If the locale name is specific to the ICU provider, use ICU_LOCALE.")));
    1095              :         else
    1096            1 :             ereport(ERROR,
    1097              :                     (errcode(ERRCODE_WRONG_OBJECT_TYPE),
    1098              :                      errmsg("invalid LC_COLLATE locale name: \"%s\"", dbcollate)));
    1099              :     }
    1100          443 :     dbcollate = canonname;
    1101          443 :     if (!check_locale(LC_CTYPE, dbctype, &canonname))
    1102              :     {
    1103            1 :         if (dblocprovider == COLLPROVIDER_BUILTIN)
    1104            0 :             ereport(ERROR,
    1105              :                     (errcode(ERRCODE_WRONG_OBJECT_TYPE),
    1106              :                      errmsg("invalid LC_CTYPE locale name: \"%s\"", dbctype),
    1107              :                      errhint("If the locale name is specific to the builtin provider, use BUILTIN_LOCALE.")));
    1108            1 :         else if (dblocprovider == COLLPROVIDER_ICU)
    1109            0 :             ereport(ERROR,
    1110              :                     (errcode(ERRCODE_WRONG_OBJECT_TYPE),
    1111              :                      errmsg("invalid LC_CTYPE locale name: \"%s\"", dbctype),
    1112              :                      errhint("If the locale name is specific to the ICU provider, use ICU_LOCALE.")));
    1113              :         else
    1114            1 :             ereport(ERROR,
    1115              :                     (errcode(ERRCODE_WRONG_OBJECT_TYPE),
    1116              :                      errmsg("invalid LC_CTYPE locale name: \"%s\"", dbctype)));
    1117              :     }
    1118              : 
    1119          442 :     dbctype = canonname;
    1120              : 
    1121          442 :     check_encoding_locale_matches(encoding, dbcollate, dbctype);
    1122              : 
    1123              :     /* validate provider-specific parameters */
    1124          442 :     if (dblocprovider != COLLPROVIDER_BUILTIN)
    1125              :     {
    1126          409 :         if (builtinlocaleEl)
    1127            0 :             ereport(ERROR,
    1128              :                     (errcode(ERRCODE_INVALID_OBJECT_DEFINITION),
    1129              :                      errmsg("BUILTIN_LOCALE cannot be specified unless locale provider is builtin")));
    1130              :     }
    1131              : 
    1132          442 :     if (dblocprovider != COLLPROVIDER_ICU)
    1133              :     {
    1134          427 :         if (iculocaleEl)
    1135            1 :             ereport(ERROR,
    1136              :                     (errcode(ERRCODE_INVALID_OBJECT_DEFINITION),
    1137              :                      errmsg("ICU locale cannot be specified unless locale provider is ICU")));
    1138              : 
    1139          426 :         if (dbicurules)
    1140            1 :             ereport(ERROR,
    1141              :                     (errcode(ERRCODE_INVALID_OBJECT_DEFINITION),
    1142              :                      errmsg("ICU rules cannot be specified unless locale provider is ICU")));
    1143              :     }
    1144              : 
    1145              :     /* validate and canonicalize locale for the provider */
    1146          440 :     if (dblocprovider == COLLPROVIDER_BUILTIN)
    1147              :     {
    1148              :         /*
    1149              :          * This would happen if template0 uses the libc provider but the new
    1150              :          * database uses builtin.
    1151              :          */
    1152           31 :         if (!dblocale)
    1153            1 :             ereport(ERROR,
    1154              :                     (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
    1155              :                      errmsg("LOCALE or BUILTIN_LOCALE must be specified")));
    1156              : 
    1157           30 :         dblocale = builtin_validate_locale(encoding, dblocale);
    1158              :     }
    1159          409 :     else if (dblocprovider == COLLPROVIDER_ICU)
    1160              :     {
    1161           15 :         if (!(is_encoding_supported_by_icu(encoding)))
    1162            1 :             ereport(ERROR,
    1163              :                     (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
    1164              :                      errmsg("encoding \"%s\" is not supported with ICU provider",
    1165              :                             pg_encoding_to_char(encoding))));
    1166              : 
    1167              :         /*
    1168              :          * This would happen if template0 uses the libc provider but the new
    1169              :          * database uses icu.
    1170              :          */
    1171           14 :         if (!dblocale)
    1172            1 :             ereport(ERROR,
    1173              :                     (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
    1174              :                      errmsg("LOCALE or ICU_LOCALE must be specified")));
    1175              : 
    1176              :         /*
    1177              :          * During binary upgrade, or when the locale came from the template
    1178              :          * database, preserve locale string. Otherwise, canonicalize to a
    1179              :          * language tag.
    1180              :          */
    1181           13 :         if (!IsBinaryUpgrade && dblocale != src_locale)
    1182              :         {
    1183            7 :             char       *langtag = icu_language_tag(dblocale,
    1184              :                                                    icu_validation_level);
    1185              : 
    1186            7 :             if (langtag && strcmp(dblocale, langtag) != 0)
    1187              :             {
    1188            3 :                 ereport(NOTICE,
    1189              :                         (errmsg("using standard form \"%s\" for ICU locale \"%s\"",
    1190              :                                 langtag, dblocale)));
    1191              : 
    1192            3 :                 dblocale = langtag;
    1193              :             }
    1194              :         }
    1195              : 
    1196           13 :         icu_validate_locale(dblocale);
    1197              :     }
    1198              : 
    1199              :     /* for libc, locale comes from datcollate and datctype */
    1200          435 :     if (dblocprovider == COLLPROVIDER_LIBC)
    1201          394 :         dblocale = NULL;
    1202              : 
    1203              :     /*
    1204              :      * Check that the new encoding and locale settings match the source
    1205              :      * database.  We insist on this because we simply copy the source data ---
    1206              :      * any non-ASCII data would be wrongly encoded, and any indexes sorted
    1207              :      * according to the source locale would be wrong.
    1208              :      *
    1209              :      * However, we assume that template0 doesn't contain any non-ASCII data
    1210              :      * nor any indexes that depend on collation or ctype, so template0 can be
    1211              :      * used as template for creating a database with any encoding or locale.
    1212              :      */
    1213          435 :     if (strcmp(dbtemplate, "template0") != 0)
    1214              :     {
    1215          264 :         if (encoding != src_encoding)
    1216            0 :             ereport(ERROR,
    1217              :                     (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
    1218              :                      errmsg("new encoding (%s) is incompatible with the encoding of the template database (%s)",
    1219              :                             pg_encoding_to_char(encoding),
    1220              :                             pg_encoding_to_char(src_encoding)),
    1221              :                      errhint("Use the same encoding as in the template database, or use template0 as template.")));
    1222              : 
    1223          264 :         if (strcmp(dbcollate, src_collate) != 0)
    1224            1 :             ereport(ERROR,
    1225              :                     (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
    1226              :                      errmsg("new collation (%s) is incompatible with the collation of the template database (%s)",
    1227              :                             dbcollate, src_collate),
    1228              :                      errhint("Use the same collation as in the template database, or use template0 as template.")));
    1229              : 
    1230          263 :         if (strcmp(dbctype, src_ctype) != 0)
    1231            0 :             ereport(ERROR,
    1232              :                     (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
    1233              :                      errmsg("new LC_CTYPE (%s) is incompatible with the LC_CTYPE of the template database (%s)",
    1234              :                             dbctype, src_ctype),
    1235              :                      errhint("Use the same LC_CTYPE as in the template database, or use template0 as template.")));
    1236              : 
    1237          263 :         if (dblocprovider != src_locprovider)
    1238            0 :             ereport(ERROR,
    1239              :                     (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
    1240              :                      errmsg("new locale provider (%s) does not match locale provider of the template database (%s)",
    1241              :                             collprovider_name(dblocprovider), collprovider_name(src_locprovider)),
    1242              :                      errhint("Use the same locale provider as in the template database, or use template0 as template.")));
    1243              : 
    1244          263 :         if (dblocprovider == COLLPROVIDER_ICU)
    1245              :         {
    1246              :             char       *val1;
    1247              :             char       *val2;
    1248              : 
    1249              :             Assert(dblocale);
    1250              :             Assert(src_locale);
    1251            6 :             if (strcmp(dblocale, src_locale) != 0)
    1252            0 :                 ereport(ERROR,
    1253              :                         (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
    1254              :                          errmsg("new ICU locale (%s) is incompatible with the ICU locale of the template database (%s)",
    1255              :                                 dblocale, src_locale),
    1256              :                          errhint("Use the same ICU locale as in the template database, or use template0 as template.")));
    1257              : 
    1258            6 :             val1 = dbicurules;
    1259            6 :             if (!val1)
    1260            6 :                 val1 = "";
    1261            6 :             val2 = src_icurules;
    1262            6 :             if (!val2)
    1263            6 :                 val2 = "";
    1264            6 :             if (strcmp(val1, val2) != 0)
    1265            0 :                 ereport(ERROR,
    1266              :                         (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
    1267              :                          errmsg("new ICU collation rules (%s) are incompatible with the ICU collation rules of the template database (%s)",
    1268              :                                 val1, val2),
    1269              :                          errhint("Use the same ICU collation rules as in the template database, or use template0 as template.")));
    1270              :         }
    1271              :     }
    1272              : 
    1273              :     /*
    1274              :      * If we got a collation version for the template database, check that it
    1275              :      * matches the actual OS collation version.  Otherwise error; the user
    1276              :      * needs to fix the template database first.  Don't complain if a
    1277              :      * collation version was specified explicitly as a statement option; that
    1278              :      * is used by pg_upgrade to reproduce the old state exactly.
    1279              :      *
    1280              :      * (If the template database has no collation version, then either the
    1281              :      * platform/provider does not support collation versioning, or it's
    1282              :      * template0, for which we stipulate that it does not contain
    1283              :      * collation-using objects.)
    1284              :      */
    1285          434 :     if (src_collversion && !collversionEl)
    1286              :     {
    1287              :         char       *actual_versionstr;
    1288              :         const char *locale;
    1289              : 
    1290          179 :         if (dblocprovider == COLLPROVIDER_LIBC)
    1291          167 :             locale = dbcollate;
    1292              :         else
    1293           12 :             locale = dblocale;
    1294              : 
    1295          179 :         actual_versionstr = get_collation_actual_version(dblocprovider, locale);
    1296          179 :         if (!actual_versionstr)
    1297            0 :             ereport(ERROR,
    1298              :                     (errmsg("template database \"%s\" has a collation version, but no actual collation version could be determined",
    1299              :                             dbtemplate)));
    1300              : 
    1301          179 :         if (strcmp(actual_versionstr, src_collversion) != 0)
    1302            0 :             ereport(ERROR,
    1303              :                     (errmsg("template database \"%s\" has a collation version mismatch",
    1304              :                             dbtemplate),
    1305              :                      errdetail("The template database was created using collation version %s, "
    1306              :                                "but the operating system provides version %s.",
    1307              :                                src_collversion, actual_versionstr),
    1308              :                      errhint("Rebuild all objects in the template database that use the default collation and run "
    1309              :                              "ALTER DATABASE %s REFRESH COLLATION VERSION, "
    1310              :                              "or build PostgreSQL with the right library version.",
    1311              :                              quote_identifier(dbtemplate))));
    1312              :     }
    1313              : 
    1314          434 :     if (dbcollversion == NULL)
    1315          402 :         dbcollversion = src_collversion;
    1316              : 
    1317              :     /*
    1318              :      * Normally, we copy the collation version from the template database.
    1319              :      * This last resort only applies if the template database does not have a
    1320              :      * collation version, which is normally only the case for template0.
    1321              :      */
    1322          434 :     if (dbcollversion == NULL)
    1323              :     {
    1324              :         const char *locale;
    1325              : 
    1326          223 :         if (dblocprovider == COLLPROVIDER_LIBC)
    1327          201 :             locale = dbcollate;
    1328              :         else
    1329           22 :             locale = dblocale;
    1330              : 
    1331          223 :         dbcollversion = get_collation_actual_version(dblocprovider, locale);
    1332              :     }
    1333              : 
    1334              :     /* Resolve default tablespace for new database */
    1335          434 :     if (tablespacenameEl && tablespacenameEl->arg)
    1336           17 :     {
    1337              :         char       *tablespacename;
    1338              :         AclResult   aclresult;
    1339              : 
    1340           17 :         tablespacename = defGetString(tablespacenameEl);
    1341           17 :         dst_deftablespace = get_tablespace_oid(tablespacename, false);
    1342              :         /* check permissions */
    1343           17 :         aclresult = object_aclcheck(TableSpaceRelationId, dst_deftablespace, GetUserId(),
    1344              :                                     ACL_CREATE);
    1345           17 :         if (aclresult != ACLCHECK_OK)
    1346            0 :             aclcheck_error(aclresult, OBJECT_TABLESPACE,
    1347              :                            tablespacename);
    1348              : 
    1349              :         /* pg_global must never be the default tablespace */
    1350           17 :         if (dst_deftablespace == GLOBALTABLESPACE_OID)
    1351            0 :             ereport(ERROR,
    1352              :                     (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
    1353              :                      errmsg("pg_global cannot be used as default tablespace")));
    1354              : 
    1355              :         /*
    1356              :          * If we are trying to change the default tablespace of the template,
    1357              :          * we require that the template not have any files in the new default
    1358              :          * tablespace.  This is necessary because otherwise the copied
    1359              :          * database would contain pg_class rows that refer to its default
    1360              :          * tablespace both explicitly (by OID) and implicitly (as zero), which
    1361              :          * would cause problems.  For example another CREATE DATABASE using
    1362              :          * the copied database as template, and trying to change its default
    1363              :          * tablespace again, would yield outright incorrect results (it would
    1364              :          * improperly move tables to the new default tablespace that should
    1365              :          * stay in the same tablespace).
    1366              :          */
    1367           17 :         if (dst_deftablespace != src_deftablespace)
    1368              :         {
    1369              :             char       *srcpath;
    1370              :             struct stat st;
    1371              : 
    1372           17 :             srcpath = GetDatabasePath(src_dboid, dst_deftablespace);
    1373              : 
    1374           17 :             if (stat(srcpath, &st) == 0 &&
    1375            0 :                 S_ISDIR(st.st_mode) &&
    1376            0 :                 !directory_is_empty(srcpath))
    1377            0 :                 ereport(ERROR,
    1378              :                         (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
    1379              :                          errmsg("cannot assign new default tablespace \"%s\"",
    1380              :                                 tablespacename),
    1381              :                          errdetail("There is a conflict because database \"%s\" already has some tables in this tablespace.",
    1382              :                                    dbtemplate)));
    1383           17 :             pfree(srcpath);
    1384              :         }
    1385              :     }
    1386              :     else
    1387              :     {
    1388              :         /* Use template database's default tablespace */
    1389          417 :         dst_deftablespace = src_deftablespace;
    1390              :         /* Note there is no additional permission check in this path */
    1391              :     }
    1392              : 
    1393              :     /*
    1394              :      * If built with appropriate switch, whine when regression-testing
    1395              :      * conventions for database names are violated.  But don't complain during
    1396              :      * initdb.
    1397              :      */
    1398              : #ifdef ENFORCE_REGRESSION_TEST_NAME_RESTRICTIONS
    1399              :     if (IsUnderPostmaster && strstr(dbname, "regression") == NULL)
    1400              :         elog(WARNING, "databases created by regression test cases should have names including \"regression\"");
    1401              : #endif
    1402              : 
    1403              :     /*
    1404              :      * Check for db name conflict.  This is just to give a more friendly error
    1405              :      * message than "unique index violation".  There's a race condition but
    1406              :      * we're willing to accept the less friendly message in that case.
    1407              :      */
    1408          434 :     if (OidIsValid(get_database_oid(dbname, true)))
    1409            1 :         ereport(ERROR,
    1410              :                 (errcode(ERRCODE_DUPLICATE_DATABASE),
    1411              :                  errmsg("database \"%s\" already exists", dbname)));
    1412              : 
    1413              :     /*
    1414              :      * The source DB can't have any active backends, except this one
    1415              :      * (exception is to allow CREATE DB while connected to template1).
    1416              :      * Otherwise we might copy inconsistent data.
    1417              :      *
    1418              :      * This should be last among the basic error checks, because it involves
    1419              :      * potential waiting; we may as well throw an error first if we're gonna
    1420              :      * throw one.
    1421              :      */
    1422          433 :     if (CountOtherDBBackends(src_dboid, &notherbackends, &npreparedxacts))
    1423            1 :         ereport(ERROR,
    1424              :                 (errcode(ERRCODE_OBJECT_IN_USE),
    1425              :                  errmsg("source database \"%s\" is being accessed by other users",
    1426              :                         dbtemplate),
    1427              :                  errdetail_busy_db(notherbackends, npreparedxacts)));
    1428              : 
    1429              :     /*
    1430              :      * Select an OID for the new database, checking that it doesn't have a
    1431              :      * filename conflict with anything already existing in the tablespace
    1432              :      * directories.
    1433              :      */
    1434          432 :     pg_database_rel = table_open(DatabaseRelationId, RowExclusiveLock);
    1435              : 
    1436              :     /*
    1437              :      * If database OID is configured, check if the OID is already in use or
    1438              :      * data directory already exists.
    1439              :      */
    1440          432 :     if (OidIsValid(dboid))
    1441              :     {
    1442          147 :         char       *existing_dbname = get_database_name(dboid);
    1443              : 
    1444          147 :         if (existing_dbname != NULL)
    1445            0 :             ereport(ERROR,
    1446              :                     (errcode(ERRCODE_INVALID_PARAMETER_VALUE)),
    1447              :                     errmsg("database OID %u is already in use by database \"%s\"",
    1448              :                            dboid, existing_dbname));
    1449              : 
    1450          147 :         if (check_db_file_conflict(dboid))
    1451            0 :             ereport(ERROR,
    1452              :                     (errcode(ERRCODE_INVALID_PARAMETER_VALUE)),
    1453              :                     errmsg("data directory with the specified OID %u already exists", dboid));
    1454              :     }
    1455              :     else
    1456              :     {
    1457              :         /* Select an OID for the new database if one is not explicitly configured. */
    1458              :         do
    1459              :         {
    1460          285 :             dboid = GetNewOidWithIndex(pg_database_rel, DatabaseOidIndexId,
    1461              :                                        Anum_pg_database_oid);
    1462          285 :         } while (check_db_file_conflict(dboid));
    1463              :     }
    1464              : 
    1465              :     /*
    1466              :      * Insert a new tuple into pg_database.  This establishes our ownership of
    1467              :      * the new database name (anyone else trying to insert the same name will
    1468              :      * block on the unique index, and fail after we commit).
    1469              :      */
    1470              : 
    1471              :     Assert((dblocprovider != COLLPROVIDER_LIBC && dblocale) ||
    1472              :            (dblocprovider == COLLPROVIDER_LIBC && !dblocale));
    1473              : 
    1474              :     /* Form tuple */
    1475          432 :     new_record[Anum_pg_database_oid - 1] = ObjectIdGetDatum(dboid);
    1476          432 :     new_record[Anum_pg_database_datname - 1] =
    1477          432 :         DirectFunctionCall1(namein, CStringGetDatum(dbname));
    1478          432 :     new_record[Anum_pg_database_datdba - 1] = ObjectIdGetDatum(datdba);
    1479          432 :     new_record[Anum_pg_database_encoding - 1] = Int32GetDatum(encoding);
    1480          432 :     new_record[Anum_pg_database_datlocprovider - 1] = CharGetDatum(dblocprovider);
    1481          432 :     new_record[Anum_pg_database_datistemplate - 1] = BoolGetDatum(dbistemplate);
    1482          432 :     new_record[Anum_pg_database_datallowconn - 1] = BoolGetDatum(dballowconnections);
    1483          432 :     new_record[Anum_pg_database_dathasloginevt - 1] = BoolGetDatum(src_hasloginevt);
    1484          432 :     new_record[Anum_pg_database_datconnlimit - 1] = Int32GetDatum(dbconnlimit);
    1485          432 :     new_record[Anum_pg_database_datfrozenxid - 1] = TransactionIdGetDatum(src_frozenxid);
    1486          432 :     new_record[Anum_pg_database_datminmxid - 1] = TransactionIdGetDatum(src_minmxid);
    1487          432 :     new_record[Anum_pg_database_dattablespace - 1] = ObjectIdGetDatum(dst_deftablespace);
    1488          432 :     new_record[Anum_pg_database_datcollate - 1] = CStringGetTextDatum(dbcollate);
    1489          432 :     new_record[Anum_pg_database_datctype - 1] = CStringGetTextDatum(dbctype);
    1490          432 :     if (dblocale)
    1491           40 :         new_record[Anum_pg_database_datlocale - 1] = CStringGetTextDatum(dblocale);
    1492              :     else
    1493          392 :         new_record_nulls[Anum_pg_database_datlocale - 1] = true;
    1494          432 :     if (dbicurules)
    1495            0 :         new_record[Anum_pg_database_daticurules - 1] = CStringGetTextDatum(dbicurules);
    1496              :     else
    1497          432 :         new_record_nulls[Anum_pg_database_daticurules - 1] = true;
    1498          432 :     if (dbcollversion)
    1499          372 :         new_record[Anum_pg_database_datcollversion - 1] = CStringGetTextDatum(dbcollversion);
    1500              :     else
    1501           60 :         new_record_nulls[Anum_pg_database_datcollversion - 1] = true;
    1502              : 
    1503              :     /*
    1504              :      * We deliberately set datacl to default (NULL), rather than copying it
    1505              :      * from the template database.  Copying it would be a bad idea when the
    1506              :      * owner is not the same as the template's owner.
    1507              :      */
    1508          432 :     new_record_nulls[Anum_pg_database_datacl - 1] = true;
    1509              : 
    1510          432 :     tuple = heap_form_tuple(RelationGetDescr(pg_database_rel),
    1511              :                             new_record, new_record_nulls);
    1512              : 
    1513          432 :     CatalogTupleInsert(pg_database_rel, tuple);
    1514              : 
    1515              :     /*
    1516              :      * Now generate additional catalog entries associated with the new DB
    1517              :      */
    1518              : 
    1519              :     /* Register owner dependency */
    1520          432 :     recordDependencyOnOwner(DatabaseRelationId, dboid, datdba);
    1521              : 
    1522              :     /* Create pg_shdepend entries for objects within database */
    1523          432 :     copyTemplateDependencies(src_dboid, dboid);
    1524              : 
    1525              :     /* Post creation hook for new database */
    1526          432 :     InvokeObjectPostCreateHook(DatabaseRelationId, dboid, 0);
    1527              : 
    1528              :     /*
    1529              :      * If we're going to be reading data for the to-be-created database into
    1530              :      * shared_buffers, take a lock on it. Nobody should know that this
    1531              :      * database exists yet, but it's good to maintain the invariant that an
    1532              :      * AccessExclusiveLock on the database is sufficient to drop all of its
    1533              :      * buffers without worrying about more being read later.
    1534              :      *
    1535              :      * Note that we need to do this before entering the
    1536              :      * PG_ENSURE_ERROR_CLEANUP block below, because createdb_failure_callback
    1537              :      * expects this lock to be held already.
    1538              :      */
    1539          432 :     if (dbstrategy == CREATEDB_WAL_LOG)
    1540          280 :         LockSharedObject(DatabaseRelationId, dboid, 0, AccessShareLock);
    1541              : 
    1542              :     /*
    1543              :      * Once we start copying subdirectories, we need to be able to clean 'em
    1544              :      * up if we fail.  Use an ENSURE block to make sure this happens.  (This
    1545              :      * is not a 100% solution, because of the possibility of failure during
    1546              :      * transaction commit after we leave this routine, but it should handle
    1547              :      * most scenarios.)
    1548              :      */
    1549          432 :     fparms.src_dboid = src_dboid;
    1550          432 :     fparms.dest_dboid = dboid;
    1551          432 :     fparms.strategy = dbstrategy;
    1552              : 
    1553          432 :     PG_ENSURE_ERROR_CLEANUP(createdb_failure_callback,
    1554              :                             PointerGetDatum(&fparms));
    1555              :     {
    1556              :         /*
    1557              :          * If the user has asked to create a database with WAL_LOG strategy
    1558              :          * then call CreateDatabaseUsingWalLog, which will copy the database
    1559              :          * at the block level and it will WAL log each copied block.
    1560              :          * Otherwise, call CreateDatabaseUsingFileCopy that will copy the
    1561              :          * database file by file.
    1562              :          */
    1563          432 :         if (dbstrategy == CREATEDB_WAL_LOG)
    1564          280 :             CreateDatabaseUsingWalLog(src_dboid, dboid, src_deftablespace,
    1565              :                                       dst_deftablespace);
    1566              :         else
    1567          152 :             CreateDatabaseUsingFileCopy(src_dboid, dboid, src_deftablespace,
    1568              :                                         dst_deftablespace);
    1569              : 
    1570              :         /*
    1571              :          * Close pg_database, but keep lock till commit.
    1572              :          */
    1573          430 :         table_close(pg_database_rel, NoLock);
    1574              : 
    1575              :         /*
    1576              :          * Force synchronous commit, thus minimizing the window between
    1577              :          * creation of the database files and committal of the transaction. If
    1578              :          * we crash before committing, we'll have a DB that's taking up disk
    1579              :          * space but is not in pg_database, which is not good.
    1580              :          */
    1581          430 :         ForceSyncCommit();
    1582              :     }
    1583          432 :     PG_END_ENSURE_ERROR_CLEANUP(createdb_failure_callback,
    1584              :                                 PointerGetDatum(&fparms));
    1585              : 
    1586          430 :     return dboid;
    1587              : }
    1588              : 
    1589              : /*
    1590              :  * Check whether chosen encoding matches chosen locale settings.  This
    1591              :  * restriction is necessary because libc's locale-specific code usually
    1592              :  * fails when presented with data in an encoding it's not expecting. We
    1593              :  * allow mismatch in four cases:
    1594              :  *
    1595              :  * 1. locale encoding = SQL_ASCII, which means that the locale is C/POSIX
    1596              :  * which works with any encoding.
    1597              :  *
    1598              :  * 2. locale encoding = -1, which means that we couldn't determine the
    1599              :  * locale's encoding and have to trust the user to get it right.
    1600              :  *
    1601              :  * 3. selected encoding is UTF8 and platform is win32. This is because
    1602              :  * UTF8 is a pseudo codepage that is supported in all locales since it's
    1603              :  * converted to UTF16 before being used.
    1604              :  *
    1605              :  * 4. selected encoding is SQL_ASCII, but only if you're a superuser. This
    1606              :  * is risky but we have historically allowed it --- notably, the
    1607              :  * regression tests require it.
    1608              :  *
    1609              :  * Note: if you change this policy, fix initdb to match.
                      :  *
                      :  * encoding is the selected encoding; collate and ctype are the LC_COLLATE
                      :  * and LC_CTYPE locale names.  The encoding of each locale is obtained with
                      :  * pg_get_encoding_from_locale().  LC_CTYPE is checked first, then
                      :  * LC_COLLATE, both under the same exemptions listed above; a mismatch is
                      :  * reported with ereport(ERROR) using ERRCODE_INVALID_PARAMETER_VALUE, and
                      :  * the errdetail names the encoding the offending locale requires.
                      :  * Nothing is returned.
    1610              :  */
    1611              : void
    1612          458 : check_encoding_locale_matches(int encoding, const char *collate, const char *ctype)
    1613              : {
    1614          458 :     int         ctype_encoding = pg_get_encoding_from_locale(ctype, true);
    1615          458 :     int         collate_encoding = pg_get_encoding_from_locale(collate, true);
    1616              : 
    1617          462 :     if (!(ctype_encoding == encoding ||
    1618            4 :           ctype_encoding == PG_SQL_ASCII ||
    1619              :           ctype_encoding == -1 ||
    1620              : #ifdef WIN32
    1621              :           encoding == PG_UTF8 ||
    1622              : #endif
    1623            4 :           (encoding == PG_SQL_ASCII && superuser())))
    1624            0 :         ereport(ERROR,
    1625              :                 (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
    1626              :                  errmsg("encoding \"%s\" does not match locale \"%s\"",
    1627              :                         pg_encoding_to_char(encoding),
    1628              :                         ctype),
    1629              :                  errdetail("The chosen LC_CTYPE setting requires encoding \"%s\".",
    1630              :                            pg_encoding_to_char(ctype_encoding))));
    1631              : 
    1632          462 :     if (!(collate_encoding == encoding ||
    1633            4 :           collate_encoding == PG_SQL_ASCII ||
    1634              :           collate_encoding == -1 ||
    1635              : #ifdef WIN32
    1636              :           encoding == PG_UTF8 ||
    1637              : #endif
    1638            4 :           (encoding == PG_SQL_ASCII && superuser())))
    1639            0 :         ereport(ERROR,
    1640              :                 (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
    1641              :                  errmsg("encoding \"%s\" does not match locale \"%s\"",
    1642              :                         pg_encoding_to_char(encoding),
    1643              :                         collate),
    1644              :                  errdetail("The chosen LC_COLLATE setting requires encoding \"%s\".",
    1645              :                            pg_encoding_to_char(collate_encoding))));
    1646          458 : }
    1647              : 
    1648              : /*
                      :  * Error cleanup callback for createdb
                      :  *
                      :  * arg is a pointer Datum referring to a createdb_failure_params struct,
                      :  * from which the source and destination database OIDs and the copy
                      :  * strategy are read.  The code argument is not used here.
                      :  *
                      :  * The steps are: for the WAL_LOG strategy only, discard the destination
                      :  * database's shared buffers and pending sync requests and release the
                      :  * AccessShareLock on it; then release the ShareLock on the source
                      :  * database; finally remove whatever tablespace subdirectories were
                      :  * already copied for the destination database.
                      :  */
    1649              : static void
    1650            2 : createdb_failure_callback(int code, Datum arg)
    1651              : {
    1652            2 :     createdb_failure_params *fparms = (createdb_failure_params *) DatumGetPointer(arg);
    1653              : 
    1654              :     /*
    1655              :      * If we were copying the database at block level, then drop pages for
    1656              :      * the destination database that are in the shared buffer cache, and tell
    1657              :      * the checkpointer to forget any pending fsync and unlink requests for
    1658              :      * files in the database.  The reasoning behind doing this is the same as
    1659              :      * explained in dropdb().  But unlike dropdb() we don't need to call
    1660              :      * pgstat_drop_database, because this database has not been created yet
    1661              :      * and so there should not be any stats for it.
    1662              :      */
    1663            2 :     if (fparms->strategy == CREATEDB_WAL_LOG)
    1664              :     {
    1665            2 :         DropDatabaseBuffers(fparms->dest_dboid);
    1666            2 :         ForgetDatabaseSyncRequests(fparms->dest_dboid);
    1667              : 
    1668              :         /* Release lock on the target database. */
    1669            2 :         UnlockSharedObject(DatabaseRelationId, fparms->dest_dboid, 0,
    1670              :                            AccessShareLock);
    1671              :     }
    1672              : 
    1673              :     /*
    1674              :      * Release lock on source database before doing recursive remove. This is
    1675              :      * not essential but it seems desirable to release the lock as soon as
    1676              :      * possible.
    1677              :      */
    1678            2 :     UnlockSharedObject(DatabaseRelationId, fparms->src_dboid, 0, ShareLock);
    1679              : 
    1680              :     /* Throw away any successfully copied subdirectories */
    1681            2 :     remove_dbtablespaces(fparms->dest_dboid);
    1682            2 : }
    1683              : 
    1684              : 
    1685              : /*
    1686              :  * DROP DATABASE
                      :  *
                      :  * dbname is the name of the database to drop.  If no such database exists,
                      :  * an ERROR is raised unless missing_ok is true, in which case a NOTICE is
                      :  * emitted and the function returns without changing anything.  If force
                      :  * is true, TerminateOtherDBBackends() is called on the target before the
                      :  * check for remaining sessions.
                      :  *
                      :  * The drop is refused with an error if the caller does not own the
                      :  * database, if it is marked as a template, if it is the currently open
                      :  * database, if it has active logical replication slots or any
                      :  * subscriptions, or if CountOtherDBBackends() still reports other users.
                      :  *
                      :  * Statement order matters below: the pg_database row is first marked
                      :  * invalid with a WAL-flushed in-place update, and only afterwards are the
                      :  * shared buffers, pending sync requests and on-disk directories discarded.
    1687              :  */
    1688              : void
    1689           82 : dropdb(const char *dbname, bool missing_ok, bool force)
    1690              : {
    1691              :     Oid         db_id;
    1692              :     bool        db_istemplate;
    1693              :     Relation    pgdbrel;
    1694              :     HeapTuple   tup;
    1695              :     ScanKeyData scankey;
    1696              :     void       *inplace_state;
    1697              :     Form_pg_database datform;
    1698              :     int         notherbackends;
    1699              :     int         npreparedxacts;
    1700              :     int         nslots,
    1701              :                 nslots_active;
    1702              :     int         nsubscriptions;
    1703              : 
    1704              :     /*
    1705              :      * Look up the target database's OID, and get exclusive lock on it. We
    1706              :      * need this to ensure that no new backend starts up in the target
    1707              :      * database while we are deleting it (see postinit.c), and that no one is
    1708              :      * using it as a CREATE DATABASE template or trying to delete it for
    1709              :      * themselves.
    1710              :      */
    1711           82 :     pgdbrel = table_open(DatabaseRelationId, RowExclusiveLock);
    1712              : 
    1713           82 :     if (!get_db_info(dbname, AccessExclusiveLock, &db_id, NULL, NULL,
    1714              :                      &db_istemplate, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL))
    1715              :     {
    1716           21 :         if (!missing_ok)
    1717              :         {
    1718           10 :             ereport(ERROR,
    1719              :                     (errcode(ERRCODE_UNDEFINED_DATABASE),
    1720              :                      errmsg("database \"%s\" does not exist", dbname)));
    1721              :         }
    1722              :         else
    1723              :         {
    1724              :             /* Close pg_database, release the lock, since we changed nothing */
    1725           11 :             table_close(pgdbrel, RowExclusiveLock);
    1726           11 :             ereport(NOTICE,
    1727              :                     (errmsg("database \"%s\" does not exist, skipping",
    1728              :                             dbname)));
    1729           11 :             return;
    1730              :         }
    1731              :     }
    1732              : 
    1733              :     /*
    1734              :      * Permission checks
    1735              :      */
    1736           61 :     if (!object_ownercheck(DatabaseRelationId, db_id, GetUserId()))
    1737            0 :         aclcheck_error(ACLCHECK_NOT_OWNER, OBJECT_DATABASE,
    1738              :                        dbname);
    1739              : 
    1740              :     /* DROP hook for the database being removed */
    1741           61 :     InvokeObjectDropHook(DatabaseRelationId, db_id, 0);
    1742              : 
    1743              :     /*
    1744              :      * Disallow dropping a DB that is marked istemplate.  This is just to
    1745              :      * prevent people from accidentally dropping template0 or template1; they
    1746              :      * can do so if they're really determined ...
    1747              :      */
    1748           61 :     if (db_istemplate)
    1749            0 :         ereport(ERROR,
    1750              :                 (errcode(ERRCODE_WRONG_OBJECT_TYPE),
    1751              :                  errmsg("cannot drop a template database")));
    1752              : 
    1753              :     /* Obviously can't drop my own database */
    1754           61 :     if (db_id == MyDatabaseId)
    1755            0 :         ereport(ERROR,
    1756              :                 (errcode(ERRCODE_OBJECT_IN_USE),
    1757              :                  errmsg("cannot drop the currently open database")));
    1758              : 
    1759              :     /*
    1760              :      * Check whether there are active logical slots that refer to the
    1761              :      * to-be-dropped database. The database lock we are holding prevents the
    1762              :      * creation of new slots using the database or existing slots becoming
    1763              :      * active.
                      :      *
                      :      * Only the active-slot count is consulted here; nslots is filled in but
                      :      * not read again in this function.
    1764              :      */
    1765           61 :     (void) ReplicationSlotsCountDBSlots(db_id, &nslots, &nslots_active);
    1766           61 :     if (nslots_active)
    1767              :     {
    1768            1 :         ereport(ERROR,
    1769              :                 (errcode(ERRCODE_OBJECT_IN_USE),
    1770              :                  errmsg("database \"%s\" is used by an active logical replication slot",
    1771              :                         dbname),
    1772              :                  errdetail_plural("There is %d active slot.",
    1773              :                                   "There are %d active slots.",
    1774              :                                   nslots_active, nslots_active)));
    1775              :     }
    1776              : 
    1777              :     /*
    1778              :      * Check if there are subscriptions defined in the target database.
    1779              :      *
    1780              :      * We can't drop them automatically because they might be holding
    1781              :      * resources in other databases/instances.
    1782              :      */
    1783           60 :     if ((nsubscriptions = CountDBSubscriptions(db_id)) > 0)
    1784            0 :         ereport(ERROR,
    1785              :                 (errcode(ERRCODE_OBJECT_IN_USE),
    1786              :                  errmsg("database \"%s\" is being used by logical replication subscription",
    1787              :                         dbname),
    1788              :                  errdetail_plural("There is %d subscription.",
    1789              :                                   "There are %d subscriptions.",
    1790              :                                   nsubscriptions, nsubscriptions)));
    1791              : 
    1792              : 
    1793              :     /*
    1794              :      * Attempt to terminate all existing connections to the target database if
    1795              :      * the user has requested to do so.
    1796              :      */
    1797           60 :     if (force)
    1798            1 :         TerminateOtherDBBackends(db_id);
    1799              : 
    1800              :     /*
    1801              :      * Check for other backends in the target database.  (Because we hold the
    1802              :      * database lock, no new ones can start after this.)
    1803              :      *
    1804              :      * As in CREATE DATABASE, check this after other error conditions.
    1805              :      */
    1806           60 :     if (CountOtherDBBackends(db_id, &notherbackends, &npreparedxacts))
    1807            0 :         ereport(ERROR,
    1808              :                 (errcode(ERRCODE_OBJECT_IN_USE),
    1809              :                  errmsg("database \"%s\" is being accessed by other users",
    1810              :                         dbname),
    1811              :                  errdetail_busy_db(notherbackends, npreparedxacts)));
    1812              : 
    1813              :     /*
    1814              :      * Delete any comments or security labels associated with the database.
    1815              :      */
    1816           60 :     DeleteSharedComments(db_id, DatabaseRelationId);
    1817           60 :     DeleteSharedSecurityLabel(db_id, DatabaseRelationId);
    1818              : 
    1819              :     /*
    1820              :      * Remove settings associated with this database
    1821              :      */
    1822           60 :     DropSetting(db_id, InvalidOid);
    1823              : 
    1824              :     /*
    1825              :      * Remove shared dependency references for the database.
    1826              :      */
    1827           60 :     dropDatabaseDependencies(db_id);
    1828              : 
    1829              :     /*
    1830              :      * Tell the cumulative stats system to forget it immediately, too.
    1831              :      */
    1832           60 :     pgstat_drop_database(db_id);
    1833              : 
    1834              :     /*
    1835              :      * Except for the deletion of the catalog row, subsequent actions are not
    1836              :      * transactional (consider DropDatabaseBuffers() discarding modified
    1837              :      * buffers). But we might crash or get interrupted below. To prevent
    1838              :      * accesses to a database with invalid contents, mark the database as
    1839              :      * invalid using an in-place update.
    1840              :      *
    1841              :      * We need to flush the WAL before continuing, to guarantee the
    1842              :      * modification is durable before performing irreversible filesystem
    1843              :      * operations.
                      :      *
                      :      * The row is located by name through DatabaseNameIndexId, and "invalid"
                      :      * is recorded by storing DATCONNLIMIT_INVALID_DB in datconnlimit.
    1844              :      */
    1845           60 :     ScanKeyInit(&scankey,
    1846              :                 Anum_pg_database_datname,
    1847              :                 BTEqualStrategyNumber, F_NAMEEQ,
    1848              :                 CStringGetDatum(dbname));
    1849           60 :     systable_inplace_update_begin(pgdbrel, DatabaseNameIndexId, true,
    1850              :                                   NULL, 1, &scankey, &tup, &inplace_state);
    1851           60 :     if (!HeapTupleIsValid(tup))
    1852            0 :         elog(ERROR, "cache lookup failed for database %u", db_id);
    1853           60 :     datform = (Form_pg_database) GETSTRUCT(tup);
    1854           60 :     datform->datconnlimit = DATCONNLIMIT_INVALID_DB;
    1855           60 :     systable_inplace_update_finish(inplace_state, tup);
    1856           60 :     XLogFlush(XactLastRecEnd);
    1857              : 
    1858              :     /*
    1859              :      * Also delete the tuple - transactionally. If this transaction commits,
    1860              :      * the row will be gone, but if we fail, dropdb() can be invoked again.
    1861              :      */
    1862           60 :     CatalogTupleDelete(pgdbrel, &tup->t_self);
    1863           60 :     heap_freetuple(tup);
    1864              : 
    1865              :     /*
    1866              :      * Drop db-specific replication slots.
    1867              :      */
    1868           60 :     ReplicationSlotsDropDBSlots(db_id);
    1869              : 
    1870              :     /*
    1871              :      * Drop pages for this database that are in the shared buffer cache. This
    1872              :      * is important to ensure that no remaining backend tries to write out a
    1873              :      * dirty buffer to the dead database later...
    1874              :      */
    1875           60 :     DropDatabaseBuffers(db_id);
    1876              : 
    1877              :     /*
    1878              :      * Tell checkpointer to forget any pending fsync and unlink requests for
    1879              :      * files in the database; else the fsyncs will fail at next checkpoint, or
    1880              :      * worse, it will delete files that belong to a newly created database
    1881              :      * with the same OID.
    1882              :      */
    1883           60 :     ForgetDatabaseSyncRequests(db_id);
    1884              : 
    1885              :     /*
    1886              :      * Force a checkpoint to make sure the checkpointer has received the
    1887              :      * message sent by ForgetDatabaseSyncRequests.
    1888              :      */
    1889           60 :     RequestCheckpoint(CHECKPOINT_FAST | CHECKPOINT_FORCE | CHECKPOINT_WAIT);
    1890              : 
    1891              :     /* Close all smgr fds in all backends. */
    1892           60 :     WaitForProcSignalBarrier(EmitProcSignalBarrier(PROCSIGNAL_BARRIER_SMGRRELEASE));
    1893              : 
    1894              :     /*
    1895              :      * Remove all tablespace subdirs belonging to the database.
    1896              :      */
    1897           60 :     remove_dbtablespaces(db_id);
    1898              : 
    1899              :     /*
    1900              :      * Close pg_database, but keep lock till commit.
    1901              :      */
    1902           59 :     table_close(pgdbrel, NoLock);
    1903              : 
    1904              :     /*
    1905              :      * Force synchronous commit, thus minimizing the window between removal of
    1906              :      * the database files and committal of the transaction. If we crash before
    1907              :      * committing, we'll have a DB that's gone on disk but still there
    1908              :      * according to pg_database, which is not good.
    1909              :      */
    1910           59 :     ForceSyncCommit();
    1911              : }
    1912              : 
    1913              : 
    1914              : /*
    1915              :  * Rename database
                      :  *
                      :  * Renames the database called oldname to newname and returns an
                      :  * ObjectAddress identifying the renamed database (class
                      :  * DatabaseRelationId, object db_id).
                      :  *
                      :  * An error is raised if newname contains a newline or carriage return, if
                      :  * oldname does not exist, if the caller is not the owner or lacks the
                      :  * createdb privilege, if newname is already in use, if the target is the
                      :  * current database, or if CountOtherDBBackends() reports other users of
                      :  * the target database.
    1916              :  */
    1917              : ObjectAddress
    1918            9 : RenameDatabase(const char *oldname, const char *newname)
    1919              : {
    1920              :     Oid         db_id;
    1921              :     HeapTuple   newtup;
    1922              :     ItemPointerData otid;
    1923              :     Relation    rel;
    1924              :     int         notherbackends;
    1925              :     int         npreparedxacts;
    1926              :     ObjectAddress address;
    1927              : 
    1928              :     /* Report error if name has \n or \r character. */
    1929            9 :     if (strpbrk(newname, "\n\r"))
    1930            0 :         ereport(ERROR,
    1931              :                 (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
    1932              :                  errmsg("database name \"%s\" contains a newline or carriage return character", newname)));
    1933              : 
    1934              :     /*
    1935              :      * Look up the target database's OID, and get exclusive lock on it. We
    1936              :      * need this for the same reasons as DROP DATABASE.
    1937              :      */
    1938            9 :     rel = table_open(DatabaseRelationId, RowExclusiveLock);
    1939              : 
    1940            9 :     if (!get_db_info(oldname, AccessExclusiveLock, &db_id, NULL, NULL, NULL,
    1941              :                      NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL))
    1942            0 :         ereport(ERROR,
    1943              :                 (errcode(ERRCODE_UNDEFINED_DATABASE),
    1944              :                  errmsg("database \"%s\" does not exist", oldname)));
    1945              : 
    1946              :     /* must be owner */
    1947            9 :     if (!object_ownercheck(DatabaseRelationId, db_id, GetUserId()))
    1948            0 :         aclcheck_error(ACLCHECK_NOT_OWNER, OBJECT_DATABASE,
    1949              :                        oldname);
    1950              : 
    1951              :     /* must have createdb rights */
    1952            9 :     if (!have_createdb_privilege())
    1953            0 :         ereport(ERROR,
    1954              :                 (errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
    1955              :                  errmsg("permission denied to rename database")));
    1956              : 
    1957              :     /*
    1958              :      * If built with appropriate switch, whine when regression-testing
    1959              :      * conventions for database names are violated.
    1960              :      */
    1961              : #ifdef ENFORCE_REGRESSION_TEST_NAME_RESTRICTIONS
    1962              :     if (strstr(newname, "regression") == NULL)
    1963              :         elog(WARNING, "databases created by regression test cases should have names including \"regression\"");
    1964              : #endif
    1965              : 
    1966              :     /*
    1967              :      * Make sure the new name doesn't exist.  See notes for same error in
    1968              :      * CREATE DATABASE.
    1969              :      */
    1970            9 :     if (OidIsValid(get_database_oid(newname, true)))
    1971            0 :         ereport(ERROR,
    1972              :                 (errcode(ERRCODE_DUPLICATE_DATABASE),
    1973              :                  errmsg("database \"%s\" already exists", newname)));
    1974              : 
    1975              :     /*
    1976              :      * XXX Client applications probably store the current database somewhere,
    1977              :      * so renaming it could cause confusion.  On the other hand, there may not
    1978              :      * be an actual problem besides a little confusion, so think about this
    1979              :      * and decide.
    1980              :      */
    1981            9 :     if (db_id == MyDatabaseId)
    1982            0 :         ereport(ERROR,
    1983              :                 (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
    1984              :                  errmsg("current database cannot be renamed")));
    1985              : 
    1986              :     /*
    1987              :      * Make sure the database does not have active sessions.  This is the same
    1988              :      * concern as above, but applied to other sessions.
    1989              :      *
    1990              :      * As in CREATE DATABASE, check this after other error conditions.
    1991              :      */
    1992            9 :     if (CountOtherDBBackends(db_id, &notherbackends, &npreparedxacts))
    1993            0 :         ereport(ERROR,
    1994              :                 (errcode(ERRCODE_OBJECT_IN_USE),
    1995              :                  errmsg("database \"%s\" is being accessed by other users",
    1996              :                         oldname),
    1997              :                  errdetail_busy_db(notherbackends, npreparedxacts)));
    1998              : 
    1999              :     /* rename: overwrite datname in a copy of the row, update the catalog, then release the tuple lock (presumably taken by SearchSysCacheLockedCopy1 -- confirm) */
    2000            9 :     newtup = SearchSysCacheLockedCopy1(DATABASEOID, ObjectIdGetDatum(db_id));
    2001            9 :     if (!HeapTupleIsValid(newtup))
    2002            0 :         elog(ERROR, "cache lookup failed for database %u", db_id);
    2003            9 :     otid = newtup->t_self;
    2004            9 :     namestrcpy(&(((Form_pg_database) GETSTRUCT(newtup))->datname), newname);
    2005            9 :     CatalogTupleUpdate(rel, &otid, newtup);
    2006            9 :     UnlockTuple(rel, &otid, InplaceUpdateTupleLock);
    2007              : 
    2008            9 :     InvokeObjectPostAlterHook(DatabaseRelationId, db_id, 0);
    2009              : 
    2010            9 :     ObjectAddressSet(address, DatabaseRelationId, db_id);
    2011              : 
    2012              :     /*
    2013              :      * Close pg_database, but keep lock till commit.
    2014              :      */
    2015            9 :     table_close(rel, NoLock);
    2016              : 
    2017            9 :     return address;
    2018              : }
    2019              : 
    2020              : 
    2021              : /*
    2022              :  * ALTER DATABASE SET TABLESPACE
                      :  *
                      :  * Moves database "dbname" to tablespace "tblspcname": the database
                      :  * directory is copied into the new tablespace, the dattablespace column
                      :  * of the database's pg_database row is updated, the transaction is
                      :  * committed, and only then is the old directory removed.
                      :  *
                      :  * Raises an error if the database does not exist, the current user does
                      :  * not own it, it is the currently open database, the current user lacks
                      :  * CREATE privilege on the target tablespace, the target is pg_global,
                      :  * the database is being accessed by other users, or the target
                      :  * directory already contains files.  Returns quietly, changing nothing,
                      :  * if the database is already in the target tablespace.
                      :  *
                      :  * Note that this function commits the current transaction and starts a
                      :  * new one before returning.  A session-level AccessExclusiveLock on the
                      :  * database is held across that boundary and released at the very end.
    2023              :  */
    2024              : static void
    2025           14 : movedb(const char *dbname, const char *tblspcname)
    2026              : {
    2027              :     Oid         db_id;
    2028              :     Relation    pgdbrel;
    2029              :     int         notherbackends;
    2030              :     int         npreparedxacts;
    2031              :     HeapTuple   oldtuple,
    2032              :                 newtuple;
    2033              :     Oid         src_tblspcoid,
    2034              :                 dst_tblspcoid;
    2035              :     ScanKeyData scankey;
    2036              :     SysScanDesc sysscan;
    2037              :     AclResult   aclresult;
    2038              :     char       *src_dbpath;
    2039              :     char       *dst_dbpath;
    2040              :     DIR        *dstdir;
    2041              :     struct dirent *xlde;
    2042              :     movedb_failure_params fparms;
    2043              : 
    2044              :     /*
    2045              :      * Look up the target database's OID, and get exclusive lock on it. We
    2046              :      * need this to ensure that no new backend starts up in the database while
    2047              :      * we are moving it, and that no one is using it as a CREATE DATABASE
    2048              :      * template or trying to delete it.
    2049              :      */
    2050           14 :     pgdbrel = table_open(DatabaseRelationId, RowExclusiveLock);
    2051              : 
    2052           14 :     if (!get_db_info(dbname, AccessExclusiveLock, &db_id, NULL, NULL, NULL,
    2053              :                      NULL, NULL, NULL, NULL, &src_tblspcoid, NULL, NULL, NULL, NULL, NULL, NULL))
    2054            0 :         ereport(ERROR,
    2055              :                 (errcode(ERRCODE_UNDEFINED_DATABASE),
    2056              :                  errmsg("database \"%s\" does not exist", dbname)));
    2057              : 
    2058              :     /*
    2059              :      * We actually need a session lock, so that the lock will persist across
    2060              :      * the commit/restart below.  (We could almost get away with letting the
    2061              :      * lock be released at commit, except that someone could try to move
    2062              :      * relations of the DB back into the old directory while we rmtree() it.)
    2063              :      */
    2064           14 :     LockSharedObjectForSession(DatabaseRelationId, db_id, 0,
    2065              :                                AccessExclusiveLock);
    2066              : 
    2067              :     /*
    2068              :      * Permission checks: the current user must own the database
    2069              :      */
    2070           14 :     if (!object_ownercheck(DatabaseRelationId, db_id, GetUserId()))
    2071            0 :         aclcheck_error(ACLCHECK_NOT_OWNER, OBJECT_DATABASE,
    2072              :                        dbname);
    2073              : 
    2074              :     /*
    2075              :      * Obviously can't move the tables of my own database
    2076              :      */
    2077           14 :     if (db_id == MyDatabaseId)
    2078            0 :         ereport(ERROR,
    2079              :                 (errcode(ERRCODE_OBJECT_IN_USE),
    2080              :                  errmsg("cannot change the tablespace of the currently open database")));
    2081              : 
    2082              :     /*
    2083              :      * Get tablespace's oid
    2084              :      */
    2085           14 :     dst_tblspcoid = get_tablespace_oid(tblspcname, false);
    2086              : 
    2087              :     /*
    2088              :      * Permission checks: need CREATE privilege on the target tablespace
    2089              :      */
    2090           14 :     aclresult = object_aclcheck(TableSpaceRelationId, dst_tblspcoid, GetUserId(),
    2091              :                                 ACL_CREATE);
    2092           14 :     if (aclresult != ACLCHECK_OK)
    2093            0 :         aclcheck_error(aclresult, OBJECT_TABLESPACE,
    2094              :                        tblspcname);
    2095              : 
    2096              :     /*
    2097              :      * pg_global must never be the default tablespace
    2098              :      */
    2099           14 :     if (dst_tblspcoid == GLOBALTABLESPACE_OID)
    2100            0 :         ereport(ERROR,
    2101              :                 (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
    2102              :                  errmsg("pg_global cannot be used as default tablespace")));
    2103              : 
    2104              :     /*
    2105              :      * No-op if same tablespace; just give back the session lock taken above
    2106              :      */
    2107           14 :     if (src_tblspcoid == dst_tblspcoid)
    2108              :     {
    2109            0 :         table_close(pgdbrel, NoLock);
    2110            0 :         UnlockSharedObjectForSession(DatabaseRelationId, db_id, 0,
    2111              :                                      AccessExclusiveLock);
    2112            0 :         return;
    2113              :     }
    2114              : 
    2115              :     /*
    2116              :      * Check for other backends in the target database.  (Because we hold the
    2117              :      * database lock, no new ones can start after this.)
    2118              :      *
    2119              :      * As in CREATE DATABASE, check this after other error conditions.
    2120              :      */
    2121           14 :     if (CountOtherDBBackends(db_id, &notherbackends, &npreparedxacts))
    2122            0 :         ereport(ERROR,
    2123              :                 (errcode(ERRCODE_OBJECT_IN_USE),
    2124              :                  errmsg("database \"%s\" is being accessed by other users",
    2125              :                         dbname),
    2126              :                  errdetail_busy_db(notherbackends, npreparedxacts)));
    2127              : 
    2128              :     /*
    2129              :      * Get old and new database paths
                      :      *
                      :      * Both strings are pfree'd at the end of this function.
    2130              :      */
    2131           14 :     src_dbpath = GetDatabasePath(db_id, src_tblspcoid);
    2132           14 :     dst_dbpath = GetDatabasePath(db_id, dst_tblspcoid);
    2133              : 
    2134              :     /*
    2135              :      * Force a checkpoint before proceeding. This will force all dirty
    2136              :      * buffers, including those of unlogged tables, out to disk, to ensure
    2137              :      * source database is up-to-date on disk for the copy.
    2138              :      * FlushDatabaseBuffers() would suffice for that, but we also want to
    2139              :      * process any pending unlink requests. Otherwise, the check for existing
    2140              :      * files in the target directory might fail unnecessarily, not to mention
    2141              :      * that the copy might fail due to source files getting deleted under it.
    2142              :      * On Windows, this also ensures that background procs don't hold any open
    2143              :      * files, which would cause rmdir() to fail.
    2144              :      */
    2145           14 :     RequestCheckpoint(CHECKPOINT_FAST | CHECKPOINT_FORCE | CHECKPOINT_WAIT
    2146              :                       | CHECKPOINT_FLUSH_UNLOGGED);
    2147              : 
    2148              :     /* Close all smgr fds in all backends. */
    2149           14 :     WaitForProcSignalBarrier(EmitProcSignalBarrier(PROCSIGNAL_BARRIER_SMGRRELEASE));
    2150              : 
    2151              :     /*
    2152              :      * Now drop all buffers holding data of the target database; they should
    2153              :      * no longer be dirty so DropDatabaseBuffers is safe.
    2154              :      *
    2155              :      * It might seem that we could just let these buffers age out of shared
    2156              :      * buffers naturally, since they should not get referenced anymore.  The
    2157              :      * problem with that is that if the user later moves the database back to
    2158              :      * its original tablespace, any still-surviving buffers would appear to
    2159              :      * contain valid data again --- but they'd be missing any changes made in
    2160              :      * the database while it was in the new tablespace.  In any case, freeing
    2161              :      * buffers that should never be used again seems worth the cycles.
    2162              :      *
    2163              :      * Note: it'd be sufficient to get rid of buffers matching db_id and
    2164              :      * src_tblspcoid, but bufmgr.c presently provides no API for that.
    2165              :      */
    2166           14 :     DropDatabaseBuffers(db_id);
    2167              : 
    2168              :     /*
    2169              :      * Check for existence of files in the target directory, i.e., objects of
    2170              :      * this database that are already in the target tablespace.  We can't
    2171              :      * allow the move in such a case, because we would need to change those
    2172              :      * relations' pg_class.reltablespace entries to zero, and we don't have
    2173              :      * access to the DB's pg_class to do so.
                      :      *
                      :      * Any directory entry other than "." and ".." counts as such a file.  If
                      :      * the directory cannot be opened at all, the check is simply skipped.
    2174              :      */
    2175           14 :     dstdir = AllocateDir(dst_dbpath);
    2176           14 :     if (dstdir != NULL)
    2177              :     {
    2178            0 :         while ((xlde = ReadDir(dstdir, dst_dbpath)) != NULL)
    2179              :         {
    2180            0 :             if (strcmp(xlde->d_name, ".") == 0 ||
    2181            0 :                 strcmp(xlde->d_name, "..") == 0)
    2182            0 :                 continue;
    2183              : 
    2184            0 :             ereport(ERROR,
    2185              :                     (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
    2186              :                      errmsg("some relations of database \"%s\" are already in tablespace \"%s\"",
    2187              :                             dbname, tblspcname),
    2188              :                      errhint("You must move them back to the database's default tablespace before using this command.")));
    2189              :         }
    2190              : 
    2191            0 :         FreeDir(dstdir);
    2192              : 
    2193              :         /*
    2194              :          * The directory exists but is empty. We must remove it before using
    2195              :          * the copydir function.
    2196              :          */
    2197            0 :         if (rmdir(dst_dbpath) != 0)
    2198            0 :             elog(ERROR, "could not remove directory \"%s\": %m",
    2199              :                  dst_dbpath);
    2200              :     }
    2201              : 
    2202              :     /*
    2203              :      * Use an ENSURE block to make sure we remove the debris if the copy fails
    2204              :      * (eg, due to out-of-disk-space).  This is not a 100% solution, because
    2205              :      * of the possibility of failure during transaction commit, but it should
    2206              :      * handle most scenarios.
                      :      *
                      :      * fparms tells movedb_failure_callback which directory to remove: the
                      :      * one for this database in the destination tablespace.
    2207              :      */
    2208           14 :     fparms.dest_dboid = db_id;
    2209           14 :     fparms.dest_tsoid = dst_tblspcoid;
    2210           14 :     PG_ENSURE_ERROR_CLEANUP(movedb_failure_callback,
    2211              :                             PointerGetDatum(&fparms));
    2212              :     {
    2213           14 :         Datum       new_record[Natts_pg_database] = {0};
    2214           14 :         bool        new_record_nulls[Natts_pg_database] = {0};
    2215           14 :         bool        new_record_repl[Natts_pg_database] = {0};
    2216              : 
    2217              :         /*
    2218              :          * Copy files from the old tablespace to the new one
    2219              :          */
    2220           14 :         copydir(src_dbpath, dst_dbpath, false);
    2221              : 
    2222              :         /*
    2223              :          * Record the filesystem change in XLOG
                      :          *
                      :          * The record names the same database OID as both source and
                      :          * destination; only the tablespace OIDs differ.
    2224              :          */
    2225              :         {
    2226              :             xl_dbase_create_file_copy_rec xlrec;
    2227              : 
    2228           14 :             xlrec.db_id = db_id;
    2229           14 :             xlrec.tablespace_id = dst_tblspcoid;
    2230           14 :             xlrec.src_db_id = db_id;
    2231           14 :             xlrec.src_tablespace_id = src_tblspcoid;
    2232              : 
    2233           14 :             XLogBeginInsert();
    2234           14 :             XLogRegisterData(&xlrec,
    2235              :                              sizeof(xl_dbase_create_file_copy_rec));
    2236              : 
    2237           14 :             (void) XLogInsert(RM_DBASE_ID,
    2238              :                               XLOG_DBASE_CREATE_FILE_COPY | XLR_SPECIAL_REL_UPDATE);
    2239              :         }
    2240              : 
    2241              :         /*
    2242              :          * Update the database's pg_database tuple
                      :          *
                      :          * The row is looked up again by name, and only its dattablespace
                      :          * column is replaced.  The tuple lock is held just around the update.
    2243              :          */
    2244           14 :         ScanKeyInit(&scankey,
    2245              :                     Anum_pg_database_datname,
    2246              :                     BTEqualStrategyNumber, F_NAMEEQ,
    2247              :                     CStringGetDatum(dbname));
    2248           14 :         sysscan = systable_beginscan(pgdbrel, DatabaseNameIndexId, true,
    2249              :                                      NULL, 1, &scankey);
    2250           14 :         oldtuple = systable_getnext(sysscan);
    2251           14 :         if (!HeapTupleIsValid(oldtuple))    /* shouldn't happen... */
    2252            0 :             ereport(ERROR,
    2253              :                     (errcode(ERRCODE_UNDEFINED_DATABASE),
    2254              :                      errmsg("database \"%s\" does not exist", dbname)));
    2255           14 :         LockTuple(pgdbrel, &oldtuple->t_self, InplaceUpdateTupleLock);
    2256              : 
    2257           14 :         new_record[Anum_pg_database_dattablespace - 1] = ObjectIdGetDatum(dst_tblspcoid);
    2258           14 :         new_record_repl[Anum_pg_database_dattablespace - 1] = true;
    2259              : 
    2260           14 :         newtuple = heap_modify_tuple(oldtuple, RelationGetDescr(pgdbrel),
    2261              :                                      new_record,
    2262              :                                      new_record_nulls, new_record_repl);
    2263           14 :         CatalogTupleUpdate(pgdbrel, &oldtuple->t_self, newtuple);
    2264           14 :         UnlockTuple(pgdbrel, &oldtuple->t_self, InplaceUpdateTupleLock);
    2265              : 
    2266           14 :         InvokeObjectPostAlterHook(DatabaseRelationId, db_id, 0);
    2267              : 
    2268           14 :         systable_endscan(sysscan);
    2269              : 
    2270              :         /*
    2271              :          * Force another checkpoint here.  As in CREATE DATABASE, this is to
    2272              :          * ensure that we don't have to replay a committed
    2273              :          * XLOG_DBASE_CREATE_FILE_COPY operation, which would cause us to lose
    2274              :          * any unlogged operations done in the new DB tablespace before the
    2275              :          * next checkpoint.
    2276              :          */
    2277           14 :         RequestCheckpoint(CHECKPOINT_FAST | CHECKPOINT_FORCE | CHECKPOINT_WAIT);
    2278              : 
    2279              :         /*
    2280              :          * Force synchronous commit, thus minimizing the window between
    2281              :          * copying the database files and committal of the transaction. If we
    2282              :          * crash before committing, we'll leave an orphaned set of files on
    2283              :          * disk, which is not fatal but not good either.
    2284              :          */
    2285           14 :         ForceSyncCommit();
    2286              : 
    2287              :         /*
    2288              :          * Close pg_database, but keep lock till commit.
    2289              :          */
    2290           14 :         table_close(pgdbrel, NoLock);
    2291              :     }
    2292           14 :     PG_END_ENSURE_ERROR_CLEANUP(movedb_failure_callback,
    2293              :                                 PointerGetDatum(&fparms));
    2294              : 
    2295              :     /*
    2296              :      * Commit the transaction so that the pg_database update is committed. If
    2297              :      * we crash while removing files, the database won't be corrupt, we'll
    2298              :      * just leave some orphaned files in the old directory.
    2299              :      *
    2300              :      * (This is OK because we know we aren't inside a transaction block.)
    2301              :      *
    2302              :      * XXX would it be safe/better to do this inside the ensure block?  Not
    2303              :      * convinced it's a good idea; consider elog just after the transaction
    2304              :      * really commits.
    2305              :      */
    2306           14 :     PopActiveSnapshot();
    2307           14 :     CommitTransactionCommand();
    2308              : 
    2309              :     /* Start new transaction for the remaining work; don't need a snapshot */
    2310           14 :     StartTransactionCommand();
    2311              : 
    2312              :     /*
    2313              :      * Remove files from the old tablespace
                      :      *
                      :      * The move has already been committed at this point, so a failure to
                      :      * remove the old files is only reported as a WARNING.
    2314              :      */
    2315           14 :     if (!rmtree(src_dbpath, true))
    2316            0 :         ereport(WARNING,
    2317              :                 (errmsg("some useless files may be left behind in old database directory \"%s\"",
    2318              :                         src_dbpath)));
    2319              : 
    2320              :     /*
    2321              :      * Record the filesystem change in XLOG
                      :      *
                      :      * This is logged as a database drop record that lists exactly one
                      :      * tablespace, the old one.
    2322              :      */
    2323              :     {
    2324              :         xl_dbase_drop_rec xlrec;
    2325              : 
    2326           14 :         xlrec.db_id = db_id;
    2327           14 :         xlrec.ntablespaces = 1;
    2328              : 
    2329           14 :         XLogBeginInsert();
    2330           14 :         XLogRegisterData(&xlrec, sizeof(xl_dbase_drop_rec));
    2331           14 :         XLogRegisterData(&src_tblspcoid, sizeof(Oid));
    2332              : 
    2333           14 :         (void) XLogInsert(RM_DBASE_ID,
    2334              :                           XLOG_DBASE_DROP | XLR_SPECIAL_REL_UPDATE);
    2335              :     }
    2336              : 
    2337              :     /* Now it's safe to release the database lock */
    2338           14 :     UnlockSharedObjectForSession(DatabaseRelationId, db_id, 0,
    2339              :                                  AccessExclusiveLock);
    2340              : 
    2341           14 :     pfree(src_dbpath);
    2342           14 :     pfree(dst_dbpath);
    2343              : }
    2344              : 
    2345              : /*
                      :  * Error cleanup callback for movedb
                      :  *
                      :  * movedb() registers this with PG_ENSURE_ERROR_CLEANUP around the
                      :  * file copy and catalog update.  "arg" is a pointer Datum referencing a
                      :  * movedb_failure_params; "code" is not used.
                      :  *
                      :  * The callback builds the path of the database directory identified by
                      :  * dest_dboid/dest_tsoid and removes it with rmtree(), deliberately
                      :  * ignoring rmtree()'s result.
                      :  */
    2346              : static void
    2347            0 : movedb_failure_callback(int code, Datum arg)
    2348              : {
    2349            0 :     movedb_failure_params *fparms = (movedb_failure_params *) DatumGetPointer(arg);
    2350              :     char       *dstpath;
    2351              : 
    2352              :     /* Get rid of anything we managed to copy to the target directory */
    2353            0 :     dstpath = GetDatabasePath(fparms->dest_dboid, fparms->dest_tsoid);
    2354              : 
    2355            0 :     (void) rmtree(dstpath, true);
    2356              : 
    2357            0 :     pfree(dstpath);
    2358            0 : }
    2359              : 
    2360              : /*
    2361              :  * Process options and call dropdb function.
                      :  *
                      :  * The only option recognized is "force"; its mere presence turns the
                      :  * force flag on (the option's argument, if any, is not examined here).
                      :  * Any other option name raises a syntax error whose position is taken
                      :  * from the option's location via pstate.
                      :  *
                      :  * The database name and missing_ok flag are passed through to dropdb()
                      :  * unchanged from the statement node.
    2362              :  */
    2363              : void
    2364           82 : DropDatabase(ParseState *pstate, DropdbStmt *stmt)
    2365              : {
    2366           82 :     bool        force = false;
    2367              :     ListCell   *lc;
    2368              : 
    2369           99 :     foreach(lc, stmt->options)
    2370              :     {
    2371           17 :         DefElem    *opt = (DefElem *) lfirst(lc);
    2372              : 
    2373           17 :         if (strcmp(opt->defname, "force") == 0)
    2374           17 :             force = true;
    2375              :         else
    2376            0 :             ereport(ERROR,
    2377              :                     (errcode(ERRCODE_SYNTAX_ERROR),
    2378              :                      errmsg("unrecognized %s option \"%s\"",
    2379              :                             "DROP DATABASE", opt->defname),
    2380              :                      parser_errposition(pstate, opt->location)));
    2381              :     }
    2382              : 
    2383           82 :     dropdb(stmt->dbname, stmt->missing_ok, force);
    2384           70 : }
    2385              : 
    2386              : /*
    2387              :  * ALTER DATABASE name ...
                      :  *
                      :  * Recognized options are "is_template", "allow_connections",
                      :  * "connection_limit" and "tablespace".  Giving the same option twice is
                      :  * reported through errorConflictingDefElem(); an unknown option name is a
                      :  * syntax error.
                      :  *
                      :  * "tablespace" must be the only option.  It is refused inside a
                      :  * transaction block, is handed off to movedb(), and makes this function
                      :  * return InvalidOid.
                      :  *
                      :  * For the other options, the database's pg_database row is looked up by
                      :  * name and updated, and the database's OID is returned.  Errors are
                      :  * raised if the database does not exist, the current user does not own
                      :  * it, the connection limit is below DATCONNLIMIT_UNLIMITED, or the
                      :  * request would disallow connections to the current database.
                      :  *
                      :  * NOTE(review): an invalid database is reported at FATAL level rather
                      :  * than ERROR --- confirm that ending the session is intended here.
    2388              :  */
    2389              : Oid
    2390           56 : AlterDatabase(ParseState *pstate, AlterDatabaseStmt *stmt, bool isTopLevel)
    2391              : {
    2392              :     Relation    rel;
    2393              :     Oid         dboid;
    2394              :     HeapTuple   tuple,
    2395              :                 newtuple;
    2396              :     Form_pg_database datform;
    2397              :     ScanKeyData scankey;
    2398              :     SysScanDesc scan;
    2399              :     ListCell   *option;
    2400           56 :     bool        dbistemplate = false;
    2401           56 :     bool        dballowconnections = true;
    2402           56 :     int         dbconnlimit = DATCONNLIMIT_UNLIMITED;
    2403           56 :     DefElem    *distemplate = NULL;
    2404           56 :     DefElem    *dallowconnections = NULL;
    2405           56 :     DefElem    *dconnlimit = NULL;
    2406           56 :     DefElem    *dtablespace = NULL;
    2407           56 :     Datum       new_record[Natts_pg_database] = {0};
    2408           56 :     bool        new_record_nulls[Natts_pg_database] = {0};
    2409           56 :     bool        new_record_repl[Natts_pg_database] = {0};
    2410              : 
    2411              :     /* Extract options from the statement node tree */
    2412          112 :     foreach(option, stmt->options)
    2413              :     {
    2414           56 :         DefElem    *defel = (DefElem *) lfirst(option);
    2415              : 
    2416           56 :         if (strcmp(defel->defname, "is_template") == 0)
    2417              :         {
    2418           12 :             if (distemplate)
    2419            0 :                 errorConflictingDefElem(defel, pstate);
    2420           12 :             distemplate = defel;
    2421              :         }
    2422           44 :         else if (strcmp(defel->defname, "allow_connections") == 0)
    2423              :         {
    2424           21 :             if (dallowconnections)
    2425            0 :                 errorConflictingDefElem(defel, pstate);
    2426           21 :             dallowconnections = defel;
    2427              :         }
    2428           23 :         else if (strcmp(defel->defname, "connection_limit") == 0)
    2429              :         {
    2430            9 :             if (dconnlimit)
    2431            0 :                 errorConflictingDefElem(defel, pstate);
    2432            9 :             dconnlimit = defel;
    2433              :         }
    2434           14 :         else if (strcmp(defel->defname, "tablespace") == 0)
    2435              :         {
    2436           14 :             if (dtablespace)
    2437            0 :                 errorConflictingDefElem(defel, pstate);
    2438           14 :             dtablespace = defel;
    2439              :         }
    2440              :         else
    2441            0 :             ereport(ERROR,
    2442              :                     (errcode(ERRCODE_SYNTAX_ERROR),
    2443              :                      errmsg("option \"%s\" not recognized", defel->defname),
    2444              :                      parser_errposition(pstate, defel->location)));
    2445              :     }
    2446              : 
    2447           56 :     if (dtablespace)
    2448              :     {
    2449              :         /*
    2450              :          * While the SET TABLESPACE syntax doesn't allow any other options,
    2451              :          * somebody could write "WITH TABLESPACE ...".  Forbid any other
    2452              :          * options from being specified in that case.
    2453              :          */
    2454           14 :         if (list_length(stmt->options) != 1)
    2455            0 :             ereport(ERROR,
    2456              :                     (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
    2457              :                      errmsg("option \"%s\" cannot be specified with other options",
    2458              :                             dtablespace->defname),
    2459              :                      parser_errposition(pstate, dtablespace->location)));
    2460              :         /* this case isn't allowed within a transaction block */
    2461           14 :         PreventInTransactionBlock(isTopLevel, "ALTER DATABASE SET TABLESPACE");
    2462           14 :         movedb(stmt->dbname, defGetString(dtablespace));
    2463           14 :         return InvalidOid;
    2464              :     }
    2465              : 
    2466           42 :     if (distemplate && distemplate->arg)
    2467           12 :         dbistemplate = defGetBoolean(distemplate);
    2468           42 :     if (dallowconnections && dallowconnections->arg)
    2469           21 :         dballowconnections = defGetBoolean(dallowconnections);
    2470           42 :     if (dconnlimit && dconnlimit->arg)
    2471              :     {
    2472            9 :         dbconnlimit = defGetInt32(dconnlimit);
    2473            9 :         if (dbconnlimit < DATCONNLIMIT_UNLIMITED)
    2474            0 :             ereport(ERROR,
    2475              :                     (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
    2476              :                      errmsg("invalid connection limit: %d", dbconnlimit)));
    2477              :     }
    2478              : 
    2479              :     /*
    2480              :      * Get the old tuple.  We don't need a lock on the database per se,
    2481              :      * because we're not going to do anything that would mess up incoming
    2482              :      * connections.
                      :      *
                      :      * We do take the tuple lock on the row found; it is released right
                      :      * after the catalog update below.
    2483              :      */
    2484           42 :     rel = table_open(DatabaseRelationId, RowExclusiveLock);
    2485           42 :     ScanKeyInit(&scankey,
    2486              :                 Anum_pg_database_datname,
    2487              :                 BTEqualStrategyNumber, F_NAMEEQ,
    2488           42 :                 CStringGetDatum(stmt->dbname));
    2489           42 :     scan = systable_beginscan(rel, DatabaseNameIndexId, true,
    2490              :                               NULL, 1, &scankey);
    2491           42 :     tuple = systable_getnext(scan);
    2492           42 :     if (!HeapTupleIsValid(tuple))
    2493            0 :         ereport(ERROR,
    2494              :                 (errcode(ERRCODE_UNDEFINED_DATABASE),
    2495              :                  errmsg("database \"%s\" does not exist", stmt->dbname)));
    2496           42 :     LockTuple(rel, &tuple->t_self, InplaceUpdateTupleLock);
    2497              : 
    2498           42 :     datform = (Form_pg_database) GETSTRUCT(tuple);
    2499           42 :     dboid = datform->oid;
    2500              : 
    2501           42 :     if (database_is_invalid_form(datform))
    2502              :     {
    2503            1 :         ereport(FATAL,
    2504              :                 errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
    2505              :                 errmsg("cannot alter invalid database \"%s\"", stmt->dbname),
    2506              :                 errhint("Use DROP DATABASE to drop invalid databases."));
    2507              :     }
    2508              : 
    2509           41 :     if (!object_ownercheck(DatabaseRelationId, dboid, GetUserId()))
    2510            0 :         aclcheck_error(ACLCHECK_NOT_OWNER, OBJECT_DATABASE,
    2511            0 :                        stmt->dbname);
    2512              : 
    2513              :     /*
    2514              :      * In order to avoid getting locked out and having to go through
    2515              :      * standalone mode, we refuse to disallow connections to the database
    2516              :      * we're currently connected to.  Lockout can still happen with concurrent
    2517              :      * sessions but the likeliness of that is not high enough to worry about.
    2518              :      */
    2519           41 :     if (!dballowconnections && dboid == MyDatabaseId)
    2520            0 :         ereport(ERROR,
    2521              :                 (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
    2522              :                  errmsg("cannot disallow connections for current database")));
    2523              : 
    2524              :     /*
    2525              :      * Build an updated tuple, perusing the information just obtained
                      :      *
                      :      * A column is replaced whenever its option was present at all.  If the
                      :      * option was given without an argument, the value written is the
                      :      * default the corresponding variable was initialized with at the top of
                      :      * this function.
    2526              :      */
    2527           41 :     if (distemplate)
    2528              :     {
    2529           12 :         new_record[Anum_pg_database_datistemplate - 1] = BoolGetDatum(dbistemplate);
    2530           12 :         new_record_repl[Anum_pg_database_datistemplate - 1] = true;
    2531              :     }
    2532           41 :     if (dallowconnections)
    2533              :     {
    2534           21 :         new_record[Anum_pg_database_datallowconn - 1] = BoolGetDatum(dballowconnections);
    2535           21 :         new_record_repl[Anum_pg_database_datallowconn - 1] = true;
    2536              :     }
    2537           41 :     if (dconnlimit)
    2538              :     {
    2539            8 :         new_record[Anum_pg_database_datconnlimit - 1] = Int32GetDatum(dbconnlimit);
    2540            8 :         new_record_repl[Anum_pg_database_datconnlimit - 1] = true;
    2541              :     }
    2542              : 
    2543           41 :     newtuple = heap_modify_tuple(tuple, RelationGetDescr(rel), new_record,
    2544              :                                  new_record_nulls, new_record_repl);
    2545           41 :     CatalogTupleUpdate(rel, &tuple->t_self, newtuple);
    2546           41 :     UnlockTuple(rel, &tuple->t_self, InplaceUpdateTupleLock);
    2547              : 
    2548           41 :     InvokeObjectPostAlterHook(DatabaseRelationId, dboid, 0);
    2549              : 
    2550           41 :     systable_endscan(scan);
    2551              : 
    2552              :     /* Close pg_database, but keep lock till commit */
    2553           41 :     table_close(rel, NoLock);
    2554              : 
    2555           41 :     return dboid;
    2556              : }
    2557              : 
    2558              : 
    2559              : /*
    2560              :  * ALTER DATABASE name REFRESH COLLATION VERSION
    2561              :  */
    2562              : ObjectAddress
    2563            4 : AlterDatabaseRefreshColl(AlterDatabaseRefreshCollStmt *stmt)
    2564              : {
    2565              :     Relation    rel;
    2566              :     ScanKeyData scankey;
    2567              :     SysScanDesc scan;
    2568              :     Oid         db_id;
    2569              :     HeapTuple   tuple;
    2570              :     Form_pg_database datForm;
    2571              :     ObjectAddress address;
    2572              :     Datum       datum;
    2573              :     bool        isnull;
    2574              :     char       *oldversion;
    2575              :     char       *newversion;
    2576              : 
    2577            4 :     rel = table_open(DatabaseRelationId, RowExclusiveLock);
    2578            4 :     ScanKeyInit(&scankey,
    2579              :                 Anum_pg_database_datname,
    2580              :                 BTEqualStrategyNumber, F_NAMEEQ,
    2581            4 :                 CStringGetDatum(stmt->dbname));
    2582            4 :     scan = systable_beginscan(rel, DatabaseNameIndexId, true,
    2583              :                               NULL, 1, &scankey);
    2584            4 :     tuple = systable_getnext(scan);
    2585            4 :     if (!HeapTupleIsValid(tuple))
    2586            0 :         ereport(ERROR,
    2587              :                 (errcode(ERRCODE_UNDEFINED_DATABASE),
    2588              :                  errmsg("database \"%s\" does not exist", stmt->dbname)));
    2589              : 
    2590            4 :     datForm = (Form_pg_database) GETSTRUCT(tuple);
    2591            4 :     db_id = datForm->oid;
    2592              : 
    2593            4 :     if (!object_ownercheck(DatabaseRelationId, db_id, GetUserId()))
    2594            0 :         aclcheck_error(ACLCHECK_NOT_OWNER, OBJECT_DATABASE,
    2595            0 :                        stmt->dbname);
    2596            4 :     LockTuple(rel, &tuple->t_self, InplaceUpdateTupleLock);
    2597              : 
    2598            4 :     datum = heap_getattr(tuple, Anum_pg_database_datcollversion, RelationGetDescr(rel), &isnull);
    2599            4 :     oldversion = isnull ? NULL : TextDatumGetCString(datum);
    2600              : 
    2601            4 :     if (datForm->datlocprovider == COLLPROVIDER_LIBC)
    2602              :     {
    2603            3 :         datum = heap_getattr(tuple, Anum_pg_database_datcollate, RelationGetDescr(rel), &isnull);
    2604            3 :         if (isnull)
    2605            0 :             elog(ERROR, "unexpected null in pg_database");
    2606              :     }
    2607              :     else
    2608              :     {
    2609            1 :         datum = heap_getattr(tuple, Anum_pg_database_datlocale, RelationGetDescr(rel), &isnull);
    2610            1 :         if (isnull)
    2611            0 :             elog(ERROR, "unexpected null in pg_database");
    2612              :     }
    2613              : 
    2614            4 :     newversion = get_collation_actual_version(datForm->datlocprovider,
    2615            4 :                                               TextDatumGetCString(datum));
    2616              : 
    2617              :     /* cannot change from NULL to non-NULL or vice versa */
    2618            4 :     if ((!oldversion && newversion) || (oldversion && !newversion))
    2619            0 :         elog(ERROR, "invalid collation version change");
    2620            4 :     else if (oldversion && newversion && strcmp(newversion, oldversion) != 0)
    2621            0 :     {
    2622            0 :         bool        nulls[Natts_pg_database] = {0};
    2623            0 :         bool        replaces[Natts_pg_database] = {0};
    2624            0 :         Datum       values[Natts_pg_database] = {0};
    2625              :         HeapTuple   newtuple;
    2626              : 
    2627            0 :         ereport(NOTICE,
    2628              :                 (errmsg("changing version from %s to %s",
    2629              :                         oldversion, newversion)));
    2630              : 
    2631            0 :         values[Anum_pg_database_datcollversion - 1] = CStringGetTextDatum(newversion);
    2632            0 :         replaces[Anum_pg_database_datcollversion - 1] = true;
    2633              : 
    2634            0 :         newtuple = heap_modify_tuple(tuple, RelationGetDescr(rel),
    2635              :                                      values, nulls, replaces);
    2636            0 :         CatalogTupleUpdate(rel, &tuple->t_self, newtuple);
    2637            0 :         heap_freetuple(newtuple);
    2638              :     }
    2639              :     else
    2640            4 :         ereport(NOTICE,
    2641              :                 (errmsg("version has not changed")));
    2642            4 :     UnlockTuple(rel, &tuple->t_self, InplaceUpdateTupleLock);
    2643              : 
    2644            4 :     InvokeObjectPostAlterHook(DatabaseRelationId, db_id, 0);
    2645              : 
    2646            4 :     ObjectAddressSet(address, DatabaseRelationId, db_id);
    2647              : 
    2648            4 :     systable_endscan(scan);
    2649              : 
    2650            4 :     table_close(rel, NoLock);
    2651              : 
    2652            4 :     return address;
    2653              : }
    2654              : 
    2655              : 
    2656              : /*
    2657              :  * ALTER DATABASE name SET ...
    2658              :  */
    2659              : Oid
    2660          667 : AlterDatabaseSet(AlterDatabaseSetStmt *stmt)
    2661              : {
    2662          667 :     Oid         datid = get_database_oid(stmt->dbname, false);
    2663              : 
    2664              :     /*
    2665              :      * Obtain a lock on the database and make sure it didn't go away in the
    2666              :      * meantime.
    2667              :      */
    2668          667 :     shdepLockAndCheckObject(DatabaseRelationId, datid);
    2669              : 
    2670          667 :     if (!object_ownercheck(DatabaseRelationId, datid, GetUserId()))
    2671            0 :         aclcheck_error(ACLCHECK_NOT_OWNER, OBJECT_DATABASE,
    2672            0 :                        stmt->dbname);
    2673              : 
    2674          667 :     AlterSetting(datid, InvalidOid, stmt->setstmt);
    2675              : 
    2676          665 :     UnlockSharedObject(DatabaseRelationId, datid, 0, AccessShareLock);
    2677              : 
    2678          665 :     return datid;
    2679              : }
    2680              : 
    2681              : 
    2682              : /*
    2683              :  * ALTER DATABASE name OWNER TO newowner
    2684              :  */
    2685              : ObjectAddress
    2686           49 : AlterDatabaseOwner(const char *dbname, Oid newOwnerId)
    2687              : {
    2688              :     Oid         db_id;
    2689              :     HeapTuple   tuple;
    2690              :     Relation    rel;
    2691              :     ScanKeyData scankey;
    2692              :     SysScanDesc scan;
    2693              :     Form_pg_database datForm;
    2694              :     ObjectAddress address;
    2695              : 
    2696              :     /*
    2697              :      * Get the old tuple.  We don't need a lock on the database per se,
    2698              :      * because we're not going to do anything that would mess up incoming
    2699              :      * connections.
    2700              :      */
    2701           49 :     rel = table_open(DatabaseRelationId, RowExclusiveLock);
    2702           49 :     ScanKeyInit(&scankey,
    2703              :                 Anum_pg_database_datname,
    2704              :                 BTEqualStrategyNumber, F_NAMEEQ,
    2705              :                 CStringGetDatum(dbname));
    2706           49 :     scan = systable_beginscan(rel, DatabaseNameIndexId, true,
    2707              :                               NULL, 1, &scankey);
    2708           49 :     tuple = systable_getnext(scan);
    2709           49 :     if (!HeapTupleIsValid(tuple))
    2710            0 :         ereport(ERROR,
    2711              :                 (errcode(ERRCODE_UNDEFINED_DATABASE),
    2712              :                  errmsg("database \"%s\" does not exist", dbname)));
    2713              : 
    2714           49 :     datForm = (Form_pg_database) GETSTRUCT(tuple);
    2715           49 :     db_id = datForm->oid;
    2716              : 
    2717              :     /*
    2718              :      * If the new owner is the same as the existing owner, consider the
    2719              :      * command to have succeeded.  This is to be consistent with other
    2720              :      * objects.
    2721              :      */
    2722           49 :     if (datForm->datdba != newOwnerId)
    2723              :     {
    2724              :         Datum       repl_val[Natts_pg_database];
    2725           17 :         bool        repl_null[Natts_pg_database] = {0};
    2726           17 :         bool        repl_repl[Natts_pg_database] = {0};
    2727              :         Acl        *newAcl;
    2728              :         Datum       aclDatum;
    2729              :         bool        isNull;
    2730              :         HeapTuple   newtuple;
    2731              : 
    2732              :         /* Otherwise, must be owner of the existing object */
    2733           17 :         if (!object_ownercheck(DatabaseRelationId, db_id, GetUserId()))
    2734            0 :             aclcheck_error(ACLCHECK_NOT_OWNER, OBJECT_DATABASE,
    2735              :                            dbname);
    2736              : 
    2737              :         /* Must be able to become new owner */
    2738           17 :         check_can_set_role(GetUserId(), newOwnerId);
    2739              : 
    2740              :         /*
    2741              :          * must have createdb rights
    2742              :          *
    2743              :          * NOTE: This is different from other alter-owner checks in that the
    2744              :          * current user is checked for createdb privileges instead of the
    2745              :          * destination owner.  This is consistent with the CREATE case for
    2746              :          * databases.  Because superusers will always have this right, we need
    2747              :          * no special case for them.
    2748              :          */
    2749           17 :         if (!have_createdb_privilege())
    2750            0 :             ereport(ERROR,
    2751              :                     (errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
    2752              :                      errmsg("permission denied to change owner of database")));
    2753              : 
    2754           17 :         LockTuple(rel, &tuple->t_self, InplaceUpdateTupleLock);
    2755              : 
    2756           17 :         repl_repl[Anum_pg_database_datdba - 1] = true;
    2757           17 :         repl_val[Anum_pg_database_datdba - 1] = ObjectIdGetDatum(newOwnerId);
    2758              : 
    2759              :         /*
    2760              :          * Determine the modified ACL for the new owner.  This is only
    2761              :          * necessary when the ACL is non-null.
    2762              :          */
    2763           17 :         aclDatum = heap_getattr(tuple,
    2764              :                                 Anum_pg_database_datacl,
    2765              :                                 RelationGetDescr(rel),
    2766              :                                 &isNull);
    2767           17 :         if (!isNull)
    2768              :         {
    2769            0 :             newAcl = aclnewowner(DatumGetAclP(aclDatum),
    2770              :                                  datForm->datdba, newOwnerId);
    2771            0 :             repl_repl[Anum_pg_database_datacl - 1] = true;
    2772            0 :             repl_val[Anum_pg_database_datacl - 1] = PointerGetDatum(newAcl);
    2773              :         }
    2774              : 
    2775           17 :         newtuple = heap_modify_tuple(tuple, RelationGetDescr(rel), repl_val, repl_null, repl_repl);
    2776           17 :         CatalogTupleUpdate(rel, &newtuple->t_self, newtuple);
    2777           17 :         UnlockTuple(rel, &tuple->t_self, InplaceUpdateTupleLock);
    2778              : 
    2779           17 :         heap_freetuple(newtuple);
    2780              : 
    2781              :         /* Update owner dependency reference */
    2782           17 :         changeDependencyOnOwner(DatabaseRelationId, db_id, newOwnerId);
    2783              :     }
    2784              : 
    2785           49 :     InvokeObjectPostAlterHook(DatabaseRelationId, db_id, 0);
    2786              : 
    2787           49 :     ObjectAddressSet(address, DatabaseRelationId, db_id);
    2788              : 
    2789           49 :     systable_endscan(scan);
    2790              : 
    2791              :     /* Close pg_database, but keep lock till commit */
    2792           49 :     table_close(rel, NoLock);
    2793              : 
    2794           49 :     return address;
    2795              : }
    2796              : 
    2797              : 
    2798              : Datum
    2799           55 : pg_database_collation_actual_version(PG_FUNCTION_ARGS)
    2800              : {
    2801           55 :     Oid         dbid = PG_GETARG_OID(0);
    2802              :     HeapTuple   tp;
    2803              :     char        datlocprovider;
    2804              :     Datum       datum;
    2805              :     char       *version;
    2806              : 
    2807           55 :     tp = SearchSysCache1(DATABASEOID, ObjectIdGetDatum(dbid));
    2808           55 :     if (!HeapTupleIsValid(tp))
    2809            0 :         ereport(ERROR,
    2810              :                 (errcode(ERRCODE_UNDEFINED_OBJECT),
    2811              :                  errmsg("database with OID %u does not exist", dbid)));
    2812              : 
    2813           55 :     datlocprovider = ((Form_pg_database) GETSTRUCT(tp))->datlocprovider;
    2814              : 
    2815           55 :     if (datlocprovider == COLLPROVIDER_LIBC)
    2816           48 :         datum = SysCacheGetAttrNotNull(DATABASEOID, tp, Anum_pg_database_datcollate);
    2817              :     else
    2818            7 :         datum = SysCacheGetAttrNotNull(DATABASEOID, tp, Anum_pg_database_datlocale);
    2819              : 
    2820           55 :     version = get_collation_actual_version(datlocprovider,
    2821           55 :                                            TextDatumGetCString(datum));
    2822              : 
    2823           55 :     ReleaseSysCache(tp);
    2824              : 
    2825           55 :     if (version)
    2826           41 :         PG_RETURN_TEXT_P(cstring_to_text(version));
    2827              :     else
    2828           14 :         PG_RETURN_NULL();
    2829              : }
    2830              : 
    2831              : 
    2832              : /*
    2833              :  * Helper functions
    2834              :  */
    2835              : 
    2836              : /*
    2837              :  * Look up info about the database named "name".  If the database exists,
    2838              :  * obtain the specified lock type on it, fill in any of the remaining
    2839              :  * parameters that aren't NULL, and return true.  If no such database,
    2840              :  * return false.
    2841              :  */
    2842              : static bool
    2843          551 : get_db_info(const char *name, LOCKMODE lockmode,
    2844              :             Oid *dbIdP, Oid *ownerIdP,
    2845              :             int *encodingP, bool *dbIsTemplateP, bool *dbAllowConnP, bool *dbHasLoginEvtP,
    2846              :             TransactionId *dbFrozenXidP, MultiXactId *dbMinMultiP,
    2847              :             Oid *dbTablespace, char **dbCollate, char **dbCtype, char **dbLocale,
    2848              :             char **dbIcurules,
    2849              :             char *dbLocProvider,
    2850              :             char **dbCollversion)
    2851              : {
    2852          551 :     bool        result = false;
    2853              :     Relation    relation;
    2854              : 
    2855              :     Assert(name);
    2856              : 
    2857              :     /* Caller may wish to grab a better lock on pg_database beforehand... */
    2858          551 :     relation = table_open(DatabaseRelationId, AccessShareLock);
    2859              : 
    2860              :     /*
    2861              :      * Loop covers the rare case where the database is renamed before we can
    2862              :      * lock it.  We try again just in case we can find a new one of the same
    2863              :      * name.
    2864              :      */
    2865              :     for (;;)
    2866            0 :     {
    2867              :         ScanKeyData scanKey;
    2868              :         SysScanDesc scan;
    2869              :         HeapTuple   tuple;
    2870              :         Oid         dbOid;
    2871              : 
    2872              :         /*
    2873              :          * there's no syscache for database-indexed-by-name, so must do it the
    2874              :          * hard way
    2875              :          */
    2876          551 :         ScanKeyInit(&scanKey,
    2877              :                     Anum_pg_database_datname,
    2878              :                     BTEqualStrategyNumber, F_NAMEEQ,
    2879              :                     CStringGetDatum(name));
    2880              : 
    2881          551 :         scan = systable_beginscan(relation, DatabaseNameIndexId, true,
    2882              :                                   NULL, 1, &scanKey);
    2883              : 
    2884          551 :         tuple = systable_getnext(scan);
    2885              : 
    2886          551 :         if (!HeapTupleIsValid(tuple))
    2887              :         {
    2888              :             /* definitely no database of that name */
    2889           21 :             systable_endscan(scan);
    2890           21 :             break;
    2891              :         }
    2892              : 
    2893          530 :         dbOid = ((Form_pg_database) GETSTRUCT(tuple))->oid;
    2894              : 
    2895          530 :         systable_endscan(scan);
    2896              : 
    2897              :         /*
    2898              :          * Now that we have a database OID, we can try to lock the DB.
    2899              :          */
    2900          530 :         if (lockmode != NoLock)
    2901          530 :             LockSharedObject(DatabaseRelationId, dbOid, 0, lockmode);
    2902              : 
    2903              :         /*
    2904              :          * And now, re-fetch the tuple by OID.  If it's still there and still
    2905              :          * the same name, we win; else, drop the lock and loop back to try
    2906              :          * again.
    2907              :          */
    2908          530 :         tuple = SearchSysCache1(DATABASEOID, ObjectIdGetDatum(dbOid));
    2909          530 :         if (HeapTupleIsValid(tuple))
    2910              :         {
    2911          530 :             Form_pg_database dbform = (Form_pg_database) GETSTRUCT(tuple);
    2912              : 
    2913          530 :             if (strcmp(name, NameStr(dbform->datname)) == 0)
    2914              :             {
    2915              :                 Datum       datum;
    2916              :                 bool        isnull;
    2917              : 
    2918              :                 /* oid of the database */
    2919          530 :                 if (dbIdP)
    2920          530 :                     *dbIdP = dbOid;
    2921              :                 /* oid of the owner */
    2922          530 :                 if (ownerIdP)
    2923          446 :                     *ownerIdP = dbform->datdba;
    2924              :                 /* character encoding */
    2925          530 :                 if (encodingP)
    2926          446 :                     *encodingP = dbform->encoding;
    2927              :                 /* allowed as template? */
    2928          530 :                 if (dbIsTemplateP)
    2929          507 :                     *dbIsTemplateP = dbform->datistemplate;
    2930              :                 /* Has on login event trigger? */
    2931          530 :                 if (dbHasLoginEvtP)
    2932          446 :                     *dbHasLoginEvtP = dbform->dathasloginevt;
    2933              :                 /* allowing connections? */
    2934          530 :                 if (dbAllowConnP)
    2935          446 :                     *dbAllowConnP = dbform->datallowconn;
    2936              :                 /* limit of frozen XIDs */
    2937          530 :                 if (dbFrozenXidP)
    2938          446 :                     *dbFrozenXidP = dbform->datfrozenxid;
    2939              :                 /* minimum MultiXactId */
    2940          530 :                 if (dbMinMultiP)
    2941          446 :                     *dbMinMultiP = dbform->datminmxid;
    2942              :                 /* default tablespace for this database */
    2943          530 :                 if (dbTablespace)
    2944          460 :                     *dbTablespace = dbform->dattablespace;
    2945              :                 /* default locale settings for this database */
    2946          530 :                 if (dbLocProvider)
    2947          446 :                     *dbLocProvider = dbform->datlocprovider;
    2948          530 :                 if (dbCollate)
    2949              :                 {
    2950          446 :                     datum = SysCacheGetAttrNotNull(DATABASEOID, tuple, Anum_pg_database_datcollate);
    2951          446 :                     *dbCollate = TextDatumGetCString(datum);
    2952              :                 }
    2953          530 :                 if (dbCtype)
    2954              :                 {
    2955          446 :                     datum = SysCacheGetAttrNotNull(DATABASEOID, tuple, Anum_pg_database_datctype);
    2956          446 :                     *dbCtype = TextDatumGetCString(datum);
    2957              :                 }
    2958          530 :                 if (dbLocale)
    2959              :                 {
    2960          446 :                     datum = SysCacheGetAttr(DATABASEOID, tuple, Anum_pg_database_datlocale, &isnull);
    2961          446 :                     if (isnull)
    2962          415 :                         *dbLocale = NULL;
    2963              :                     else
    2964           31 :                         *dbLocale = TextDatumGetCString(datum);
    2965              :                 }
    2966          530 :                 if (dbIcurules)
    2967              :                 {
    2968          446 :                     datum = SysCacheGetAttr(DATABASEOID, tuple, Anum_pg_database_daticurules, &isnull);
    2969          446 :                     if (isnull)
    2970          446 :                         *dbIcurules = NULL;
    2971              :                     else
    2972            0 :                         *dbIcurules = TextDatumGetCString(datum);
    2973              :                 }
    2974          530 :                 if (dbCollversion)
    2975              :                 {
    2976          446 :                     datum = SysCacheGetAttr(DATABASEOID, tuple, Anum_pg_database_datcollversion, &isnull);
    2977          446 :                     if (isnull)
    2978          262 :                         *dbCollversion = NULL;
    2979              :                     else
    2980          184 :                         *dbCollversion = TextDatumGetCString(datum);
    2981              :                 }
    2982          530 :                 ReleaseSysCache(tuple);
    2983          530 :                 result = true;
    2984          530 :                 break;
    2985              :             }
    2986              :             /* can only get here if it was just renamed */
    2987            0 :             ReleaseSysCache(tuple);
    2988              :         }
    2989              : 
    2990            0 :         if (lockmode != NoLock)
    2991            0 :             UnlockSharedObject(DatabaseRelationId, dbOid, 0, lockmode);
    2992              :     }
    2993              : 
    2994          551 :     table_close(relation, AccessShareLock);
    2995              : 
    2996          551 :     return result;
    2997              : }
    2998              : 
    2999              : /* Check if current user has createdb privileges */
    3000              : bool
    3001          496 : have_createdb_privilege(void)
    3002              : {
    3003          496 :     bool        result = false;
    3004              :     HeapTuple   utup;
    3005              : 
    3006              :     /* Superusers can always do everything */
    3007          496 :     if (superuser())
    3008          472 :         return true;
    3009              : 
    3010           24 :     utup = SearchSysCache1(AUTHOID, ObjectIdGetDatum(GetUserId()));
    3011           24 :     if (HeapTupleIsValid(utup))
    3012              :     {
    3013           24 :         result = ((Form_pg_authid) GETSTRUCT(utup))->rolcreatedb;
    3014           24 :         ReleaseSysCache(utup);
    3015              :     }
    3016           24 :     return result;
    3017              : }
    3018              : 
    3019              : /*
    3020              :  * Remove tablespace directories
    3021              :  *
    3022              :  * We don't know what tablespaces db_id is using, so iterate through all
    3023              :  * tablespaces removing <tablespace>/db_id
    3024              :  */
    3025              : static void
    3026           62 : remove_dbtablespaces(Oid db_id)
    3027              : {
    3028              :     Relation    rel;
    3029              :     TableScanDesc scan;
    3030              :     HeapTuple   tuple;
    3031           62 :     List       *ltblspc = NIL;
    3032              :     ListCell   *cell;
    3033              :     int         ntblspc;
    3034              :     int         i;
    3035              :     Oid        *tablespace_ids;
    3036              : 
    3037           62 :     rel = table_open(TableSpaceRelationId, AccessShareLock);
    3038           61 :     scan = table_beginscan_catalog(rel, 0, NULL);
    3039          226 :     while ((tuple = heap_getnext(scan, ForwardScanDirection)) != NULL)
    3040              :     {
    3041          165 :         Form_pg_tablespace spcform = (Form_pg_tablespace) GETSTRUCT(tuple);
    3042          165 :         Oid         dsttablespace = spcform->oid;
    3043              :         char       *dstpath;
    3044              :         struct stat st;
    3045              : 
    3046              :         /* Don't mess with the global tablespace */
    3047          165 :         if (dsttablespace == GLOBALTABLESPACE_OID)
    3048          104 :             continue;
    3049              : 
    3050          104 :         dstpath = GetDatabasePath(db_id, dsttablespace);
    3051              : 
    3052          104 :         if (lstat(dstpath, &st) < 0 || !S_ISDIR(st.st_mode))
    3053              :         {
    3054              :             /* Assume we can ignore it */
    3055           43 :             pfree(dstpath);
    3056           43 :             continue;
    3057              :         }
    3058              : 
    3059           61 :         if (!rmtree(dstpath, true))
    3060            0 :             ereport(WARNING,
    3061              :                     (errmsg("some useless files may be left behind in old database directory \"%s\"",
    3062              :                             dstpath)));
    3063              : 
    3064           61 :         ltblspc = lappend_oid(ltblspc, dsttablespace);
    3065           61 :         pfree(dstpath);
    3066              :     }
    3067              : 
    3068           61 :     ntblspc = list_length(ltblspc);
    3069           61 :     if (ntblspc == 0)
    3070              :     {
    3071            0 :         table_endscan(scan);
    3072            0 :         table_close(rel, AccessShareLock);
    3073            0 :         return;
    3074              :     }
    3075              : 
    3076           61 :     tablespace_ids = (Oid *) palloc(ntblspc * sizeof(Oid));
    3077           61 :     i = 0;
    3078          122 :     foreach(cell, ltblspc)
    3079           61 :         tablespace_ids[i++] = lfirst_oid(cell);
    3080              : 
    3081              :     /* Record the filesystem change in XLOG */
    3082              :     {
    3083              :         xl_dbase_drop_rec xlrec;
    3084              : 
    3085           61 :         xlrec.db_id = db_id;
    3086           61 :         xlrec.ntablespaces = ntblspc;
    3087              : 
    3088           61 :         XLogBeginInsert();
    3089           61 :         XLogRegisterData(&xlrec, MinSizeOfDbaseDropRec);
    3090           61 :         XLogRegisterData(tablespace_ids, ntblspc * sizeof(Oid));
    3091              : 
    3092           61 :         (void) XLogInsert(RM_DBASE_ID,
    3093              :                           XLOG_DBASE_DROP | XLR_SPECIAL_REL_UPDATE);
    3094              :     }
    3095              : 
    3096           61 :     list_free(ltblspc);
    3097           61 :     pfree(tablespace_ids);
    3098              : 
    3099           61 :     table_endscan(scan);
    3100           61 :     table_close(rel, AccessShareLock);
    3101              : }
    3102              : 
    3103              : /*
    3104              :  * Check for existing files that conflict with a proposed new DB OID;
    3105              :  * return true if there are any
    3106              :  *
    3107              :  * If there were a subdirectory in any tablespace matching the proposed new
    3108              :  * OID, we'd get a create failure due to the duplicate name ... and then we'd
    3109              :  * try to remove that already-existing subdirectory during the cleanup in
    3110              :  * remove_dbtablespaces.  Nuking existing files seems like a bad idea, so
    3111              :  * instead we make this extra check before settling on the OID of the new
    3112              :  * database.  This exactly parallels what GetNewRelFileNumber() does for table
    3113              :  * relfilenumber values.
    3114              :  */
    3115              : static bool
    3116          432 : check_db_file_conflict(Oid db_id)
    3117              : {
    3118          432 :     bool        result = false;
    3119              :     Relation    rel;
    3120              :     TableScanDesc scan;
    3121              :     HeapTuple   tuple;
    3122              : 
    3123          432 :     rel = table_open(TableSpaceRelationId, AccessShareLock);
    3124          432 :     scan = table_beginscan_catalog(rel, 0, NULL);
    3125         1381 :     while ((tuple = heap_getnext(scan, ForwardScanDirection)) != NULL)
    3126              :     {
    3127          949 :         Form_pg_tablespace spcform = (Form_pg_tablespace) GETSTRUCT(tuple);
    3128          949 :         Oid         dsttablespace = spcform->oid;
    3129              :         char       *dstpath;
    3130              :         struct stat st;
    3131              : 
    3132              :         /* Don't mess with the global tablespace */
    3133          949 :         if (dsttablespace == GLOBALTABLESPACE_OID)
    3134          432 :             continue;
    3135              : 
    3136          517 :         dstpath = GetDatabasePath(db_id, dsttablespace);
    3137              : 
    3138          517 :         if (lstat(dstpath, &st) == 0)
    3139              :         {
    3140              :             /* Found a conflicting file (or directory, whatever) */
    3141            0 :             pfree(dstpath);
    3142            0 :             result = true;
    3143            0 :             break;
    3144              :         }
    3145              : 
    3146          517 :         pfree(dstpath);
    3147              :     }
    3148              : 
    3149          432 :     table_endscan(scan);
    3150          432 :     table_close(rel, AccessShareLock);
    3151              : 
    3152          432 :     return result;
    3153              : }
    3154              : 
    3155              : /*
    3156              :  * Issue a suitable errdetail message for a busy database
    3157              :  */
    3158              : static int
    3159            1 : errdetail_busy_db(int notherbackends, int npreparedxacts)
    3160              : {
    3161            1 :     if (notherbackends > 0 && npreparedxacts > 0)
    3162              : 
    3163              :         /*
    3164              :          * We don't deal with singular versus plural here, since gettext
    3165              :          * doesn't support multiple plurals in one string.
    3166              :          */
    3167            0 :         errdetail("There are %d other session(s) and %d prepared transaction(s) using the database.",
    3168              :                   notherbackends, npreparedxacts);
    3169            1 :     else if (notherbackends > 0)
    3170            1 :         errdetail_plural("There is %d other session using the database.",
    3171              :                          "There are %d other sessions using the database.",
    3172              :                          notherbackends,
    3173              :                          notherbackends);
    3174              :     else
    3175            0 :         errdetail_plural("There is %d prepared transaction using the database.",
    3176              :                          "There are %d prepared transactions using the database.",
    3177              :                          npreparedxacts,
    3178              :                          npreparedxacts);
    3179            1 :     return 0;                   /* just to keep ereport macro happy */
    3180              : }
    3181              : 
    3182              : /*
    3183              :  * get_database_oid - given a database name, look up the OID
    3184              :  *
    3185              :  * If missing_ok is false, throw an error if database name not found.  If
    3186              :  * true, just return InvalidOid.
    3187              :  */
    3188              : Oid
    3189         1754 : get_database_oid(const char *dbname, bool missing_ok)
    3190              : {
    3191              :     Relation    pg_database;
    3192              :     ScanKeyData entry[1];
    3193              :     SysScanDesc scan;
    3194              :     HeapTuple   dbtuple;
    3195              :     Oid         oid;
    3196              : 
    3197              :     /*
    3198              :      * There's no syscache for pg_database indexed by name, so we must look
    3199              :      * the hard way.
    3200              :      */
    3201         1754 :     pg_database = table_open(DatabaseRelationId, AccessShareLock);
    3202         1754 :     ScanKeyInit(&entry[0],
    3203              :                 Anum_pg_database_datname,
    3204              :                 BTEqualStrategyNumber, F_NAMEEQ,
    3205              :                 CStringGetDatum(dbname));
    3206         1754 :     scan = systable_beginscan(pg_database, DatabaseNameIndexId, true,
    3207              :                               NULL, 1, entry);
    3208              : 
    3209         1754 :     dbtuple = systable_getnext(scan);
    3210              : 
    3211              :     /* We assume that there can be at most one matching tuple */
    3212         1754 :     if (HeapTupleIsValid(dbtuple))
    3213         1284 :         oid = ((Form_pg_database) GETSTRUCT(dbtuple))->oid;
    3214              :     else
    3215          470 :         oid = InvalidOid;
    3216              : 
    3217         1754 :     systable_endscan(scan);
    3218         1754 :     table_close(pg_database, AccessShareLock);
    3219              : 
    3220         1754 :     if (!OidIsValid(oid) && !missing_ok)
    3221            4 :         ereport(ERROR,
    3222              :                 (errcode(ERRCODE_UNDEFINED_DATABASE),
    3223              :                  errmsg("database \"%s\" does not exist",
    3224              :                         dbname)));
    3225              : 
    3226         1750 :     return oid;
    3227              : }
    3228              : 
    3229              : 
    3230              : /*
    3231              :  * While dropping a database the pg_database row is marked invalid, but the
    3232              :  * catalog contents still exist. Connections to such a database are not
    3233              :  * allowed.
    3234              :  */
    3235              : bool
    3236        28416 : database_is_invalid_form(Form_pg_database datform)
    3237              : {
    3238        28416 :     return datform->datconnlimit == DATCONNLIMIT_INVALID_DB;
    3239              : }
    3240              : 
    3241              : 
    3242              : /*
    3243              :  * Convenience wrapper around database_is_invalid_form()
    3244              :  */
    3245              : bool
    3246          446 : database_is_invalid_oid(Oid dboid)
    3247              : {
    3248              :     HeapTuple   dbtup;
    3249              :     Form_pg_database dbform;
    3250              :     bool        invalid;
    3251              : 
    3252          446 :     dbtup = SearchSysCache1(DATABASEOID, ObjectIdGetDatum(dboid));
    3253          446 :     if (!HeapTupleIsValid(dbtup))
    3254            0 :         elog(ERROR, "cache lookup failed for database %u", dboid);
    3255          446 :     dbform = (Form_pg_database) GETSTRUCT(dbtup);
    3256              : 
    3257          446 :     invalid = database_is_invalid_form(dbform);
    3258              : 
    3259          446 :     ReleaseSysCache(dbtup);
    3260              : 
    3261          446 :     return invalid;
    3262              : }
    3263              : 
    3264              : 
    3265              : /*
    3266              :  * recovery_create_dbdir()
    3267              :  *
    3268              :  * During recovery, there's a case where we validly need to recover a missing
    3269              :  * tablespace directory so that recovery can continue.  This happens when
    3270              :  * recovery wants to create a database but the holding tablespace has been
    3271              :  * removed before the server stopped.  Since we expect that the directory will
    3272              :  * be gone before reaching recovery consistency, and we have no knowledge about
    3273              :  * the tablespace other than its OID here, we create a real directory under
    3274              :  * pg_tblspc here instead of restoring the symlink.
    3275              :  *
    3276              :  * If only_tblspc is true, then the requested directory must be in pg_tblspc/
    3277              :  */
    3278              : static void
    3279           25 : recovery_create_dbdir(char *path, bool only_tblspc)
    3280              : {
    3281              :     struct stat st;
    3282              : 
    3283              :     Assert(RecoveryInProgress());
    3284              : 
    3285           25 :     if (stat(path, &st) == 0)
    3286           25 :         return;
    3287              : 
    3288            0 :     if (only_tblspc && strstr(path, PG_TBLSPC_DIR_SLASH) == NULL)
    3289            0 :         elog(PANIC, "requested to created invalid directory: %s", path);
    3290              : 
    3291            0 :     if (reachedConsistency && !allow_in_place_tablespaces)
    3292            0 :         ereport(PANIC,
    3293              :                 errmsg("missing directory \"%s\"", path));
    3294              : 
    3295            0 :     elog(reachedConsistency ? WARNING : DEBUG1,
    3296              :          "creating missing directory: %s", path);
    3297              : 
    3298            0 :     if (pg_mkdir_p(path, pg_dir_create_mode) != 0)
    3299            0 :         ereport(PANIC,
    3300              :                 errmsg("could not create missing directory \"%s\": %m", path));
    3301              : }
    3302              : 
    3303              : 
    3304              : /*
    3305              :  * DATABASE resource manager's routines
    3306              :  */
    3307              : void
    3308           46 : dbase_redo(XLogReaderState *record)
    3309              : {
    3310           46 :     uint8       info = XLogRecGetInfo(record) & ~XLR_INFO_MASK;
    3311              : 
    3312              :     /* Backup blocks are not used in dbase records */
    3313              :     Assert(!XLogRecHasAnyBlockRefs(record));
    3314              : 
    3315           46 :     if (info == XLOG_DBASE_CREATE_FILE_COPY)
    3316              :     {
    3317            5 :         xl_dbase_create_file_copy_rec *xlrec =
    3318            5 :             (xl_dbase_create_file_copy_rec *) XLogRecGetData(record);
    3319              :         char       *src_path;
    3320              :         char       *dst_path;
    3321              :         char       *parent_path;
    3322              :         struct stat st;
    3323              : 
    3324            5 :         src_path = GetDatabasePath(xlrec->src_db_id, xlrec->src_tablespace_id);
    3325            5 :         dst_path = GetDatabasePath(xlrec->db_id, xlrec->tablespace_id);
    3326              : 
    3327              :         /*
    3328              :          * Our theory for replaying a CREATE is to forcibly drop the target
    3329              :          * subdirectory if present, then re-copy the source data. This may be
    3330              :          * more work than needed, but it is simple to implement.
    3331              :          */
    3332            5 :         if (stat(dst_path, &st) == 0 && S_ISDIR(st.st_mode))
    3333              :         {
    3334            0 :             if (!rmtree(dst_path, true))
    3335              :                 /* If this failed, copydir() below is going to error. */
    3336            0 :                 ereport(WARNING,
    3337              :                         (errmsg("some useless files may be left behind in old database directory \"%s\"",
    3338              :                                 dst_path)));
    3339              :         }
    3340              : 
    3341              :         /*
    3342              :          * If the parent of the target path doesn't exist, create it now. This
    3343              :          * enables us to create the target underneath later.
    3344              :          */
    3345            5 :         parent_path = pstrdup(dst_path);
    3346            5 :         get_parent_directory(parent_path);
    3347            5 :         if (stat(parent_path, &st) < 0)
    3348              :         {
    3349            0 :             if (errno != ENOENT)
    3350            0 :                 ereport(FATAL,
    3351              :                         errmsg("could not stat directory \"%s\": %m",
    3352              :                                dst_path));
    3353              : 
    3354              :             /* create the parent directory if needed and valid */
    3355            0 :             recovery_create_dbdir(parent_path, true);
    3356              :         }
    3357            5 :         pfree(parent_path);
    3358              : 
    3359              :         /*
    3360              :          * There's a case where the copy source directory is missing for the
    3361              :          * same reason as above.  Create the empty source directory so that
    3362              :          * copydir below doesn't fail.  The directory will be dropped soon by
    3363              :          * recovery.
    3364              :          */
    3365            5 :         if (stat(src_path, &st) < 0 && errno == ENOENT)
    3366            0 :             recovery_create_dbdir(src_path, false);
    3367              : 
    3368              :         /*
    3369              :          * Force dirty buffers out to disk, to ensure source database is
    3370              :          * up-to-date for the copy.
    3371              :          */
    3372            5 :         FlushDatabaseBuffers(xlrec->src_db_id);
    3373              : 
    3374              :         /* Close all smgr fds in all backends. */
    3375            5 :         WaitForProcSignalBarrier(EmitProcSignalBarrier(PROCSIGNAL_BARRIER_SMGRRELEASE));
    3376              : 
    3377              :         /*
    3378              :          * Copy this subdirectory to the new location
    3379              :          *
    3380              :          * We don't need to copy subdirectories
    3381              :          */
    3382            5 :         copydir(src_path, dst_path, false);
    3383              : 
    3384            5 :         pfree(src_path);
    3385            5 :         pfree(dst_path);
    3386              :     }
    3387           41 :     else if (info == XLOG_DBASE_CREATE_WAL_LOG)
    3388              :     {
    3389           25 :         xl_dbase_create_wal_log_rec *xlrec =
    3390           25 :             (xl_dbase_create_wal_log_rec *) XLogRecGetData(record);
    3391              :         char       *dbpath;
    3392              :         char       *parent_path;
    3393              : 
    3394           25 :         dbpath = GetDatabasePath(xlrec->db_id, xlrec->tablespace_id);
    3395              : 
    3396              :         /* create the parent directory if needed and valid */
    3397           25 :         parent_path = pstrdup(dbpath);
    3398           25 :         get_parent_directory(parent_path);
    3399           25 :         recovery_create_dbdir(parent_path, true);
    3400           25 :         pfree(parent_path);
    3401              : 
    3402              :         /* Create the database directory with the version file. */
    3403           25 :         CreateDirAndVersionFile(dbpath, xlrec->db_id, xlrec->tablespace_id,
    3404              :                                 true);
    3405           25 :         pfree(dbpath);
    3406              :     }
    3407           16 :     else if (info == XLOG_DBASE_DROP)
    3408              :     {
    3409           16 :         xl_dbase_drop_rec *xlrec = (xl_dbase_drop_rec *) XLogRecGetData(record);
    3410              :         char       *dst_path;
    3411              :         int         i;
    3412              : 
    3413           16 :         if (InHotStandby)
    3414              :         {
    3415              :             /*
    3416              :              * Lock database while we resolve conflicts to ensure that
    3417              :              * InitPostgres() cannot fully re-execute concurrently. This
    3418              :              * avoids backends re-connecting automatically to same database,
    3419              :              * which can happen in some cases.
    3420              :              *
    3421              :              * This will lock out walsenders trying to connect to db-specific
    3422              :              * slots for logical decoding too, so it's safe for us to drop
    3423              :              * slots.
    3424              :              */
    3425           16 :             LockSharedObjectForSession(DatabaseRelationId, xlrec->db_id, 0, AccessExclusiveLock);
    3426           16 :             ResolveRecoveryConflictWithDatabase(xlrec->db_id);
    3427              :         }
    3428              : 
    3429              :         /* Drop any database-specific replication slots */
    3430           16 :         ReplicationSlotsDropDBSlots(xlrec->db_id);
    3431              : 
    3432              :         /* Drop pages for this database that are in the shared buffer cache */
    3433           16 :         DropDatabaseBuffers(xlrec->db_id);
    3434              : 
    3435              :         /* Also, clean out any fsync requests that might be pending in md.c */
    3436           16 :         ForgetDatabaseSyncRequests(xlrec->db_id);
    3437              : 
    3438              :         /* Clean out the xlog relcache too */
    3439           16 :         XLogDropDatabase(xlrec->db_id);
    3440              : 
    3441              :         /* Close all smgr fds in all backends. */
    3442           16 :         WaitForProcSignalBarrier(EmitProcSignalBarrier(PROCSIGNAL_BARRIER_SMGRRELEASE));
    3443              : 
    3444           32 :         for (i = 0; i < xlrec->ntablespaces; i++)
    3445              :         {
    3446           16 :             dst_path = GetDatabasePath(xlrec->db_id, xlrec->tablespace_ids[i]);
    3447              : 
    3448              :             /* And remove the physical files */
    3449           16 :             if (!rmtree(dst_path, true))
    3450            0 :                 ereport(WARNING,
    3451              :                         (errmsg("some useless files may be left behind in old database directory \"%s\"",
    3452              :                                 dst_path)));
    3453           16 :             pfree(dst_path);
    3454              :         }
    3455              : 
    3456           16 :         if (InHotStandby)
    3457              :         {
    3458              :             /*
    3459              :              * Release locks prior to commit. XXX There is a race condition
    3460              :              * here that may allow backends to reconnect, but the window for
    3461              :              * this is small because the gap between here and commit is usually
    3462              :              * fairly small and it is unlikely that people will be dropping
    3463              :              * databases that we are trying to connect to anyway.
    3464              :              */
    3465           16 :             UnlockSharedObjectForSession(DatabaseRelationId, xlrec->db_id, 0, AccessExclusiveLock);
    3466              :         }
    3467              :     }
    3468              :     else
    3469            0 :         elog(PANIC, "dbase_redo: unknown op code %u", info);
    3470           46 : }
        

Generated by: LCOV version 2.0-1