LCOV - code coverage report
Current view: top level - src/backend/commands - dbcommands.c (source / functions) Coverage Total Hit
Test: PostgreSQL 19devel Lines: 84.5 % 1107 935
Test Date: 2026-02-17 17:20:33 Functions: 96.6 % 29 28
Legend: Lines:     hit not hit

            Line data    Source code
       1              : /*-------------------------------------------------------------------------
       2              :  *
       3              :  * dbcommands.c
       4              :  *      Database management commands (create/drop database).
       5              :  *
       6              :  * Note: database creation/destruction commands use exclusive locks on
       7              :  * the database objects (as expressed by LockSharedObject()) to avoid
       8              :  * stepping on each others' toes.  Formerly we used table-level locks
       9              :  * on pg_database, but that's too coarse-grained.
      10              :  *
      11              :  * Portions Copyright (c) 1996-2026, PostgreSQL Global Development Group
      12              :  * Portions Copyright (c) 1994, Regents of the University of California
      13              :  *
      14              :  *
      15              :  * IDENTIFICATION
      16              :  *    src/backend/commands/dbcommands.c
      17              :  *
      18              :  *-------------------------------------------------------------------------
      19              :  */
      20              : #include "postgres.h"
      21              : 
      22              : #include <fcntl.h>
      23              : #include <unistd.h>
      24              : #include <sys/stat.h>
      25              : 
      26              : #include "access/genam.h"
      27              : #include "access/heapam.h"
      28              : #include "access/htup_details.h"
      29              : #include "access/multixact.h"
      30              : #include "access/tableam.h"
      31              : #include "access/xact.h"
      32              : #include "access/xloginsert.h"
      33              : #include "access/xlogrecovery.h"
      34              : #include "access/xlogutils.h"
      35              : #include "catalog/catalog.h"
      36              : #include "catalog/dependency.h"
      37              : #include "catalog/indexing.h"
      38              : #include "catalog/objectaccess.h"
      39              : #include "catalog/pg_authid.h"
      40              : #include "catalog/pg_collation.h"
      41              : #include "catalog/pg_database.h"
      42              : #include "catalog/pg_db_role_setting.h"
      43              : #include "catalog/pg_subscription.h"
      44              : #include "catalog/pg_tablespace.h"
      45              : #include "commands/comment.h"
      46              : #include "commands/dbcommands.h"
      47              : #include "commands/dbcommands_xlog.h"
      48              : #include "commands/defrem.h"
      49              : #include "commands/seclabel.h"
      50              : #include "commands/tablespace.h"
      51              : #include "common/file_perm.h"
      52              : #include "mb/pg_wchar.h"
      53              : #include "miscadmin.h"
      54              : #include "pgstat.h"
      55              : #include "postmaster/bgwriter.h"
      56              : #include "replication/slot.h"
      57              : #include "storage/copydir.h"
      58              : #include "storage/fd.h"
      59              : #include "storage/ipc.h"
      60              : #include "storage/lmgr.h"
      61              : #include "storage/md.h"
      62              : #include "storage/procarray.h"
      63              : #include "storage/procsignal.h"
      64              : #include "storage/smgr.h"
      65              : #include "utils/acl.h"
      66              : #include "utils/builtins.h"
      67              : #include "utils/fmgroids.h"
      68              : #include "utils/lsyscache.h"
      69              : #include "utils/pg_locale.h"
      70              : #include "utils/relmapper.h"
      71              : #include "utils/snapmgr.h"
      72              : #include "utils/syscache.h"
      73              : 
      74              : /*
      75              :  * Create database strategy.
      76              :  *
      77              :  * CREATEDB_WAL_LOG will copy the database at the block level and WAL log each
      78              :  * copied block.
      79              :  *
      80              :  * CREATEDB_FILE_COPY will simply perform a file system level copy of the
      81              :  * database and log a single record for each tablespace copied. To make this
      82              :  * safe, it also triggers checkpoints before and after the operation.
      83              :  */
      84              : typedef enum CreateDBStrategy
      85              : {
      86              :     CREATEDB_WAL_LOG,
      87              :     CREATEDB_FILE_COPY,
      88              : } CreateDBStrategy;
      89              : 
      90              : typedef struct
      91              : {
      92              :     Oid         src_dboid;      /* source (template) DB */
      93              :     Oid         dest_dboid;     /* DB we are trying to create */
      94              :     CreateDBStrategy strategy;  /* create db strategy */
      95              : } createdb_failure_params;
      96              : 
      97              : typedef struct
      98              : {
      99              :     Oid         dest_dboid;     /* DB we are trying to move */
     100              :     Oid         dest_tsoid;     /* tablespace we are trying to move to */
     101              : } movedb_failure_params;
     102              : 
     103              : /*
     104              :  * Information about a relation to be copied when creating a database.
     105              :  */
     106              : typedef struct CreateDBRelInfo
     107              : {
     108              :     RelFileLocator rlocator;    /* physical relation identifier */
     109              :     Oid         reloid;         /* relation oid */
     110              :     bool        permanent;      /* relation is permanent or unlogged */
     111              : } CreateDBRelInfo;
     112              : 
     113              : 
     114              : /* non-export function prototypes */
     115              : static void createdb_failure_callback(int code, Datum arg);
     116              : static void movedb(const char *dbname, const char *tblspcname);
     117              : static void movedb_failure_callback(int code, Datum arg);
     118              : static bool get_db_info(const char *name, LOCKMODE lockmode,
     119              :                         Oid *dbIdP, Oid *ownerIdP,
     120              :                         int *encodingP, bool *dbIsTemplateP, bool *dbAllowConnP, bool *dbHasLoginEvtP,
     121              :                         TransactionId *dbFrozenXidP, MultiXactId *dbMinMultiP,
     122              :                         Oid *dbTablespace, char **dbCollate, char **dbCtype, char **dbLocale,
     123              :                         char **dbIcurules,
     124              :                         char *dbLocProvider,
     125              :                         char **dbCollversion);
     126              : static void remove_dbtablespaces(Oid db_id);
     127              : static bool check_db_file_conflict(Oid db_id);
     128              : static int  errdetail_busy_db(int notherbackends, int npreparedxacts);
     129              : static void CreateDatabaseUsingWalLog(Oid src_dboid, Oid dst_dboid, Oid src_tsid,
     130              :                                       Oid dst_tsid);
     131              : static List *ScanSourceDatabasePgClass(Oid tbid, Oid dbid, char *srcpath);
     132              : static List *ScanSourceDatabasePgClassPage(Page page, Buffer buf, Oid tbid,
     133              :                                            Oid dbid, char *srcpath,
     134              :                                            List *rlocatorlist, Snapshot snapshot);
     135              : static CreateDBRelInfo *ScanSourceDatabasePgClassTuple(HeapTupleData *tuple,
     136              :                                                        Oid tbid, Oid dbid,
     137              :                                                        char *srcpath);
     138              : static void CreateDirAndVersionFile(char *dbpath, Oid dbid, Oid tsid,
     139              :                                     bool isRedo);
     140              : static void CreateDatabaseUsingFileCopy(Oid src_dboid, Oid dst_dboid,
     141              :                                         Oid src_tsid, Oid dst_tsid);
     142              : static void recovery_create_dbdir(char *path, bool only_tblspc);
     143              : 
     144              : /*
     145              :  * Create a new database using the WAL_LOG strategy.
     146              :  *
     147              :  * Each copied block is separately written to the write-ahead log.
     148              :  */
     149              : static void
     150          253 : CreateDatabaseUsingWalLog(Oid src_dboid, Oid dst_dboid,
     151              :                           Oid src_tsid, Oid dst_tsid)
     152              : {
     153              :     char       *srcpath;
     154              :     char       *dstpath;
     155          253 :     List       *rlocatorlist = NULL;
     156              :     ListCell   *cell;
     157              :     LockRelId   srcrelid;
     158              :     LockRelId   dstrelid;
     159              :     RelFileLocator srcrlocator;
     160              :     RelFileLocator dstrlocator;
     161              :     CreateDBRelInfo *relinfo;
     162              : 
     163              :     /* Get source and destination database paths. */
     164          253 :     srcpath = GetDatabasePath(src_dboid, src_tsid);
     165          253 :     dstpath = GetDatabasePath(dst_dboid, dst_tsid);
     166              : 
     167              :     /* Create database directory and write PG_VERSION file. */
     168          253 :     CreateDirAndVersionFile(dstpath, dst_dboid, dst_tsid, false);
     169              : 
     170              :     /* Copy relmap file from source database to the destination database. */
     171          253 :     RelationMapCopy(dst_dboid, dst_tsid, srcpath, dstpath);
     172              : 
     173              :     /* Get list of relfilelocators to copy from the source database. */
     174          253 :     rlocatorlist = ScanSourceDatabasePgClass(src_tsid, src_dboid, srcpath);
     175              :     Assert(rlocatorlist != NIL);
     176              : 
     177              :     /*
     178              :      * Database IDs will be the same for all relations so set them before
     179              :      * entering the loop.
     180              :      */
     181          253 :     srcrelid.dbId = src_dboid;
     182          253 :     dstrelid.dbId = dst_dboid;
     183              : 
     184              :     /* Loop over our list of relfilelocators and copy each one. */
     185        56503 :     foreach(cell, rlocatorlist)
     186              :     {
     187        56252 :         relinfo = lfirst(cell);
     188        56252 :         srcrlocator = relinfo->rlocator;
     189              : 
     190              :         /*
     191              :          * If the relation is from the source db's default tablespace then we
     192              :          * need to create it in the destination db's default tablespace.
     193              :          * Otherwise, we need to create in the same tablespace as it is in the
     194              :          * source database.
     195              :          */
     196        56252 :         if (srcrlocator.spcOid == src_tsid)
     197        56252 :             dstrlocator.spcOid = dst_tsid;
     198              :         else
     199            0 :             dstrlocator.spcOid = srcrlocator.spcOid;
     200              : 
     201        56252 :         dstrlocator.dbOid = dst_dboid;
     202        56252 :         dstrlocator.relNumber = srcrlocator.relNumber;
     203              : 
     204              :         /*
     205              :          * Acquire locks on source and target relations before copying.
     206              :          *
     207              :          * We typically do not read relation data into shared_buffers without
     208              :          * holding a relation lock. It's unclear what could go wrong if we
     209              :          * skipped it in this case, because nobody can be modifying either the
     210              :          * source or destination database at this point, and we have locks on
     211              :          * both databases, too, but let's take the conservative route.
     212              :          */
     213        56252 :         dstrelid.relId = srcrelid.relId = relinfo->reloid;
     214        56252 :         LockRelationId(&srcrelid, AccessShareLock);
     215        56252 :         LockRelationId(&dstrelid, AccessShareLock);
     216              : 
     217              :         /* Copy relation storage from source to the destination. */
     218        56252 :         CreateAndCopyRelationData(srcrlocator, dstrlocator, relinfo->permanent);
     219              : 
     220              :         /* Release the relation locks. */
     221        56250 :         UnlockRelationId(&srcrelid, AccessShareLock);
     222        56250 :         UnlockRelationId(&dstrelid, AccessShareLock);
     223              :     }
     224              : 
     225          251 :     pfree(srcpath);
     226          251 :     pfree(dstpath);
     227          251 :     list_free_deep(rlocatorlist);
     228          251 : }
     229              : 
     230              : /*
     231              :  * Scan the pg_class table in the source database to identify the relations
     232              :  * that need to be copied to the destination database.
     233              :  *
     234              :  * This is an exception to the usual rule that cross-database access is
     235              :  * not possible. We can make it work here because we know that there are no
     236              :  * connections to the source database and (since there can't be prepared
     237              :  * transactions touching that database) no in-doubt tuples either. This
     238              :  * means that we don't need to worry about pruning removing anything from
     239              :  * under us, and we don't need to be too picky about our snapshot either.
     240              :  * As long as it sees all previously-committed XIDs as committed and all
     241              :  * aborted XIDs as aborted, we should be fine: nothing else is possible
     242              :  * here.
     243              :  *
     244              :  * We can't rely on the relcache for anything here, because that only knows
     245              :  * about the database to which we are connected, and can't handle access to
     246              :  * other databases. That also means we can't rely on the heap scan
     247              :  * infrastructure, which would be a bad idea anyway since it might try
     248              :  * to do things like HOT pruning which we definitely can't do safely in
     249              :  * a database to which we're not even connected.
     250              :  */
     251              : static List *
     252          253 : ScanSourceDatabasePgClass(Oid tbid, Oid dbid, char *srcpath)
     253              : {
     254              :     RelFileLocator rlocator;
     255              :     BlockNumber nblocks;
     256              :     BlockNumber blkno;
     257              :     Buffer      buf;
     258              :     RelFileNumber relfilenumber;
     259              :     Page        page;
     260          253 :     List       *rlocatorlist = NIL;
     261              :     LockRelId   relid;
     262              :     Snapshot    snapshot;
     263              :     SMgrRelation smgr;
     264              :     BufferAccessStrategy bstrategy;
     265              : 
     266              :     /* Get pg_class relfilenumber. */
     267          253 :     relfilenumber = RelationMapOidToFilenumberForDatabase(srcpath,
     268              :                                                           RelationRelationId);
     269              : 
     270              :     /* Don't read data into shared_buffers without holding a relation lock. */
     271          253 :     relid.dbId = dbid;
     272          253 :     relid.relId = RelationRelationId;
     273          253 :     LockRelationId(&relid, AccessShareLock);
     274              : 
     275              :     /* Prepare a RelFileLocator for the pg_class relation. */
     276          253 :     rlocator.spcOid = tbid;
     277          253 :     rlocator.dbOid = dbid;
     278          253 :     rlocator.relNumber = relfilenumber;
     279              : 
     280          253 :     smgr = smgropen(rlocator, INVALID_PROC_NUMBER);
     281          253 :     nblocks = smgrnblocks(smgr, MAIN_FORKNUM);
     282          253 :     smgrclose(smgr);
     283              : 
     284              :     /* Use a buffer access strategy since this is a bulk read operation. */
     285          253 :     bstrategy = GetAccessStrategy(BAS_BULKREAD);
     286              : 
     287              :     /*
     288              :      * As explained in the function header comments, we need a snapshot that
     289              :      * will see all committed transactions as committed, and our transaction
     290              :      * snapshot - or the active snapshot - might not be new enough for that,
     291              :      * but the return value of GetLatestSnapshot() should work fine.
     292              :      */
     293          253 :     snapshot = RegisterSnapshot(GetLatestSnapshot());
     294              : 
     295              :     /* Process the relation block by block. */
     296         3795 :     for (blkno = 0; blkno < nblocks; blkno++)
     297              :     {
     298         3542 :         CHECK_FOR_INTERRUPTS();
     299              : 
     300         3542 :         buf = ReadBufferWithoutRelcache(rlocator, MAIN_FORKNUM, blkno,
     301              :                                         RBM_NORMAL, bstrategy, true);
     302              : 
     303         3542 :         LockBuffer(buf, BUFFER_LOCK_SHARE);
     304         3542 :         page = BufferGetPage(buf);
     305         3542 :         if (PageIsNew(page) || PageIsEmpty(page))
     306              :         {
     307            0 :             UnlockReleaseBuffer(buf);
     308            0 :             continue;
     309              :         }
     310              : 
     311              :         /* Append relevant pg_class tuples for current page to rlocatorlist. */
     312         3542 :         rlocatorlist = ScanSourceDatabasePgClassPage(page, buf, tbid, dbid,
     313              :                                                      srcpath, rlocatorlist,
     314              :                                                      snapshot);
     315              : 
     316         3542 :         UnlockReleaseBuffer(buf);
     317              :     }
     318          253 :     UnregisterSnapshot(snapshot);
     319              : 
     320              :     /* Release relation lock. */
     321          253 :     UnlockRelationId(&relid, AccessShareLock);
     322              : 
     323          253 :     return rlocatorlist;
     324              : }
     325              : 
     326              : /*
     327              :  * Scan one page of the source database's pg_class relation and add relevant
     328              :  * entries to rlocatorlist. The return value is the updated list.
     329              :  */
     330              : static List *
     331         3542 : ScanSourceDatabasePgClassPage(Page page, Buffer buf, Oid tbid, Oid dbid,
     332              :                               char *srcpath, List *rlocatorlist,
     333              :                               Snapshot snapshot)
     334              : {
     335         3542 :     BlockNumber blkno = BufferGetBlockNumber(buf);
     336              :     OffsetNumber offnum;
     337              :     OffsetNumber maxoff;
     338              :     HeapTupleData tuple;
     339              : 
     340         3542 :     maxoff = PageGetMaxOffsetNumber(page);
     341              : 
     342              :     /* Loop over offsets. */
     343         3542 :     for (offnum = FirstOffsetNumber;
     344       182503 :          offnum <= maxoff;
     345       178961 :          offnum = OffsetNumberNext(offnum))
     346              :     {
     347              :         ItemId      itemid;
     348              : 
     349       178961 :         itemid = PageGetItemId(page, offnum);
     350              : 
     351              :         /* Nothing to do if slot is empty or already dead. */
     352       178961 :         if (!ItemIdIsUsed(itemid) || ItemIdIsDead(itemid) ||
     353       128065 :             ItemIdIsRedirected(itemid))
     354        73413 :             continue;
     355              : 
     356              :         Assert(ItemIdIsNormal(itemid));
     357       105548 :         ItemPointerSet(&(tuple.t_self), blkno, offnum);
     358              : 
     359              :         /* Initialize a HeapTupleData structure. */
     360       105548 :         tuple.t_data = (HeapTupleHeader) PageGetItem(page, itemid);
     361       105548 :         tuple.t_len = ItemIdGetLength(itemid);
     362       105548 :         tuple.t_tableOid = RelationRelationId;
     363              : 
     364              :         /* Skip tuples that are not visible to this snapshot. */
     365       105548 :         if (HeapTupleSatisfiesVisibility(&tuple, snapshot, buf))
     366              :         {
     367              :             CreateDBRelInfo *relinfo;
     368              : 
     369              :             /*
     370              :              * ScanSourceDatabasePgClassTuple is in charge of constructing a
     371              :              * CreateDBRelInfo object for this tuple, but can also decide that
     372              :              * this tuple isn't something we need to copy. If we do need to
     373              :              * copy the relation, add it to the list.
     374              :              */
     375       105529 :             relinfo = ScanSourceDatabasePgClassTuple(&tuple, tbid, dbid,
     376              :                                                      srcpath);
     377       105529 :             if (relinfo != NULL)
     378        56700 :                 rlocatorlist = lappend(rlocatorlist, relinfo);
     379              :         }
     380              :     }
     381              : 
     382         3542 :     return rlocatorlist;
     383              : }
     384              : 
     385              : /*
     386              :  * Decide whether a certain pg_class tuple represents something that
     387              :  * needs to be copied from the source database to the destination database,
     388              :  * and if so, construct a CreateDBRelInfo for it.
     389              :  *
     390              :  * Visibility checks are handled by the caller, so our job here is just
     391              :  * to assess the data stored in the tuple.
     392              :  */
     393              : CreateDBRelInfo *
     394       105529 : ScanSourceDatabasePgClassTuple(HeapTupleData *tuple, Oid tbid, Oid dbid,
     395              :                                char *srcpath)
     396              : {
     397              :     CreateDBRelInfo *relinfo;
     398              :     Form_pg_class classForm;
     399       105529 :     RelFileNumber relfilenumber = InvalidRelFileNumber;
     400              : 
     401       105529 :     classForm = (Form_pg_class) GETSTRUCT(tuple);
     402              : 
     403              :     /*
     404              :      * Return NULL if this object does not need to be copied.
     405              :      *
     406              :      * Shared objects don't need to be copied, because they are shared.
     407              :      * Objects without storage can't be copied, because there's nothing to
     408              :      * copy. Temporary relations don't need to be copied either, because they
     409              :      * are inaccessible outside of the session that created them, which must
     410              :      * be gone already, and couldn't connect to a different database if it
     411              :      * still existed. autovacuum will eventually remove the pg_class entries
     412              :      * as well.
     413              :      */
     414       105529 :     if (classForm->reltablespace == GLOBALTABLESPACE_OID ||
     415        93891 :         !RELKIND_HAS_STORAGE(classForm->relkind) ||
     416        56700 :         classForm->relpersistence == RELPERSISTENCE_TEMP)
     417        48829 :         return NULL;
     418              : 
     419              :     /*
     420              :      * If relfilenumber is valid then directly use it.  Otherwise, consult the
     421              :      * relmap.
     422              :      */
     423        56700 :     if (RelFileNumberIsValid(classForm->relfilenode))
     424        52399 :         relfilenumber = classForm->relfilenode;
     425              :     else
     426         4301 :         relfilenumber = RelationMapOidToFilenumberForDatabase(srcpath,
     427              :                                                               classForm->oid);
     428              : 
     429              :     /* We must have a valid relfilenumber. */
     430        56700 :     if (!RelFileNumberIsValid(relfilenumber))
     431            0 :         elog(ERROR, "relation with OID %u does not have a valid relfilenumber",
     432              :              classForm->oid);
     433              : 
     434              :     /* Prepare a rel info element and add it to the list. */
     435        56700 :     relinfo = palloc_object(CreateDBRelInfo);
     436        56700 :     if (OidIsValid(classForm->reltablespace))
     437            0 :         relinfo->rlocator.spcOid = classForm->reltablespace;
     438              :     else
     439        56700 :         relinfo->rlocator.spcOid = tbid;
     440              : 
     441        56700 :     relinfo->rlocator.dbOid = dbid;
     442        56700 :     relinfo->rlocator.relNumber = relfilenumber;
     443        56700 :     relinfo->reloid = classForm->oid;
     444              : 
     445              :     /* Temporary relations were rejected above. */
     446              :     Assert(classForm->relpersistence != RELPERSISTENCE_TEMP);
     447        56700 :     relinfo->permanent =
     448        56700 :         (classForm->relpersistence == RELPERSISTENCE_PERMANENT) ? true : false;
     449              : 
     450        56700 :     return relinfo;
     451              : }
     452              : 
     453              : /*
     454              :  * Create database directory and write out the PG_VERSION file in the database
     455              :  * path.  If isRedo is true, it's okay for the database directory to exist
     456              :  * already.
     457              :  */
     458              : static void
     459          276 : CreateDirAndVersionFile(char *dbpath, Oid dbid, Oid tsid, bool isRedo)
     460              : {
     461              :     int         fd;
     462              :     int         nbytes;
     463              :     char        versionfile[MAXPGPATH];
     464              :     char        buf[16];
     465              : 
     466              :     /*
     467              :      * Note that we don't have to copy version data from the source database;
     468              :      * there's only one legal value.
     469              :      */
     470          276 :     sprintf(buf, "%s\n", PG_MAJORVERSION);
     471          276 :     nbytes = strlen(PG_MAJORVERSION) + 1;
     472              : 
     473              :     /* Create database directory. */
     474          276 :     if (MakePGDirectory(dbpath) < 0)
     475              :     {
     476              :         /* Failure other than already exists or not in WAL replay? */
     477            8 :         if (errno != EEXIST || !isRedo)
     478            0 :             ereport(ERROR,
     479              :                     (errcode_for_file_access(),
     480              :                      errmsg("could not create directory \"%s\": %m", dbpath)));
     481              :     }
     482              : 
     483              :     /*
     484              :      * Create PG_VERSION file in the database path.  If the file already
     485              :      * exists and we are in WAL replay then try again to open it in write
     486              :      * mode.
     487              :      */
     488          276 :     snprintf(versionfile, sizeof(versionfile), "%s/%s", dbpath, "PG_VERSION");
     489              : 
     490          276 :     fd = OpenTransientFile(versionfile, O_WRONLY | O_CREAT | O_EXCL | PG_BINARY);
     491          276 :     if (fd < 0 && errno == EEXIST && isRedo)
     492            8 :         fd = OpenTransientFile(versionfile, O_WRONLY | O_TRUNC | PG_BINARY);
     493              : 
     494          276 :     if (fd < 0)
     495            0 :         ereport(ERROR,
     496              :                 (errcode_for_file_access(),
     497              :                  errmsg("could not create file \"%s\": %m", versionfile)));
     498              : 
     499              :     /* Write PG_MAJORVERSION in the PG_VERSION file. */
     500          276 :     pgstat_report_wait_start(WAIT_EVENT_VERSION_FILE_WRITE);
     501          276 :     errno = 0;
     502          276 :     if ((int) write(fd, buf, nbytes) != nbytes)
     503              :     {
     504              :         /* If write didn't set errno, assume problem is no disk space. */
     505            0 :         if (errno == 0)
     506            0 :             errno = ENOSPC;
     507            0 :         ereport(ERROR,
     508              :                 (errcode_for_file_access(),
     509              :                  errmsg("could not write to file \"%s\": %m", versionfile)));
     510              :     }
     511          276 :     pgstat_report_wait_end();
     512              : 
     513          276 :     pgstat_report_wait_start(WAIT_EVENT_VERSION_FILE_SYNC);
     514          276 :     if (pg_fsync(fd) != 0)
     515            0 :         ereport(data_sync_elevel(ERROR),
     516              :                 (errcode_for_file_access(),
     517              :                  errmsg("could not fsync file \"%s\": %m", versionfile)));
     518          276 :     fsync_fname(dbpath, true);
     519          276 :     pgstat_report_wait_end();
     520              : 
     521              :     /* Close the version file. */
     522          276 :     CloseTransientFile(fd);
     523              : 
     524              :     /* If we are not in WAL replay then write the WAL. */
     525          276 :     if (!isRedo)
     526              :     {
     527              :         xl_dbase_create_wal_log_rec xlrec;
     528              : 
     529          253 :         START_CRIT_SECTION();
     530              : 
     531          253 :         xlrec.db_id = dbid;
     532          253 :         xlrec.tablespace_id = tsid;
     533              : 
     534          253 :         XLogBeginInsert();
     535          253 :         XLogRegisterData(&xlrec,
     536              :                          sizeof(xl_dbase_create_wal_log_rec));
     537              : 
     538          253 :         (void) XLogInsert(RM_DBASE_ID, XLOG_DBASE_CREATE_WAL_LOG);
     539              : 
     540          253 :         END_CRIT_SECTION();
     541              :     }
     542          276 : }
     543              : 
     544              : /*
     545              :  * Create a new database using the FILE_COPY strategy.
     546              :  *
     547              :  * Copy each tablespace at the filesystem level, and log a single WAL record
     548              :  * for each tablespace copied.  This requires a checkpoint before and after the
     549              :  * copy, which may be expensive, but it does greatly reduce WAL generation
     550              :  * if the copied database is large.
     551              :  */
     552              : static void
     553          138 : CreateDatabaseUsingFileCopy(Oid src_dboid, Oid dst_dboid, Oid src_tsid,
     554              :                             Oid dst_tsid)
     555              : {
     556              :     TableScanDesc scan;
     557              :     Relation    rel;
     558              :     HeapTuple   tuple;
     559              : 
     560              :     /*
     561              :      * Force a checkpoint before starting the copy. This will force all dirty
     562              :      * buffers, including those of unlogged tables, out to disk, to ensure
     563              :      * source database is up-to-date on disk for the copy.
     564              :      * FlushDatabaseBuffers() would suffice for that, but we also want to
     565              :      * process any pending unlink requests. Otherwise, if a checkpoint
     566              :      * happened while we're copying files, a file might be deleted just when
     567              :      * we're about to copy it, causing the lstat() call in copydir() to fail
     568              :      * with ENOENT.
     569              :      *
     570              :      * In binary upgrade mode, we can skip this checkpoint because pg_upgrade
     571              :      * is careful to ensure that template0 is fully written to disk prior to
     572              :      * any CREATE DATABASE commands.
     573              :      */
     574          138 :     if (!IsBinaryUpgrade)
     575          108 :         RequestCheckpoint(CHECKPOINT_FAST | CHECKPOINT_FORCE |
     576              :                           CHECKPOINT_WAIT | CHECKPOINT_FLUSH_UNLOGGED);
     577              : 
     578              :     /*
     579              :      * Iterate through all tablespaces of the template database, and copy each
     580              :      * one to the new database.
     581              :      */
     582          138 :     rel = table_open(TableSpaceRelationId, AccessShareLock);
     583          138 :     scan = table_beginscan_catalog(rel, 0, NULL);
     584          448 :     while ((tuple = heap_getnext(scan, ForwardScanDirection)) != NULL)
     585              :     {
     586          310 :         Form_pg_tablespace spaceform = (Form_pg_tablespace) GETSTRUCT(tuple);
     587          310 :         Oid         srctablespace = spaceform->oid;
     588              :         Oid         dsttablespace;
     589              :         char       *srcpath;
     590              :         char       *dstpath;
     591              :         struct stat st;
     592              : 
     593              :         /* No need to copy global tablespace */
     594          310 :         if (srctablespace == GLOBALTABLESPACE_OID)
     595          172 :             continue;
     596              : 
     597          172 :         srcpath = GetDatabasePath(src_dboid, srctablespace);
     598              : 
     599          310 :         if (stat(srcpath, &st) < 0 || !S_ISDIR(st.st_mode) ||
     600          138 :             directory_is_empty(srcpath))
     601              :         {
     602              :             /* Assume we can ignore it */
     603           34 :             pfree(srcpath);
     604           34 :             continue;
     605              :         }
     606              : 
     607          138 :         if (srctablespace == src_tsid)
     608          138 :             dsttablespace = dst_tsid;
     609              :         else
     610            0 :             dsttablespace = srctablespace;
     611              : 
     612          138 :         dstpath = GetDatabasePath(dst_dboid, dsttablespace);
     613              : 
     614              :         /*
     615              :          * Copy this subdirectory to the new location
     616              :          *
     617              :          * We don't need to copy subdirectories
     618              :          */
     619          138 :         copydir(srcpath, dstpath, false);
     620              : 
     621              :         /* Record the filesystem change in XLOG */
     622              :         {
     623              :             xl_dbase_create_file_copy_rec xlrec;
     624              : 
     625          138 :             xlrec.db_id = dst_dboid;
     626          138 :             xlrec.tablespace_id = dsttablespace;
     627          138 :             xlrec.src_db_id = src_dboid;
     628          138 :             xlrec.src_tablespace_id = srctablespace;
     629              : 
     630          138 :             XLogBeginInsert();
     631          138 :             XLogRegisterData(&xlrec,
     632              :                              sizeof(xl_dbase_create_file_copy_rec));
     633              : 
     634          138 :             (void) XLogInsert(RM_DBASE_ID,
     635              :                               XLOG_DBASE_CREATE_FILE_COPY | XLR_SPECIAL_REL_UPDATE);
     636              :         }
     637          138 :         pfree(srcpath);
     638          138 :         pfree(dstpath);
     639              :     }
     640          138 :     table_endscan(scan);
     641          138 :     table_close(rel, AccessShareLock);
     642              : 
     643              :     /*
     644              :      * We force a checkpoint before committing.  This effectively means that
     645              :      * committed XLOG_DBASE_CREATE_FILE_COPY operations will never need to be
     646              :      * replayed (at least not in ordinary crash recovery; we still have to
     647              :      * make the XLOG entry for the benefit of PITR operations). This avoids
     648              :      * two nasty scenarios:
     649              :      *
     650              :      * #1: At wal_level=minimal, we don't XLOG the contents of newly created
     651              :      * relfilenodes; therefore the drop-and-recreate-whole-directory behavior
     652              :      * of DBASE_CREATE replay would lose such files created in the new
     653              :      * database between our commit and the next checkpoint.
     654              :      *
     655              :      * #2: Since we have to recopy the source database during DBASE_CREATE
     656              :      * replay, we run the risk of copying changes in it that were committed
     657              :      * after the original CREATE DATABASE command but before the system crash
     658              :      * that led to the replay.  This is at least unexpected and at worst could
     659              :      * lead to inconsistencies, eg duplicate table names.
     660              :      *
     661              :      * (Both of these were real bugs in releases 8.0 through 8.0.3.)
     662              :      *
     663              :      * In PITR replay, the first of these isn't an issue, and the second is
     664              :      * only a risk if the CREATE DATABASE and subsequent template database
     665              :      * change both occur while a base backup is being taken. There doesn't
     666              :      * seem to be much we can do about that except document it as a
     667              :      * limitation.
     668              :      *
     669              :      * In binary upgrade mode, we can skip this checkpoint because neither of
     670              :      * these problems applies: we don't ever replay the WAL generated during
     671              :      * pg_upgrade, and we don't support taking base backups during pg_upgrade
     672              :      * (not to mention that we don't concurrently modify template0, either).
     673              :      *
     674              :      * See CreateDatabaseUsingWalLog() for a less cheesy CREATE DATABASE
     675              :      * strategy that avoids these problems.
     676              :      */
     677          138 :     if (!IsBinaryUpgrade)
     678          108 :         RequestCheckpoint(CHECKPOINT_FAST | CHECKPOINT_FORCE |
     679              :                           CHECKPOINT_WAIT);
     680          138 : }
     681              : 
     682              : /*
     683              :  * CREATE DATABASE
     684              :  */
     685              : Oid
     686          409 : createdb(ParseState *pstate, const CreatedbStmt *stmt)
     687              : {
     688              :     Oid         src_dboid;
     689              :     Oid         src_owner;
     690          409 :     int         src_encoding = -1;
     691          409 :     char       *src_collate = NULL;
     692          409 :     char       *src_ctype = NULL;
     693          409 :     char       *src_locale = NULL;
     694          409 :     char       *src_icurules = NULL;
     695          409 :     char        src_locprovider = '\0';
     696          409 :     char       *src_collversion = NULL;
     697              :     bool        src_istemplate;
     698          409 :     bool        src_hasloginevt = false;
     699              :     bool        src_allowconn;
     700          409 :     TransactionId src_frozenxid = InvalidTransactionId;
     701          409 :     MultiXactId src_minmxid = InvalidMultiXactId;
     702              :     Oid         src_deftablespace;
     703              :     volatile Oid dst_deftablespace;
     704              :     Relation    pg_database_rel;
     705              :     HeapTuple   tuple;
     706          409 :     Datum       new_record[Natts_pg_database] = {0};
     707          409 :     bool        new_record_nulls[Natts_pg_database] = {0};
     708          409 :     Oid         dboid = InvalidOid;
     709              :     Oid         datdba;
     710              :     ListCell   *option;
     711          409 :     DefElem    *tablespacenameEl = NULL;
     712          409 :     DefElem    *ownerEl = NULL;
     713          409 :     DefElem    *templateEl = NULL;
     714          409 :     DefElem    *encodingEl = NULL;
     715          409 :     DefElem    *localeEl = NULL;
     716          409 :     DefElem    *builtinlocaleEl = NULL;
     717          409 :     DefElem    *collateEl = NULL;
     718          409 :     DefElem    *ctypeEl = NULL;
     719          409 :     DefElem    *iculocaleEl = NULL;
     720          409 :     DefElem    *icurulesEl = NULL;
     721          409 :     DefElem    *locproviderEl = NULL;
     722          409 :     DefElem    *istemplateEl = NULL;
     723          409 :     DefElem    *allowconnectionsEl = NULL;
     724          409 :     DefElem    *connlimitEl = NULL;
     725          409 :     DefElem    *collversionEl = NULL;
     726          409 :     DefElem    *strategyEl = NULL;
     727          409 :     char       *dbname = stmt->dbname;
     728          409 :     char       *dbowner = NULL;
     729          409 :     const char *dbtemplate = NULL;
     730          409 :     char       *dbcollate = NULL;
     731          409 :     char       *dbctype = NULL;
     732          409 :     const char *dblocale = NULL;
     733          409 :     char       *dbicurules = NULL;
     734          409 :     char        dblocprovider = '\0';
     735              :     char       *canonname;
     736          409 :     int         encoding = -1;
     737          409 :     bool        dbistemplate = false;
     738          409 :     bool        dballowconnections = true;
     739          409 :     int         dbconnlimit = DATCONNLIMIT_UNLIMITED;
     740          409 :     char       *dbcollversion = NULL;
     741              :     int         notherbackends;
     742              :     int         npreparedxacts;
     743          409 :     CreateDBStrategy dbstrategy = CREATEDB_WAL_LOG;
     744              :     createdb_failure_params fparms;
     745              : 
     746              :     /* Extract options from the statement node tree */
     747         1222 :     foreach(option, stmt->options)
     748              :     {
     749          813 :         DefElem    *defel = (DefElem *) lfirst(option);
     750              : 
     751          813 :         if (strcmp(defel->defname, "tablespace") == 0)
     752              :         {
     753           17 :             if (tablespacenameEl)
     754            0 :                 errorConflictingDefElem(defel, pstate);
     755           17 :             tablespacenameEl = defel;
     756              :         }
     757          796 :         else if (strcmp(defel->defname, "owner") == 0)
     758              :         {
     759            1 :             if (ownerEl)
     760            0 :                 errorConflictingDefElem(defel, pstate);
     761            1 :             ownerEl = defel;
     762              :         }
     763          795 :         else if (strcmp(defel->defname, "template") == 0)
     764              :         {
     765          185 :             if (templateEl)
     766            0 :                 errorConflictingDefElem(defel, pstate);
     767          185 :             templateEl = defel;
     768              :         }
     769          610 :         else if (strcmp(defel->defname, "encoding") == 0)
     770              :         {
     771           53 :             if (encodingEl)
     772            0 :                 errorConflictingDefElem(defel, pstate);
     773           53 :             encodingEl = defel;
     774              :         }
     775          557 :         else if (strcmp(defel->defname, "locale") == 0)
     776              :         {
     777           54 :             if (localeEl)
     778            0 :                 errorConflictingDefElem(defel, pstate);
     779           54 :             localeEl = defel;
     780              :         }
     781          503 :         else if (strcmp(defel->defname, "builtin_locale") == 0)
     782              :         {
     783            8 :             if (builtinlocaleEl)
     784            0 :                 errorConflictingDefElem(defel, pstate);
     785            8 :             builtinlocaleEl = defel;
     786              :         }
     787          495 :         else if (strcmp(defel->defname, "lc_collate") == 0)
     788              :         {
     789            8 :             if (collateEl)
     790            0 :                 errorConflictingDefElem(defel, pstate);
     791            8 :             collateEl = defel;
     792              :         }
     793          487 :         else if (strcmp(defel->defname, "lc_ctype") == 0)
     794              :         {
     795            8 :             if (ctypeEl)
     796            0 :                 errorConflictingDefElem(defel, pstate);
     797            8 :             ctypeEl = defel;
     798              :         }
     799          479 :         else if (strcmp(defel->defname, "icu_locale") == 0)
     800              :         {
     801            5 :             if (iculocaleEl)
     802            0 :                 errorConflictingDefElem(defel, pstate);
     803            5 :             iculocaleEl = defel;
     804              :         }
     805          474 :         else if (strcmp(defel->defname, "icu_rules") == 0)
     806              :         {
     807            1 :             if (icurulesEl)
     808            0 :                 errorConflictingDefElem(defel, pstate);
     809            1 :             icurulesEl = defel;
     810              :         }
     811          473 :         else if (strcmp(defel->defname, "locale_provider") == 0)
     812              :         {
     813           59 :             if (locproviderEl)
     814            0 :                 errorConflictingDefElem(defel, pstate);
     815           59 :             locproviderEl = defel;
     816              :         }
     817          414 :         else if (strcmp(defel->defname, "is_template") == 0)
     818              :         {
     819           51 :             if (istemplateEl)
     820            0 :                 errorConflictingDefElem(defel, pstate);
     821           51 :             istemplateEl = defel;
     822              :         }
     823          363 :         else if (strcmp(defel->defname, "allow_connections") == 0)
     824              :         {
     825           50 :             if (allowconnectionsEl)
     826            0 :                 errorConflictingDefElem(defel, pstate);
     827           50 :             allowconnectionsEl = defel;
     828              :         }
     829          313 :         else if (strcmp(defel->defname, "connection_limit") == 0)
     830              :         {
     831            0 :             if (connlimitEl)
     832            0 :                 errorConflictingDefElem(defel, pstate);
     833            0 :             connlimitEl = defel;
     834              :         }
     835          313 :         else if (strcmp(defel->defname, "collation_version") == 0)
     836              :         {
     837           30 :             if (collversionEl)
     838            0 :                 errorConflictingDefElem(defel, pstate);
     839           30 :             collversionEl = defel;
     840              :         }
     841          283 :         else if (strcmp(defel->defname, "location") == 0)
     842              :         {
     843            0 :             ereport(WARNING,
     844              :                     (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
     845              :                      errmsg("LOCATION is not supported anymore"),
     846              :                      errhint("Consider using tablespaces instead."),
     847              :                      parser_errposition(pstate, defel->location)));
     848              :         }
     849          283 :         else if (strcmp(defel->defname, "oid") == 0)
     850              :         {
     851          133 :             dboid = defGetObjectId(defel);
     852              : 
     853              :             /*
     854              :              * We don't normally permit new databases to be created with
     855              :              * system-assigned OIDs. pg_upgrade tries to preserve database
     856              :              * OIDs, so we can't allow any database to be created with an OID
     857              :              * that might be in use in a freshly-initialized cluster created
     858              :              * by some future version. We assume all such OIDs will be from
     859              :              * the system-managed OID range.
     860              :              *
     861              :              * As an exception, however, we permit any OID to be assigned when
     862              :              * allow_system_table_mods=on (so that initdb can assign system
     863              :              * OIDs to template0 and postgres) or when performing a binary
     864              :              * upgrade (so that pg_upgrade can preserve whatever OIDs it finds
     865              :              * in the source cluster).
     866              :              */
     867          133 :             if (dboid < FirstNormalObjectId &&
     868          116 :                 !allowSystemTableMods && !IsBinaryUpgrade)
     869            0 :                 ereport(ERROR,
     870              :                         (errcode(ERRCODE_INVALID_PARAMETER_VALUE)),
     871              :                         errmsg("OIDs less than %u are reserved for system objects", FirstNormalObjectId));
     872              :         }
     873          150 :         else if (strcmp(defel->defname, "strategy") == 0)
     874              :         {
     875          150 :             if (strategyEl)
     876            0 :                 errorConflictingDefElem(defel, pstate);
     877          150 :             strategyEl = defel;
     878              :         }
     879              :         else
     880            0 :             ereport(ERROR,
     881              :                     (errcode(ERRCODE_SYNTAX_ERROR),
     882              :                      errmsg("option \"%s\" not recognized", defel->defname),
     883              :                      parser_errposition(pstate, defel->location)));
     884              :     }
     885              : 
     886          409 :     if (ownerEl && ownerEl->arg)
     887            1 :         dbowner = defGetString(ownerEl);
     888          409 :     if (templateEl && templateEl->arg)
     889          185 :         dbtemplate = defGetString(templateEl);
     890          409 :     if (encodingEl && encodingEl->arg)
     891              :     {
     892              :         const char *encoding_name;
     893              : 
     894           53 :         if (IsA(encodingEl->arg, Integer))
     895              :         {
     896            0 :             encoding = defGetInt32(encodingEl);
     897            0 :             encoding_name = pg_encoding_to_char(encoding);
     898            0 :             if (strcmp(encoding_name, "") == 0 ||
     899            0 :                 pg_valid_server_encoding(encoding_name) < 0)
     900            0 :                 ereport(ERROR,
     901              :                         (errcode(ERRCODE_UNDEFINED_OBJECT),
     902              :                          errmsg("%d is not a valid encoding code",
     903              :                                 encoding),
     904              :                          parser_errposition(pstate, encodingEl->location)));
     905              :         }
     906              :         else
     907              :         {
     908           53 :             encoding_name = defGetString(encodingEl);
     909           53 :             encoding = pg_valid_server_encoding(encoding_name);
     910           53 :             if (encoding < 0)
     911            0 :                 ereport(ERROR,
     912              :                         (errcode(ERRCODE_UNDEFINED_OBJECT),
     913              :                          errmsg("%s is not a valid encoding name",
     914              :                                 encoding_name),
     915              :                          parser_errposition(pstate, encodingEl->location)));
     916              :         }
     917              :     }
     918          409 :     if (localeEl && localeEl->arg)
     919              :     {
     920           54 :         dbcollate = defGetString(localeEl);
     921           54 :         dbctype = defGetString(localeEl);
     922           54 :         dblocale = defGetString(localeEl);
     923              :     }
     924          409 :     if (builtinlocaleEl && builtinlocaleEl->arg)
     925            8 :         dblocale = defGetString(builtinlocaleEl);
     926          409 :     if (collateEl && collateEl->arg)
     927            8 :         dbcollate = defGetString(collateEl);
     928          409 :     if (ctypeEl && ctypeEl->arg)
     929            8 :         dbctype = defGetString(ctypeEl);
     930          409 :     if (iculocaleEl && iculocaleEl->arg)
     931            5 :         dblocale = defGetString(iculocaleEl);
     932          409 :     if (icurulesEl && icurulesEl->arg)
     933            1 :         dbicurules = defGetString(icurulesEl);
     934          409 :     if (locproviderEl && locproviderEl->arg)
     935              :     {
     936           59 :         char       *locproviderstr = defGetString(locproviderEl);
     937              : 
     938           59 :         if (pg_strcasecmp(locproviderstr, "builtin") == 0)
     939           17 :             dblocprovider = COLLPROVIDER_BUILTIN;
     940           42 :         else if (pg_strcasecmp(locproviderstr, "icu") == 0)
     941            7 :             dblocprovider = COLLPROVIDER_ICU;
     942           35 :         else if (pg_strcasecmp(locproviderstr, "libc") == 0)
     943           34 :             dblocprovider = COLLPROVIDER_LIBC;
     944              :         else
     945            1 :             ereport(ERROR,
     946              :                     (errcode(ERRCODE_INVALID_OBJECT_DEFINITION),
     947              :                      errmsg("unrecognized locale provider: %s",
     948              :                             locproviderstr)));
     949              :     }
     950          408 :     if (istemplateEl && istemplateEl->arg)
     951           51 :         dbistemplate = defGetBoolean(istemplateEl);
     952          408 :     if (allowconnectionsEl && allowconnectionsEl->arg)
     953           50 :         dballowconnections = defGetBoolean(allowconnectionsEl);
     954          408 :     if (connlimitEl && connlimitEl->arg)
     955              :     {
     956            0 :         dbconnlimit = defGetInt32(connlimitEl);
     957            0 :         if (dbconnlimit < DATCONNLIMIT_UNLIMITED)
     958            0 :             ereport(ERROR,
     959              :                     (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
     960              :                      errmsg("invalid connection limit: %d", dbconnlimit)));
     961              :     }
     962          408 :     if (collversionEl)
     963           30 :         dbcollversion = defGetString(collversionEl);
     964              : 
     965              :     /* obtain OID of proposed owner */
     966          408 :     if (dbowner)
     967            1 :         datdba = get_role_oid(dbowner, false);
     968              :     else
     969          407 :         datdba = GetUserId();
     970              : 
     971              :     /*
     972              :      * To create a database, must have createdb privilege and must be able to
     973              :      * become the target role (this does not imply that the target role itself
     974              :      * must have createdb privilege).  The latter provision guards against
     975              :      * "giveaway" attacks.  Note that a superuser will always have both of
     976              :      * these privileges a fortiori.
     977              :      */
     978          408 :     if (!have_createdb_privilege())
     979            3 :         ereport(ERROR,
     980              :                 (errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
     981              :                  errmsg("permission denied to create database")));
     982              : 
     983          405 :     check_can_set_role(GetUserId(), datdba);
     984              : 
     985              :     /*
     986              :      * Lookup database (template) to be cloned, and obtain share lock on it.
     987              :      * ShareLock allows two CREATE DATABASEs to work from the same template
     988              :      * concurrently, while ensuring no one is busy dropping it in parallel
     989              :      * (which would be Very Bad since we'd likely get an incomplete copy
     990              :      * without knowing it).  This also prevents any new connections from being
     991              :      * made to the source until we finish copying it, so we can be sure it
     992              :      * won't change underneath us.
     993              :      */
     994          405 :     if (!dbtemplate)
     995          221 :         dbtemplate = "template1"; /* Default template database name */
     996              : 
     997          405 :     if (!get_db_info(dbtemplate, ShareLock,
     998              :                      &src_dboid, &src_owner, &src_encoding,
     999              :                      &src_istemplate, &src_allowconn, &src_hasloginevt,
    1000              :                      &src_frozenxid, &src_minmxid, &src_deftablespace,
    1001              :                      &src_collate, &src_ctype, &src_locale, &src_icurules, &src_locprovider,
    1002              :                      &src_collversion))
    1003            0 :         ereport(ERROR,
    1004              :                 (errcode(ERRCODE_UNDEFINED_DATABASE),
    1005              :                  errmsg("template database \"%s\" does not exist",
    1006              :                         dbtemplate)));
    1007              : 
    1008              :     /*
    1009              :      * If the source database was in the process of being dropped, we can't
    1010              :      * use it as a template.
    1011              :      */
    1012          405 :     if (database_is_invalid_oid(src_dboid))
    1013            1 :         ereport(ERROR,
    1014              :                 errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
    1015              :                 errmsg("cannot use invalid database \"%s\" as template", dbtemplate),
    1016              :                 errhint("Use DROP DATABASE to drop invalid databases."));
    1017              : 
    1018              :     /*
    1019              :      * Permission check: to copy a DB that's not marked datistemplate, you
    1020              :      * must be superuser or the owner thereof.
    1021              :      */
    1022          404 :     if (!src_istemplate)
    1023              :     {
    1024           14 :         if (!object_ownercheck(DatabaseRelationId, src_dboid, GetUserId()))
    1025            0 :             ereport(ERROR,
    1026              :                     (errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
    1027              :                      errmsg("permission denied to copy database \"%s\"",
    1028              :                             dbtemplate)));
    1029              :     }
    1030              : 
    1031              :     /* Validate the database creation strategy. */
    1032          404 :     if (strategyEl && strategyEl->arg)
    1033              :     {
    1034              :         char       *strategy;
    1035              : 
    1036          150 :         strategy = defGetString(strategyEl);
    1037          150 :         if (pg_strcasecmp(strategy, "wal_log") == 0)
    1038           11 :             dbstrategy = CREATEDB_WAL_LOG;
    1039          139 :         else if (pg_strcasecmp(strategy, "file_copy") == 0)
    1040          138 :             dbstrategy = CREATEDB_FILE_COPY;
    1041              :         else
    1042            1 :             ereport(ERROR,
    1043              :                     (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
    1044              :                      errmsg("invalid create database strategy \"%s\"", strategy),
    1045              :                      errhint("Valid strategies are \"wal_log\" and \"file_copy\".")));
    1046              :     }
    1047              : 
    1048              :     /* If encoding or locales are defaulted, use source's setting */
    1049          403 :     if (encoding < 0)
    1050          350 :         encoding = src_encoding;
    1051          403 :     if (dbcollate == NULL)
    1052          343 :         dbcollate = src_collate;
    1053          403 :     if (dbctype == NULL)
    1054          343 :         dbctype = src_ctype;
    1055          403 :     if (dblocprovider == '\0')
    1056          345 :         dblocprovider = src_locprovider;
    1057          403 :     if (dblocale == NULL && dblocprovider == src_locprovider)
    1058          341 :         dblocale = src_locale;
    1059          403 :     if (dbicurules == NULL)
    1060          402 :         dbicurules = src_icurules;
    1061              : 
    1062              :     /* Some encodings are client only */
    1063          403 :     if (!PG_VALID_BE_ENCODING(encoding))
    1064            0 :         ereport(ERROR,
    1065              :                 (errcode(ERRCODE_WRONG_OBJECT_TYPE),
    1066              :                  errmsg("invalid server encoding %d", encoding)));
    1067              : 
    1068              :     /* Check that the chosen locales are valid, and get canonical spellings */
    1069          403 :     if (!check_locale(LC_COLLATE, dbcollate, &canonname))
    1070              :     {
    1071            1 :         if (dblocprovider == COLLPROVIDER_BUILTIN)
    1072            0 :             ereport(ERROR,
    1073              :                     (errcode(ERRCODE_WRONG_OBJECT_TYPE),
    1074              :                      errmsg("invalid LC_COLLATE locale name: \"%s\"", dbcollate),
    1075              :                      errhint("If the locale name is specific to the builtin provider, use BUILTIN_LOCALE.")));
    1076            1 :         else if (dblocprovider == COLLPROVIDER_ICU)
    1077            0 :             ereport(ERROR,
    1078              :                     (errcode(ERRCODE_WRONG_OBJECT_TYPE),
    1079              :                      errmsg("invalid LC_COLLATE locale name: \"%s\"", dbcollate),
    1080              :                      errhint("If the locale name is specific to the ICU provider, use ICU_LOCALE.")));
    1081              :         else
    1082            1 :             ereport(ERROR,
    1083              :                     (errcode(ERRCODE_WRONG_OBJECT_TYPE),
    1084              :                      errmsg("invalid LC_COLLATE locale name: \"%s\"", dbcollate)));
    1085              :     }
    1086          402 :     dbcollate = canonname;
    1087          402 :     if (!check_locale(LC_CTYPE, dbctype, &canonname))
    1088              :     {
    1089            1 :         if (dblocprovider == COLLPROVIDER_BUILTIN)
    1090            0 :             ereport(ERROR,
    1091              :                     (errcode(ERRCODE_WRONG_OBJECT_TYPE),
    1092              :                      errmsg("invalid LC_CTYPE locale name: \"%s\"", dbctype),
    1093              :                      errhint("If the locale name is specific to the builtin provider, use BUILTIN_LOCALE.")));
    1094            1 :         else if (dblocprovider == COLLPROVIDER_ICU)
    1095            0 :             ereport(ERROR,
    1096              :                     (errcode(ERRCODE_WRONG_OBJECT_TYPE),
    1097              :                      errmsg("invalid LC_CTYPE locale name: \"%s\"", dbctype),
    1098              :                      errhint("If the locale name is specific to the ICU provider, use ICU_LOCALE.")));
    1099              :         else
    1100            1 :             ereport(ERROR,
    1101              :                     (errcode(ERRCODE_WRONG_OBJECT_TYPE),
    1102              :                      errmsg("invalid LC_CTYPE locale name: \"%s\"", dbctype)));
    1103              :     }
    1104              : 
    1105          401 :     dbctype = canonname;
    1106              : 
    1107          401 :     check_encoding_locale_matches(encoding, dbcollate, dbctype);
    1108              : 
    1109              :     /* validate provider-specific parameters */
    1110          401 :     if (dblocprovider != COLLPROVIDER_BUILTIN)
    1111              :     {
    1112          370 :         if (builtinlocaleEl)
    1113            0 :             ereport(ERROR,
    1114              :                     (errcode(ERRCODE_INVALID_OBJECT_DEFINITION),
    1115              :                      errmsg("BUILTIN_LOCALE cannot be specified unless locale provider is builtin")));
    1116              :     }
    1117              : 
    1118          401 :     if (dblocprovider != COLLPROVIDER_ICU)
    1119              :     {
    1120          387 :         if (iculocaleEl)
    1121            1 :             ereport(ERROR,
    1122              :                     (errcode(ERRCODE_INVALID_OBJECT_DEFINITION),
    1123              :                      errmsg("ICU locale cannot be specified unless locale provider is ICU")));
    1124              : 
    1125          386 :         if (dbicurules)
    1126            1 :             ereport(ERROR,
    1127              :                     (errcode(ERRCODE_INVALID_OBJECT_DEFINITION),
    1128              :                      errmsg("ICU rules cannot be specified unless locale provider is ICU")));
    1129              :     }
    1130              : 
    1131              :     /* validate and canonicalize locale for the provider */
    1132          399 :     if (dblocprovider == COLLPROVIDER_BUILTIN)
    1133              :     {
    1134              :         /*
    1135              :          * This would happen if template0 uses the libc provider but the new
    1136              :          * database uses builtin.
    1137              :          */
    1138           29 :         if (!dblocale)
    1139            1 :             ereport(ERROR,
    1140              :                     (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
    1141              :                      errmsg("LOCALE or BUILTIN_LOCALE must be specified")));
    1142              : 
    1143           28 :         dblocale = builtin_validate_locale(encoding, dblocale);
    1144              :     }
    1145          370 :     else if (dblocprovider == COLLPROVIDER_ICU)
    1146              :     {
    1147           14 :         if (!(is_encoding_supported_by_icu(encoding)))
    1148            1 :             ereport(ERROR,
    1149              :                     (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
    1150              :                      errmsg("encoding \"%s\" is not supported with ICU provider",
    1151              :                             pg_encoding_to_char(encoding))));
    1152              : 
    1153              :         /*
    1154              :          * This would happen if template0 uses the libc provider but the new
    1155              :          * database uses icu.
    1156              :          */
    1157           13 :         if (!dblocale)
    1158            1 :             ereport(ERROR,
    1159              :                     (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
    1160              :                      errmsg("LOCALE or ICU_LOCALE must be specified")));
    1161              : 
    1162              :         /*
    1163              :          * During binary upgrade, or when the locale came from the template
    1164              :          * database, preserve locale string. Otherwise, canonicalize to a
    1165              :          * language tag.
    1166              :          */
    1167           12 :         if (!IsBinaryUpgrade && dblocale != src_locale)
    1168              :         {
    1169            6 :             char       *langtag = icu_language_tag(dblocale,
    1170              :                                                    icu_validation_level);
    1171              : 
    1172            6 :             if (langtag && strcmp(dblocale, langtag) != 0)
    1173              :             {
    1174            2 :                 ereport(NOTICE,
    1175              :                         (errmsg("using standard form \"%s\" for ICU locale \"%s\"",
    1176              :                                 langtag, dblocale)));
    1177              : 
    1178            2 :                 dblocale = langtag;
    1179              :             }
    1180              :         }
    1181              : 
    1182           12 :         icu_validate_locale(dblocale);
    1183              :     }
    1184              : 
    1185              :     /* for libc, locale comes from datcollate and datctype */
    1186          394 :     if (dblocprovider == COLLPROVIDER_LIBC)
    1187          356 :         dblocale = NULL;
    1188              : 
    1189              :     /*
    1190              :      * Check that the new encoding and locale settings match the source
    1191              :      * database.  We insist on this because we simply copy the source data ---
    1192              :      * any non-ASCII data would be wrongly encoded, and any indexes sorted
    1193              :      * according to the source locale would be wrong.
    1194              :      *
    1195              :      * However, we assume that template0 doesn't contain any non-ASCII data
    1196              :      * nor any indexes that depend on collation or ctype, so template0 can be
    1197              :      * used as template for creating a database with any encoding or locale.
    1198              :      */
    1199          394 :     if (strcmp(dbtemplate, "template0") != 0)
    1200              :     {
    1201          236 :         if (encoding != src_encoding)
    1202            0 :             ereport(ERROR,
    1203              :                     (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
    1204              :                      errmsg("new encoding (%s) is incompatible with the encoding of the template database (%s)",
    1205              :                             pg_encoding_to_char(encoding),
    1206              :                             pg_encoding_to_char(src_encoding)),
    1207              :                      errhint("Use the same encoding as in the template database, or use template0 as template.")));
    1208              : 
    1209          236 :         if (strcmp(dbcollate, src_collate) != 0)
    1210            1 :             ereport(ERROR,
    1211              :                     (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
    1212              :                      errmsg("new collation (%s) is incompatible with the collation of the template database (%s)",
    1213              :                             dbcollate, src_collate),
    1214              :                      errhint("Use the same collation as in the template database, or use template0 as template.")));
    1215              : 
    1216          235 :         if (strcmp(dbctype, src_ctype) != 0)
    1217            0 :             ereport(ERROR,
    1218              :                     (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
    1219              :                      errmsg("new LC_CTYPE (%s) is incompatible with the LC_CTYPE of the template database (%s)",
    1220              :                             dbctype, src_ctype),
    1221              :                      errhint("Use the same LC_CTYPE as in the template database, or use template0 as template.")));
    1222              : 
    1223          235 :         if (dblocprovider != src_locprovider)
    1224            0 :             ereport(ERROR,
    1225              :                     (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
    1226              :                      errmsg("new locale provider (%s) does not match locale provider of the template database (%s)",
    1227              :                             collprovider_name(dblocprovider), collprovider_name(src_locprovider)),
    1228              :                      errhint("Use the same locale provider as in the template database, or use template0 as template.")));
    1229              : 
    1230          235 :         if (dblocprovider == COLLPROVIDER_ICU)
    1231              :         {
    1232              :             char       *val1;
    1233              :             char       *val2;
    1234              : 
    1235              :             Assert(dblocale);
    1236              :             Assert(src_locale);
    1237            6 :             if (strcmp(dblocale, src_locale) != 0)
    1238            0 :                 ereport(ERROR,
    1239              :                         (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
    1240              :                          errmsg("new ICU locale (%s) is incompatible with the ICU locale of the template database (%s)",
    1241              :                                 dblocale, src_locale),
    1242              :                          errhint("Use the same ICU locale as in the template database, or use template0 as template.")));
    1243              : 
    1244            6 :             val1 = dbicurules;
    1245            6 :             if (!val1)
    1246            6 :                 val1 = "";
    1247            6 :             val2 = src_icurules;
    1248            6 :             if (!val2)
    1249            6 :                 val2 = "";
    1250            6 :             if (strcmp(val1, val2) != 0)
    1251            0 :                 ereport(ERROR,
    1252              :                         (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
    1253              :                          errmsg("new ICU collation rules (%s) are incompatible with the ICU collation rules of the template database (%s)",
    1254              :                                 val1, val2),
    1255              :                          errhint("Use the same ICU collation rules as in the template database, or use template0 as template.")));
    1256              :         }
    1257              :     }
    1258              : 
    1259              :     /*
    1260              :      * If we got a collation version for the template database, check that it
    1261              :      * matches the actual OS collation version.  Otherwise error; the user
    1262              :      * needs to fix the template database first.  Don't complain if a
    1263              :      * collation version was specified explicitly as a statement option; that
    1264              :      * is used by pg_upgrade to reproduce the old state exactly.
    1265              :      *
    1266              :      * (If the template database has no collation version, then either the
    1267              :      * platform/provider does not support collation versioning, or it's
    1268              :      * template0, for which we stipulate that it does not contain
    1269              :      * collation-using objects.)
    1270              :      */
    1271          393 :     if (src_collversion && !collversionEl)
    1272              :     {
    1273              :         char       *actual_versionstr;
    1274              :         const char *locale;
    1275              : 
    1276          155 :         if (dblocprovider == COLLPROVIDER_LIBC)
    1277          144 :             locale = dbcollate;
    1278              :         else
    1279           11 :             locale = dblocale;
    1280              : 
    1281          155 :         actual_versionstr = get_collation_actual_version(dblocprovider, locale);
    1282          155 :         if (!actual_versionstr)
    1283            0 :             ereport(ERROR,
    1284              :                     (errmsg("template database \"%s\" has a collation version, but no actual collation version could be determined",
    1285              :                             dbtemplate)));
    1286              : 
    1287          155 :         if (strcmp(actual_versionstr, src_collversion) != 0)
    1288            0 :             ereport(ERROR,
    1289              :                     (errmsg("template database \"%s\" has a collation version mismatch",
    1290              :                             dbtemplate),
    1291              :                      errdetail("The template database was created using collation version %s, "
    1292              :                                "but the operating system provides version %s.",
    1293              :                                src_collversion, actual_versionstr),
    1294              :                      errhint("Rebuild all objects in the template database that use the default collation and run "
    1295              :                              "ALTER DATABASE %s REFRESH COLLATION VERSION, "
    1296              :                              "or build PostgreSQL with the right library version.",
    1297              :                              quote_identifier(dbtemplate))));
    1298              :     }
    1299              : 
    1300          393 :     if (dbcollversion == NULL)
    1301          363 :         dbcollversion = src_collversion;
    1302              : 
    1303              :     /*
    1304              :      * Normally, we copy the collation version from the template database.
    1305              :      * This last resort only applies if the template database does not have a
    1306              :      * collation version, which is normally only the case for template0.
    1307              :      */
    1308          393 :     if (dbcollversion == NULL)
    1309              :     {
    1310              :         const char *locale;
    1311              : 
    1312          208 :         if (dblocprovider == COLLPROVIDER_LIBC)
    1313          188 :             locale = dbcollate;
    1314              :         else
    1315           20 :             locale = dblocale;
    1316              : 
    1317          208 :         dbcollversion = get_collation_actual_version(dblocprovider, locale);
    1318              :     }
    1319              : 
    1320              :     /* Resolve default tablespace for new database */
    1321          393 :     if (tablespacenameEl && tablespacenameEl->arg)
    1322           17 :     {
    1323              :         char       *tablespacename;
    1324              :         AclResult   aclresult;
    1325              : 
    1326           17 :         tablespacename = defGetString(tablespacenameEl);
    1327           17 :         dst_deftablespace = get_tablespace_oid(tablespacename, false);
    1328              :         /* check permissions */
    1329           17 :         aclresult = object_aclcheck(TableSpaceRelationId, dst_deftablespace, GetUserId(),
    1330              :                                     ACL_CREATE);
    1331           17 :         if (aclresult != ACLCHECK_OK)
    1332            0 :             aclcheck_error(aclresult, OBJECT_TABLESPACE,
    1333              :                            tablespacename);
    1334              : 
    1335              :         /* pg_global must never be the default tablespace */
    1336           17 :         if (dst_deftablespace == GLOBALTABLESPACE_OID)
    1337            0 :             ereport(ERROR,
    1338              :                     (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
    1339              :                      errmsg("pg_global cannot be used as default tablespace")));
    1340              : 
    1341              :         /*
    1342              :          * If we are trying to change the default tablespace of the template,
    1343              :          * we require that the template not have any files in the new default
    1344              :          * tablespace.  This is necessary because otherwise the copied
    1345              :          * database would contain pg_class rows that refer to its default
    1346              :          * tablespace both explicitly (by OID) and implicitly (as zero), which
    1347              :          * would cause problems.  For example another CREATE DATABASE using
    1348              :          * the copied database as template, and trying to change its default
    1349              :          * tablespace again, would yield outright incorrect results (it would
    1350              :          * improperly move tables to the new default tablespace that should
    1351              :          * stay in the same tablespace).
    1352              :          */
    1353           17 :         if (dst_deftablespace != src_deftablespace)
    1354              :         {
    1355              :             char       *srcpath;
    1356              :             struct stat st;
    1357              : 
    1358           17 :             srcpath = GetDatabasePath(src_dboid, dst_deftablespace);
    1359              : 
    1360           17 :             if (stat(srcpath, &st) == 0 &&
    1361            0 :                 S_ISDIR(st.st_mode) &&
    1362            0 :                 !directory_is_empty(srcpath))
    1363            0 :                 ereport(ERROR,
    1364              :                         (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
    1365              :                          errmsg("cannot assign new default tablespace \"%s\"",
    1366              :                                 tablespacename),
    1367              :                          errdetail("There is a conflict because database \"%s\" already has some tables in this tablespace.",
    1368              :                                    dbtemplate)));
    1369           17 :             pfree(srcpath);
    1370              :         }
    1371              :     }
    1372              :     else
    1373              :     {
    1374              :         /* Use template database's default tablespace */
    1375          376 :         dst_deftablespace = src_deftablespace;
    1376              :         /* Note there is no additional permission check in this path */
    1377              :     }
    1378              : 
    1379              :     /*
    1380              :      * If built with appropriate switch, whine when regression-testing
    1381              :      * conventions for database names are violated.  But don't complain during
    1382              :      * initdb.
    1383              :      */
    1384              : #ifdef ENFORCE_REGRESSION_TEST_NAME_RESTRICTIONS
    1385              :     if (IsUnderPostmaster && strstr(dbname, "regression") == NULL)
    1386              :         elog(WARNING, "databases created by regression test cases should have names including \"regression\"");
    1387              : #endif
    1388              : 
    1389              :     /*
    1390              :      * Check for db name conflict.  This is just to give a more friendly error
    1391              :      * message than "unique index violation".  There's a race condition but
    1392              :      * we're willing to accept the less friendly message in that case.
    1393              :      */
    1394          393 :     if (OidIsValid(get_database_oid(dbname, true)))
    1395            1 :         ereport(ERROR,
    1396              :                 (errcode(ERRCODE_DUPLICATE_DATABASE),
    1397              :                  errmsg("database \"%s\" already exists", dbname)));
    1398              : 
    1399              :     /*
    1400              :      * The source DB can't have any active backends, except this one
    1401              :      * (exception is to allow CREATE DB while connected to template1).
    1402              :      * Otherwise we might copy inconsistent data.
    1403              :      *
    1404              :      * This should be last among the basic error checks, because it involves
    1405              :      * potential waiting; we may as well throw an error first if we're gonna
    1406              :      * throw one.
    1407              :      */
    1408          392 :     if (CountOtherDBBackends(src_dboid, &notherbackends, &npreparedxacts))
    1409            1 :         ereport(ERROR,
    1410              :                 (errcode(ERRCODE_OBJECT_IN_USE),
    1411              :                  errmsg("source database \"%s\" is being accessed by other users",
    1412              :                         dbtemplate),
    1413              :                  errdetail_busy_db(notherbackends, npreparedxacts)));
    1414              : 
    1415              :     /*
    1416              :      * Select an OID for the new database, checking that it doesn't have a
    1417              :      * filename conflict with anything already existing in the tablespace
    1418              :      * directories.
    1419              :      */
    1420          391 :     pg_database_rel = table_open(DatabaseRelationId, RowExclusiveLock);
    1421              : 
    1422              :     /*
    1423              :      * If database OID is configured, check if the OID is already in use or
    1424              :      * data directory already exists.
    1425              :      */
    1426          391 :     if (OidIsValid(dboid))
    1427              :     {
    1428          133 :         char       *existing_dbname = get_database_name(dboid);
    1429              : 
    1430          133 :         if (existing_dbname != NULL)
    1431            0 :             ereport(ERROR,
    1432              :                     (errcode(ERRCODE_INVALID_PARAMETER_VALUE)),
    1433              :                     errmsg("database OID %u is already in use by database \"%s\"",
    1434              :                            dboid, existing_dbname));
    1435              : 
    1436          133 :         if (check_db_file_conflict(dboid))
    1437            0 :             ereport(ERROR,
    1438              :                     (errcode(ERRCODE_INVALID_PARAMETER_VALUE)),
    1439              :                     errmsg("data directory with the specified OID %u already exists", dboid));
    1440              :     }
    1441              :     else
    1442              :     {
    1443              :         /* Select an OID for the new database if is not explicitly configured. */
    1444              :         do
    1445              :         {
    1446          258 :             dboid = GetNewOidWithIndex(pg_database_rel, DatabaseOidIndexId,
    1447              :                                        Anum_pg_database_oid);
    1448          258 :         } while (check_db_file_conflict(dboid));
    1449              :     }
    1450              : 
    1451              :     /*
    1452              :      * Insert a new tuple into pg_database.  This establishes our ownership of
    1453              :      * the new database name (anyone else trying to insert the same name will
    1454              :      * block on the unique index, and fail after we commit).
    1455              :      */
    1456              : 
    1457              :     Assert((dblocprovider != COLLPROVIDER_LIBC && dblocale) ||
    1458              :            (dblocprovider == COLLPROVIDER_LIBC && !dblocale));
    1459              : 
    1460              :     /* Form tuple */
    1461          391 :     new_record[Anum_pg_database_oid - 1] = ObjectIdGetDatum(dboid);
    1462          391 :     new_record[Anum_pg_database_datname - 1] =
    1463          391 :         DirectFunctionCall1(namein, CStringGetDatum(dbname));
    1464          391 :     new_record[Anum_pg_database_datdba - 1] = ObjectIdGetDatum(datdba);
    1465          391 :     new_record[Anum_pg_database_encoding - 1] = Int32GetDatum(encoding);
    1466          391 :     new_record[Anum_pg_database_datlocprovider - 1] = CharGetDatum(dblocprovider);
    1467          391 :     new_record[Anum_pg_database_datistemplate - 1] = BoolGetDatum(dbistemplate);
    1468          391 :     new_record[Anum_pg_database_datallowconn - 1] = BoolGetDatum(dballowconnections);
    1469          391 :     new_record[Anum_pg_database_dathasloginevt - 1] = BoolGetDatum(src_hasloginevt);
    1470          391 :     new_record[Anum_pg_database_datconnlimit - 1] = Int32GetDatum(dbconnlimit);
    1471          391 :     new_record[Anum_pg_database_datfrozenxid - 1] = TransactionIdGetDatum(src_frozenxid);
    1472          391 :     new_record[Anum_pg_database_datminmxid - 1] = TransactionIdGetDatum(src_minmxid);
    1473          391 :     new_record[Anum_pg_database_dattablespace - 1] = ObjectIdGetDatum(dst_deftablespace);
    1474          391 :     new_record[Anum_pg_database_datcollate - 1] = CStringGetTextDatum(dbcollate);
    1475          391 :     new_record[Anum_pg_database_datctype - 1] = CStringGetTextDatum(dbctype);
    1476          391 :     if (dblocale)
    1477           37 :         new_record[Anum_pg_database_datlocale - 1] = CStringGetTextDatum(dblocale);
    1478              :     else
    1479          354 :         new_record_nulls[Anum_pg_database_datlocale - 1] = true;
    1480          391 :     if (dbicurules)
    1481            0 :         new_record[Anum_pg_database_daticurules - 1] = CStringGetTextDatum(dbicurules);
    1482              :     else
    1483          391 :         new_record_nulls[Anum_pg_database_daticurules - 1] = true;
    1484          391 :     if (dbcollversion)
    1485          333 :         new_record[Anum_pg_database_datcollversion - 1] = CStringGetTextDatum(dbcollversion);
    1486              :     else
    1487           58 :         new_record_nulls[Anum_pg_database_datcollversion - 1] = true;
    1488              : 
    1489              :     /*
    1490              :      * We deliberately set datacl to default (NULL), rather than copying it
    1491              :      * from the template database.  Copying it would be a bad idea when the
    1492              :      * owner is not the same as the template's owner.
    1493              :      */
    1494          391 :     new_record_nulls[Anum_pg_database_datacl - 1] = true;
    1495              : 
    1496          391 :     tuple = heap_form_tuple(RelationGetDescr(pg_database_rel),
    1497              :                             new_record, new_record_nulls);
    1498              : 
    1499          391 :     CatalogTupleInsert(pg_database_rel, tuple);
    1500              : 
    1501              :     /*
    1502              :      * Now generate additional catalog entries associated with the new DB
    1503              :      */
    1504              : 
    1505              :     /* Register owner dependency */
    1506          391 :     recordDependencyOnOwner(DatabaseRelationId, dboid, datdba);
    1507              : 
    1508              :     /* Create pg_shdepend entries for objects within database */
    1509          391 :     copyTemplateDependencies(src_dboid, dboid);
    1510              : 
    1511              :     /* Post creation hook for new database */
    1512          391 :     InvokeObjectPostCreateHook(DatabaseRelationId, dboid, 0);
    1513              : 
    1514              :     /*
    1515              :      * If we're going to be reading data for the to-be-created database into
    1516              :      * shared_buffers, take a lock on it. Nobody should know that this
    1517              :      * database exists yet, but it's good to maintain the invariant that an
    1518              :      * AccessExclusiveLock on the database is sufficient to drop all of its
    1519              :      * buffers without worrying about more being read later.
    1520              :      *
    1521              :      * Note that we need to do this before entering the
    1522              :      * PG_ENSURE_ERROR_CLEANUP block below, because createdb_failure_callback
    1523              :      * expects this lock to be held already.
    1524              :      */
    1525          391 :     if (dbstrategy == CREATEDB_WAL_LOG)
    1526          253 :         LockSharedObject(DatabaseRelationId, dboid, 0, AccessShareLock);
    1527              : 
    1528              :     /*
    1529              :      * Once we start copying subdirectories, we need to be able to clean 'em
    1530              :      * up if we fail.  Use an ENSURE block to make sure this happens.  (This
    1531              :      * is not a 100% solution, because of the possibility of failure during
    1532              :      * transaction commit after we leave this routine, but it should handle
    1533              :      * most scenarios.)
    1534              :      */
    1535          391 :     fparms.src_dboid = src_dboid;
    1536          391 :     fparms.dest_dboid = dboid;
    1537          391 :     fparms.strategy = dbstrategy;
    1538              : 
    1539          391 :     PG_ENSURE_ERROR_CLEANUP(createdb_failure_callback,
    1540              :                             PointerGetDatum(&fparms));
    1541              :     {
    1542              :         /*
    1543              :          * If the user has asked to create a database with WAL_LOG strategy
    1544              :          * then call CreateDatabaseUsingWalLog, which will copy the database
    1545              :          * at the block level and it will WAL log each copied block.
    1546              :          * Otherwise, call CreateDatabaseUsingFileCopy that will copy the
    1547              :          * database file by file.
    1548              :          */
    1549          391 :         if (dbstrategy == CREATEDB_WAL_LOG)
    1550          253 :             CreateDatabaseUsingWalLog(src_dboid, dboid, src_deftablespace,
    1551              :                                       dst_deftablespace);
    1552              :         else
    1553          138 :             CreateDatabaseUsingFileCopy(src_dboid, dboid, src_deftablespace,
    1554              :                                         dst_deftablespace);
    1555              : 
    1556              :         /*
    1557              :          * Close pg_database, but keep lock till commit.
    1558              :          */
    1559          389 :         table_close(pg_database_rel, NoLock);
    1560              : 
    1561              :         /*
    1562              :          * Force synchronous commit, thus minimizing the window between
    1563              :          * creation of the database files and committal of the transaction. If
    1564              :          * we crash before committing, we'll have a DB that's taking up disk
    1565              :          * space but is not in pg_database, which is not good.
    1566              :          */
    1567          389 :         ForceSyncCommit();
    1568              :     }
    1569          391 :     PG_END_ENSURE_ERROR_CLEANUP(createdb_failure_callback,
    1570              :                                 PointerGetDatum(&fparms));
    1571              : 
    1572          389 :     return dboid;
    1573              : }
    1574              : 
    1575              : /*
    1576              :  * Check whether chosen encoding matches chosen locale settings.  This
    1577              :  * restriction is necessary because libc's locale-specific code usually
    1578              :  * fails when presented with data in an encoding it's not expecting. We
    1579              :  * allow mismatch in four cases:
    1580              :  *
    1581              :  * 1. locale encoding = SQL_ASCII, which means that the locale is C/POSIX
    1582              :  * which works with any encoding.
    1583              :  *
    1584              :  * 2. locale encoding = -1, which means that we couldn't determine the
    1585              :  * locale's encoding and have to trust the user to get it right.
    1586              :  *
    1587              :  * 3. selected encoding is UTF8 and platform is win32. This is because
    1588              :  * UTF8 is a pseudo codepage that is supported in all locales since it's
    1589              :  * converted to UTF16 before being used.
    1590              :  *
    1591              :  * 4. selected encoding is SQL_ASCII, but only if you're a superuser. This
    1592              :  * is risky but we have historically allowed it --- notably, the
    1593              :  * regression tests require it.
    1594              :  *
    1595              :  * Note: if you change this policy, fix initdb to match.
    1596              :  */
    1597              : void
    1598          415 : check_encoding_locale_matches(int encoding, const char *collate, const char *ctype)
    1599              : {
    1600          415 :     int         ctype_encoding = pg_get_encoding_from_locale(ctype, true);
    1601          415 :     int         collate_encoding = pg_get_encoding_from_locale(collate, true);
    1602              : 
    1603          419 :     if (!(ctype_encoding == encoding ||
    1604            4 :           ctype_encoding == PG_SQL_ASCII ||
    1605              :           ctype_encoding == -1 ||
    1606              : #ifdef WIN32
    1607              :           encoding == PG_UTF8 ||
    1608              : #endif
    1609            4 :           (encoding == PG_SQL_ASCII && superuser())))
    1610            0 :         ereport(ERROR,
    1611              :                 (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
    1612              :                  errmsg("encoding \"%s\" does not match locale \"%s\"",
    1613              :                         pg_encoding_to_char(encoding),
    1614              :                         ctype),
    1615              :                  errdetail("The chosen LC_CTYPE setting requires encoding \"%s\".",
    1616              :                            pg_encoding_to_char(ctype_encoding))));
    1617              : 
    1618          419 :     if (!(collate_encoding == encoding ||
    1619            4 :           collate_encoding == PG_SQL_ASCII ||
    1620              :           collate_encoding == -1 ||
    1621              : #ifdef WIN32
    1622              :           encoding == PG_UTF8 ||
    1623              : #endif
    1624            4 :           (encoding == PG_SQL_ASCII && superuser())))
    1625            0 :         ereport(ERROR,
    1626              :                 (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
    1627              :                  errmsg("encoding \"%s\" does not match locale \"%s\"",
    1628              :                         pg_encoding_to_char(encoding),
    1629              :                         collate),
    1630              :                  errdetail("The chosen LC_COLLATE setting requires encoding \"%s\".",
    1631              :                            pg_encoding_to_char(collate_encoding))));
    1632          415 : }
    1633              : 
    1634              : /* Error cleanup callback for createdb */
    1635              : static void
    1636            2 : createdb_failure_callback(int code, Datum arg)
    1637              : {
    1638            2 :     createdb_failure_params *fparms = (createdb_failure_params *) DatumGetPointer(arg);
    1639              : 
    1640              :     /*
    1641              :      * If we were copying database at block levels then drop pages for the
    1642              :      * destination database that are in the shared buffer cache.  And tell
    1643              :      * checkpointer to forget any pending fsync and unlink requests for files
    1644              :      * in the database.  The reasoning behind doing this is same as explained
    1645              :      * in dropdb function.  But unlike dropdb we don't need to call
    1646              :      * pgstat_drop_database because this database is still not created so
    1647              :      * there should not be any stat for this.
    1648              :      */
    1649            2 :     if (fparms->strategy == CREATEDB_WAL_LOG)
    1650              :     {
    1651            2 :         DropDatabaseBuffers(fparms->dest_dboid);
    1652            2 :         ForgetDatabaseSyncRequests(fparms->dest_dboid);
    1653              : 
    1654              :         /* Release lock on the target database. */
    1655            2 :         UnlockSharedObject(DatabaseRelationId, fparms->dest_dboid, 0,
    1656              :                            AccessShareLock);
    1657              :     }
    1658              : 
    1659              :     /*
    1660              :      * Release lock on source database before doing recursive remove. This is
    1661              :      * not essential but it seems desirable to release the lock as soon as
    1662              :      * possible.
    1663              :      */
    1664            2 :     UnlockSharedObject(DatabaseRelationId, fparms->src_dboid, 0, ShareLock);
    1665              : 
    1666              :     /* Throw away any successfully copied subdirectories */
    1667            2 :     remove_dbtablespaces(fparms->dest_dboid);
    1668            2 : }
    1669              : 
    1670              : 
    1671              : /*
    1672              :  * DROP DATABASE
    1673              :  */
    1674              : void
    1675           64 : dropdb(const char *dbname, bool missing_ok, bool force)
    1676              : {
    1677              :     Oid         db_id;
    1678              :     bool        db_istemplate;
    1679              :     Relation    pgdbrel;
    1680              :     HeapTuple   tup;
    1681              :     ScanKeyData scankey;
    1682              :     void       *inplace_state;
    1683              :     Form_pg_database datform;
    1684              :     int         notherbackends;
    1685              :     int         npreparedxacts;
    1686              :     int         nslots,
    1687              :                 nslots_active;
    1688              :     int         nsubscriptions;
    1689              : 
    1690              :     /*
    1691              :      * Look up the target database's OID, and get exclusive lock on it. We
    1692              :      * need this to ensure that no new backend starts up in the target
    1693              :      * database while we are deleting it (see postinit.c), and that no one is
    1694              :      * using it as a CREATE DATABASE template or trying to delete it for
    1695              :      * themselves.
    1696              :      */
    1697           64 :     pgdbrel = table_open(DatabaseRelationId, RowExclusiveLock);
    1698              : 
    1699           64 :     if (!get_db_info(dbname, AccessExclusiveLock, &db_id, NULL, NULL,
    1700              :                      &db_istemplate, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL))
    1701              :     {
    1702           16 :         if (!missing_ok)
    1703              :         {
    1704            8 :             ereport(ERROR,
    1705              :                     (errcode(ERRCODE_UNDEFINED_DATABASE),
    1706              :                      errmsg("database \"%s\" does not exist", dbname)));
    1707              :         }
    1708              :         else
    1709              :         {
    1710              :             /* Close pg_database, release the lock, since we changed nothing */
    1711            8 :             table_close(pgdbrel, RowExclusiveLock);
    1712            8 :             ereport(NOTICE,
    1713              :                     (errmsg("database \"%s\" does not exist, skipping",
    1714              :                             dbname)));
    1715            8 :             return;
    1716              :         }
    1717              :     }
    1718              : 
    1719              :     /*
    1720              :      * Permission checks
    1721              :      */
    1722           48 :     if (!object_ownercheck(DatabaseRelationId, db_id, GetUserId()))
    1723            0 :         aclcheck_error(ACLCHECK_NOT_OWNER, OBJECT_DATABASE,
    1724              :                        dbname);
    1725              : 
    1726              :     /* DROP hook for the database being removed */
    1727           48 :     InvokeObjectDropHook(DatabaseRelationId, db_id, 0);
    1728              : 
    1729              :     /*
    1730              :      * Disallow dropping a DB that is marked istemplate.  This is just to
    1731              :      * prevent people from accidentally dropping template0 or template1; they
    1732              :      * can do so if they're really determined ...
    1733              :      */
    1734           48 :     if (db_istemplate)
    1735            0 :         ereport(ERROR,
    1736              :                 (errcode(ERRCODE_WRONG_OBJECT_TYPE),
    1737              :                  errmsg("cannot drop a template database")));
    1738              : 
    1739              :     /* Obviously can't drop my own database */
    1740           48 :     if (db_id == MyDatabaseId)
    1741            0 :         ereport(ERROR,
    1742              :                 (errcode(ERRCODE_OBJECT_IN_USE),
    1743              :                  errmsg("cannot drop the currently open database")));
    1744              : 
    1745              :     /*
    1746              :      * Check whether there are active logical slots that refer to the
    1747              :      * to-be-dropped database. The database lock we are holding prevents the
    1748              :      * creation of new slots using the database or existing slots becoming
    1749              :      * active.
    1750              :      */
    1751           48 :     (void) ReplicationSlotsCountDBSlots(db_id, &nslots, &nslots_active);
    1752           48 :     if (nslots_active)
    1753              :     {
    1754            1 :         ereport(ERROR,
    1755              :                 (errcode(ERRCODE_OBJECT_IN_USE),
    1756              :                  errmsg("database \"%s\" is used by an active logical replication slot",
    1757              :                         dbname),
    1758              :                  errdetail_plural("There is %d active slot.",
    1759              :                                   "There are %d active slots.",
    1760              :                                   nslots_active, nslots_active)));
    1761              :     }
    1762              : 
    1763              :     /*
    1764              :      * Check if there are subscriptions defined in the target database.
    1765              :      *
    1766              :      * We can't drop them automatically because they might be holding
    1767              :      * resources in other databases/instances.
    1768              :      */
    1769           47 :     if ((nsubscriptions = CountDBSubscriptions(db_id)) > 0)
    1770            0 :         ereport(ERROR,
    1771              :                 (errcode(ERRCODE_OBJECT_IN_USE),
    1772              :                  errmsg("database \"%s\" is being used by logical replication subscription",
    1773              :                         dbname),
    1774              :                  errdetail_plural("There is %d subscription.",
    1775              :                                   "There are %d subscriptions.",
    1776              :                                   nsubscriptions, nsubscriptions)));
    1777              : 
    1778              : 
    1779              :     /*
    1780              :      * Attempt to terminate all existing connections to the target database if
    1781              :      * the user has requested to do so.
    1782              :      */
    1783           47 :     if (force)
    1784            1 :         TerminateOtherDBBackends(db_id);
    1785              : 
    1786              :     /*
    1787              :      * Check for other backends in the target database.  (Because we hold the
    1788              :      * database lock, no new ones can start after this.)
    1789              :      *
    1790              :      * As in CREATE DATABASE, check this after other error conditions.
    1791              :      */
    1792           47 :     if (CountOtherDBBackends(db_id, &notherbackends, &npreparedxacts))
    1793            0 :         ereport(ERROR,
    1794              :                 (errcode(ERRCODE_OBJECT_IN_USE),
    1795              :                  errmsg("database \"%s\" is being accessed by other users",
    1796              :                         dbname),
    1797              :                  errdetail_busy_db(notherbackends, npreparedxacts)));
    1798              : 
    1799              :     /*
    1800              :      * Delete any comments or security labels associated with the database.
    1801              :      */
    1802           47 :     DeleteSharedComments(db_id, DatabaseRelationId);
    1803           47 :     DeleteSharedSecurityLabel(db_id, DatabaseRelationId);
    1804              : 
    1805              :     /*
    1806              :      * Remove settings associated with this database
    1807              :      */
    1808           47 :     DropSetting(db_id, InvalidOid);
    1809              : 
    1810              :     /*
    1811              :      * Remove shared dependency references for the database.
    1812              :      */
    1813           47 :     dropDatabaseDependencies(db_id);
    1814              : 
    1815              :     /*
    1816              :      * Tell the cumulative stats system to forget it immediately, too.
    1817              :      */
    1818           47 :     pgstat_drop_database(db_id);
    1819              : 
    1820              :     /*
    1821              :      * Except for the deletion of the catalog row, subsequent actions are not
    1822              :      * transactional (consider DropDatabaseBuffers() discarding modified
    1823              :      * buffers). But we might crash or get interrupted below. To prevent
    1824              :      * accesses to a database with invalid contents, mark the database as
    1825              :      * invalid using an in-place update.
    1826              :      *
    1827              :      * We need to flush the WAL before continuing, to guarantee the
    1828              :      * modification is durable before performing irreversible filesystem
    1829              :      * operations.
    1830              :      */
    1831           47 :     ScanKeyInit(&scankey,
    1832              :                 Anum_pg_database_datname,
    1833              :                 BTEqualStrategyNumber, F_NAMEEQ,
    1834              :                 CStringGetDatum(dbname));
    1835           47 :     systable_inplace_update_begin(pgdbrel, DatabaseNameIndexId, true,
    1836              :                                   NULL, 1, &scankey, &tup, &inplace_state);
    1837           47 :     if (!HeapTupleIsValid(tup))
    1838            0 :         elog(ERROR, "cache lookup failed for database %u", db_id);
    1839           47 :     datform = (Form_pg_database) GETSTRUCT(tup);
    1840           47 :     datform->datconnlimit = DATCONNLIMIT_INVALID_DB;
    1841           47 :     systable_inplace_update_finish(inplace_state, tup);
    1842           47 :     XLogFlush(XactLastRecEnd);
    1843              : 
    1844              :     /*
    1845              :      * Also delete the tuple - transactionally. If this transaction commits,
    1846              :      * the row will be gone, but if we fail, dropdb() can be invoked again.
    1847              :      */
    1848           47 :     CatalogTupleDelete(pgdbrel, &tup->t_self);
    1849           47 :     heap_freetuple(tup);
    1850              : 
    1851              :     /*
    1852              :      * Drop db-specific replication slots.
    1853              :      */
    1854           47 :     ReplicationSlotsDropDBSlots(db_id);
    1855              : 
    1856              :     /*
    1857              :      * Drop pages for this database that are in the shared buffer cache. This
    1858              :      * is important to ensure that no remaining backend tries to write out a
    1859              :      * dirty buffer to the dead database later...
    1860              :      */
    1861           47 :     DropDatabaseBuffers(db_id);
    1862              : 
    1863              :     /*
    1864              :      * Tell checkpointer to forget any pending fsync and unlink requests for
    1865              :      * files in the database; else the fsyncs will fail at next checkpoint, or
    1866              :      * worse, it will delete files that belong to a newly created database
    1867              :      * with the same OID.
    1868              :      */
    1869           47 :     ForgetDatabaseSyncRequests(db_id);
    1870              : 
    1871              :     /*
    1872              :      * Force a checkpoint to make sure the checkpointer has received the
    1873              :      * message sent by ForgetDatabaseSyncRequests.
    1874              :      */
    1875           47 :     RequestCheckpoint(CHECKPOINT_FAST | CHECKPOINT_FORCE | CHECKPOINT_WAIT);
    1876              : 
    1877              :     /* Close all smgr fds in all backends. */
    1878           47 :     WaitForProcSignalBarrier(EmitProcSignalBarrier(PROCSIGNAL_BARRIER_SMGRRELEASE));
    1879              : 
    1880              :     /*
    1881              :      * Remove all tablespace subdirs belonging to the database.
    1882              :      */
    1883           47 :     remove_dbtablespaces(db_id);
    1884              : 
    1885              :     /*
    1886              :      * Close pg_database, but keep lock till commit.
    1887              :      */
    1888           47 :     table_close(pgdbrel, NoLock);
    1889              : 
    1890              :     /*
    1891              :      * Force synchronous commit, thus minimizing the window between removal of
    1892              :      * the database files and committal of the transaction. If we crash before
    1893              :      * committing, we'll have a DB that's gone on disk but still there
    1894              :      * according to pg_database, which is not good.
    1895              :      */
    1896           47 :     ForceSyncCommit();
    1897              : }
    1898              : 
    1899              : 
    1900              : /*
    1901              :  * Rename database
    1902              :  */
    1903              : ObjectAddress
    1904            7 : RenameDatabase(const char *oldname, const char *newname)
    1905              : {
    1906              :     Oid         db_id;
    1907              :     HeapTuple   newtup;
    1908              :     ItemPointerData otid;
    1909              :     Relation    rel;
    1910              :     int         notherbackends;
    1911              :     int         npreparedxacts;
    1912              :     ObjectAddress address;
    1913              : 
    1914              :     /*
    1915              :      * Look up the target database's OID, and get exclusive lock on it. We
    1916              :      * need this for the same reasons as DROP DATABASE.
    1917              :      */
    1918            7 :     rel = table_open(DatabaseRelationId, RowExclusiveLock);
    1919              : 
    1920            7 :     if (!get_db_info(oldname, AccessExclusiveLock, &db_id, NULL, NULL, NULL,
    1921              :                      NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL))
    1922            0 :         ereport(ERROR,
    1923              :                 (errcode(ERRCODE_UNDEFINED_DATABASE),
    1924              :                  errmsg("database \"%s\" does not exist", oldname)));
    1925              : 
    1926              :     /* must be owner */
    1927            7 :     if (!object_ownercheck(DatabaseRelationId, db_id, GetUserId()))
    1928            0 :         aclcheck_error(ACLCHECK_NOT_OWNER, OBJECT_DATABASE,
    1929              :                        oldname);
    1930              : 
    1931              :     /* must have createdb rights */
    1932            7 :     if (!have_createdb_privilege())
    1933            0 :         ereport(ERROR,
    1934              :                 (errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
    1935              :                  errmsg("permission denied to rename database")));
    1936              : 
    1937              :     /*
    1938              :      * If built with appropriate switch, whine when regression-testing
    1939              :      * conventions for database names are violated.
    1940              :      */
    1941              : #ifdef ENFORCE_REGRESSION_TEST_NAME_RESTRICTIONS
    1942              :     if (strstr(newname, "regression") == NULL)
    1943              :         elog(WARNING, "databases created by regression test cases should have names including \"regression\"");
    1944              : #endif
    1945              : 
    1946              :     /*
    1947              :      * Make sure the new name doesn't exist.  See notes for same error in
    1948              :      * CREATE DATABASE.
    1949              :      */
    1950            7 :     if (OidIsValid(get_database_oid(newname, true)))
    1951            0 :         ereport(ERROR,
    1952              :                 (errcode(ERRCODE_DUPLICATE_DATABASE),
    1953              :                  errmsg("database \"%s\" already exists", newname)));
    1954              : 
    1955              :     /*
    1956              :      * XXX Client applications probably store the current database somewhere,
    1957              :      * so renaming it could cause confusion.  On the other hand, there may not
    1958              :      * be an actual problem besides a little confusion, so think about this
    1959              :      * and decide.
    1960              :      */
    1961            7 :     if (db_id == MyDatabaseId)
    1962            0 :         ereport(ERROR,
    1963              :                 (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
    1964              :                  errmsg("current database cannot be renamed")));
    1965              : 
    1966              :     /*
    1967              :      * Make sure the database does not have active sessions.  This is the same
    1968              :      * concern as above, but applied to other sessions.
    1969              :      *
    1970              :      * As in CREATE DATABASE, check this after other error conditions.
    1971              :      */
    1972            7 :     if (CountOtherDBBackends(db_id, &notherbackends, &npreparedxacts))
    1973            0 :         ereport(ERROR,
    1974              :                 (errcode(ERRCODE_OBJECT_IN_USE),
    1975              :                  errmsg("database \"%s\" is being accessed by other users",
    1976              :                         oldname),
    1977              :                  errdetail_busy_db(notherbackends, npreparedxacts)));
    1978              : 
    1979              :     /* rename */
    1980            7 :     newtup = SearchSysCacheLockedCopy1(DATABASEOID, ObjectIdGetDatum(db_id));
    1981            7 :     if (!HeapTupleIsValid(newtup))
    1982            0 :         elog(ERROR, "cache lookup failed for database %u", db_id);
    1983            7 :     otid = newtup->t_self;
    1984            7 :     namestrcpy(&(((Form_pg_database) GETSTRUCT(newtup))->datname), newname);
    1985            7 :     CatalogTupleUpdate(rel, &otid, newtup);
    1986            7 :     UnlockTuple(rel, &otid, InplaceUpdateTupleLock);
    1987              : 
    1988            7 :     InvokeObjectPostAlterHook(DatabaseRelationId, db_id, 0);
    1989              : 
    1990            7 :     ObjectAddressSet(address, DatabaseRelationId, db_id);
    1991              : 
    1992              :     /*
    1993              :      * Close pg_database, but keep lock till commit.
    1994              :      */
    1995            7 :     table_close(rel, NoLock);
    1996              : 
    1997            7 :     return address;
    1998              : }
    1999              : 
    2000              : 
    2001              : /*
    2002              :  * ALTER DATABASE SET TABLESPACE
    2003              :  */
    2004              : static void
    2005           12 : movedb(const char *dbname, const char *tblspcname)
    2006              : {
    2007              :     Oid         db_id;
    2008              :     Relation    pgdbrel;
    2009              :     int         notherbackends;
    2010              :     int         npreparedxacts;
    2011              :     HeapTuple   oldtuple,
    2012              :                 newtuple;
    2013              :     Oid         src_tblspcoid,
    2014              :                 dst_tblspcoid;
    2015              :     ScanKeyData scankey;
    2016              :     SysScanDesc sysscan;
    2017              :     AclResult   aclresult;
    2018              :     char       *src_dbpath;
    2019              :     char       *dst_dbpath;
    2020              :     DIR        *dstdir;
    2021              :     struct dirent *xlde;
    2022              :     movedb_failure_params fparms;
    2023              : 
    2024              :     /*
    2025              :      * Look up the target database's OID, and get exclusive lock on it. We
    2026              :      * need this to ensure that no new backend starts up in the database while
    2027              :      * we are moving it, and that no one is using it as a CREATE DATABASE
    2028              :      * template or trying to delete it.
    2029              :      */
    2030           12 :     pgdbrel = table_open(DatabaseRelationId, RowExclusiveLock);
    2031              : 
    2032           12 :     if (!get_db_info(dbname, AccessExclusiveLock, &db_id, NULL, NULL, NULL,
    2033              :                      NULL, NULL, NULL, NULL, &src_tblspcoid, NULL, NULL, NULL, NULL, NULL, NULL))
    2034            0 :         ereport(ERROR,
    2035              :                 (errcode(ERRCODE_UNDEFINED_DATABASE),
    2036              :                  errmsg("database \"%s\" does not exist", dbname)));
    2037              : 
    2038              :     /*
    2039              :      * We actually need a session lock, so that the lock will persist across
    2040              :      * the commit/restart below.  (We could almost get away with letting the
    2041              :      * lock be released at commit, except that someone could try to move
    2042              :      * relations of the DB back into the old directory while we rmtree() it.)
    2043              :      */
    2044           12 :     LockSharedObjectForSession(DatabaseRelationId, db_id, 0,
    2045              :                                AccessExclusiveLock);
    2046              : 
    2047              :     /*
    2048              :      * Permission checks
    2049              :      */
    2050           12 :     if (!object_ownercheck(DatabaseRelationId, db_id, GetUserId()))
    2051            0 :         aclcheck_error(ACLCHECK_NOT_OWNER, OBJECT_DATABASE,
    2052              :                        dbname);
    2053              : 
    2054              :     /*
    2055              :      * Obviously can't move the tables of my own database
    2056              :      */
    2057           12 :     if (db_id == MyDatabaseId)
    2058            0 :         ereport(ERROR,
    2059              :                 (errcode(ERRCODE_OBJECT_IN_USE),
    2060              :                  errmsg("cannot change the tablespace of the currently open database")));
    2061              : 
    2062              :     /*
    2063              :      * Get tablespace's oid
    2064              :      */
    2065           12 :     dst_tblspcoid = get_tablespace_oid(tblspcname, false);
    2066              : 
    2067              :     /*
    2068              :      * Permission checks
    2069              :      */
    2070           12 :     aclresult = object_aclcheck(TableSpaceRelationId, dst_tblspcoid, GetUserId(),
    2071              :                                 ACL_CREATE);
    2072           12 :     if (aclresult != ACLCHECK_OK)
    2073            0 :         aclcheck_error(aclresult, OBJECT_TABLESPACE,
    2074              :                        tblspcname);
    2075              : 
    2076              :     /*
    2077              :      * pg_global must never be the default tablespace
    2078              :      */
    2079           12 :     if (dst_tblspcoid == GLOBALTABLESPACE_OID)
    2080            0 :         ereport(ERROR,
    2081              :                 (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
    2082              :                  errmsg("pg_global cannot be used as default tablespace")));
    2083              : 
    2084              :     /*
    2085              :      * No-op if same tablespace
    2086              :      */
    2087           12 :     if (src_tblspcoid == dst_tblspcoid)
    2088              :     {
    2089            0 :         table_close(pgdbrel, NoLock);
    2090            0 :         UnlockSharedObjectForSession(DatabaseRelationId, db_id, 0,
    2091              :                                      AccessExclusiveLock);
    2092            0 :         return;
    2093              :     }
    2094              : 
    2095              :     /*
    2096              :      * Check for other backends in the target database.  (Because we hold the
    2097              :      * database lock, no new ones can start after this.)
    2098              :      *
    2099              :      * As in CREATE DATABASE, check this after other error conditions.
    2100              :      */
    2101           12 :     if (CountOtherDBBackends(db_id, &notherbackends, &npreparedxacts))
    2102            0 :         ereport(ERROR,
    2103              :                 (errcode(ERRCODE_OBJECT_IN_USE),
    2104              :                  errmsg("database \"%s\" is being accessed by other users",
    2105              :                         dbname),
    2106              :                  errdetail_busy_db(notherbackends, npreparedxacts)));
    2107              : 
    2108              :     /*
    2109              :      * Get old and new database paths
    2110              :      */
    2111           12 :     src_dbpath = GetDatabasePath(db_id, src_tblspcoid);
    2112           12 :     dst_dbpath = GetDatabasePath(db_id, dst_tblspcoid);
    2113              : 
    2114              :     /*
    2115              :      * Force a checkpoint before proceeding. This will force all dirty
    2116              :      * buffers, including those of unlogged tables, out to disk, to ensure
    2117              :      * source database is up-to-date on disk for the copy.
    2118              :      * FlushDatabaseBuffers() would suffice for that, but we also want to
    2119              :      * process any pending unlink requests. Otherwise, the check for existing
    2120              :      * files in the target directory might fail unnecessarily, not to mention
    2121              :      * that the copy might fail due to source files getting deleted under it.
    2122              :      * On Windows, this also ensures that background procs don't hold any open
    2123              :      * files, which would cause rmdir() to fail.
    2124              :      */
    2125           12 :     RequestCheckpoint(CHECKPOINT_FAST | CHECKPOINT_FORCE | CHECKPOINT_WAIT
    2126              :                       | CHECKPOINT_FLUSH_UNLOGGED);
    2127              : 
    2128              :     /* Close all smgr fds in all backends. */
    2129           12 :     WaitForProcSignalBarrier(EmitProcSignalBarrier(PROCSIGNAL_BARRIER_SMGRRELEASE));
    2130              : 
    2131              :     /*
    2132              :      * Now drop all buffers holding data of the target database; they should
    2133              :      * no longer be dirty so DropDatabaseBuffers is safe.
    2134              :      *
    2135              :      * It might seem that we could just let these buffers age out of shared
    2136              :      * buffers naturally, since they should not get referenced anymore.  The
    2137              :      * problem with that is that if the user later moves the database back to
    2138              :      * its original tablespace, any still-surviving buffers would appear to
    2139              :      * contain valid data again --- but they'd be missing any changes made in
    2140              :      * the database while it was in the new tablespace.  In any case, freeing
    2141              :      * buffers that should never be used again seems worth the cycles.
    2142              :      *
    2143              :      * Note: it'd be sufficient to get rid of buffers matching db_id and
    2144              :      * src_tblspcoid, but bufmgr.c presently provides no API for that.
    2145              :      */
    2146           12 :     DropDatabaseBuffers(db_id);
    2147              : 
    2148              :     /*
    2149              :      * Check for existence of files in the target directory, i.e., objects of
    2150              :      * this database that are already in the target tablespace.  We can't
    2151              :      * allow the move in such a case, because we would need to change those
    2152              :      * relations' pg_class.reltablespace entries to zero, and we don't have
    2153              :      * access to the DB's pg_class to do so.
    2154              :      */
    2155           12 :     dstdir = AllocateDir(dst_dbpath);
    2156           12 :     if (dstdir != NULL)
    2157              :     {
    2158            0 :         while ((xlde = ReadDir(dstdir, dst_dbpath)) != NULL)
    2159              :         {
    2160            0 :             if (strcmp(xlde->d_name, ".") == 0 ||
    2161            0 :                 strcmp(xlde->d_name, "..") == 0)
    2162            0 :                 continue;
    2163              : 
    2164            0 :             ereport(ERROR,
    2165              :                     (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
    2166              :                      errmsg("some relations of database \"%s\" are already in tablespace \"%s\"",
    2167              :                             dbname, tblspcname),
    2168              :                      errhint("You must move them back to the database's default tablespace before using this command.")));
    2169              :         }
    2170              : 
    2171            0 :         FreeDir(dstdir);
    2172              : 
    2173              :         /*
    2174              :          * The directory exists but is empty. We must remove it before using
    2175              :          * the copydir function.
    2176              :          */
    2177            0 :         if (rmdir(dst_dbpath) != 0)
    2178            0 :             elog(ERROR, "could not remove directory \"%s\": %m",
    2179              :                  dst_dbpath);
    2180              :     }
    2181              : 
    2182              :     /*
    2183              :      * Use an ENSURE block to make sure we remove the debris if the copy fails
    2184              :      * (eg, due to out-of-disk-space).  This is not a 100% solution, because
    2185              :      * of the possibility of failure during transaction commit, but it should
    2186              :      * handle most scenarios.
    2187              :      */
    2188           12 :     fparms.dest_dboid = db_id;
    2189           12 :     fparms.dest_tsoid = dst_tblspcoid;
    2190           12 :     PG_ENSURE_ERROR_CLEANUP(movedb_failure_callback,
    2191              :                             PointerGetDatum(&fparms));
    2192              :     {
    2193           12 :         Datum       new_record[Natts_pg_database] = {0};
    2194           12 :         bool        new_record_nulls[Natts_pg_database] = {0};
    2195           12 :         bool        new_record_repl[Natts_pg_database] = {0};
    2196              : 
    2197              :         /*
    2198              :          * Copy files from the old tablespace to the new one
    2199              :          */
    2200           12 :         copydir(src_dbpath, dst_dbpath, false);
    2201              : 
    2202              :         /*
    2203              :          * Record the filesystem change in XLOG
    2204              :          */
    2205              :         {
    2206              :             xl_dbase_create_file_copy_rec xlrec;
    2207              : 
    2208           12 :             xlrec.db_id = db_id;
    2209           12 :             xlrec.tablespace_id = dst_tblspcoid;
    2210           12 :             xlrec.src_db_id = db_id;
    2211           12 :             xlrec.src_tablespace_id = src_tblspcoid;
    2212              : 
    2213           12 :             XLogBeginInsert();
    2214           12 :             XLogRegisterData(&xlrec,
    2215              :                              sizeof(xl_dbase_create_file_copy_rec));
    2216              : 
    2217           12 :             (void) XLogInsert(RM_DBASE_ID,
    2218              :                               XLOG_DBASE_CREATE_FILE_COPY | XLR_SPECIAL_REL_UPDATE);
    2219              :         }
    2220              : 
    2221              :         /*
    2222              :          * Update the database's pg_database tuple
    2223              :          */
    2224           12 :         ScanKeyInit(&scankey,
    2225              :                     Anum_pg_database_datname,
    2226              :                     BTEqualStrategyNumber, F_NAMEEQ,
    2227              :                     CStringGetDatum(dbname));
    2228           12 :         sysscan = systable_beginscan(pgdbrel, DatabaseNameIndexId, true,
    2229              :                                      NULL, 1, &scankey);
    2230           12 :         oldtuple = systable_getnext(sysscan);
    2231           12 :         if (!HeapTupleIsValid(oldtuple))    /* shouldn't happen... */
    2232            0 :             ereport(ERROR,
    2233              :                     (errcode(ERRCODE_UNDEFINED_DATABASE),
    2234              :                      errmsg("database \"%s\" does not exist", dbname)));
    2235           12 :         LockTuple(pgdbrel, &oldtuple->t_self, InplaceUpdateTupleLock);
    2236              : 
    2237           12 :         new_record[Anum_pg_database_dattablespace - 1] = ObjectIdGetDatum(dst_tblspcoid);
    2238           12 :         new_record_repl[Anum_pg_database_dattablespace - 1] = true;
    2239              : 
    2240           12 :         newtuple = heap_modify_tuple(oldtuple, RelationGetDescr(pgdbrel),
    2241              :                                      new_record,
    2242              :                                      new_record_nulls, new_record_repl);
    2243           12 :         CatalogTupleUpdate(pgdbrel, &oldtuple->t_self, newtuple);
    2244           12 :         UnlockTuple(pgdbrel, &oldtuple->t_self, InplaceUpdateTupleLock);
    2245              : 
    2246           12 :         InvokeObjectPostAlterHook(DatabaseRelationId, db_id, 0);
    2247              : 
    2248           12 :         systable_endscan(sysscan);
    2249              : 
    2250              :         /*
    2251              :          * Force another checkpoint here.  As in CREATE DATABASE, this is to
    2252              :          * ensure that we don't have to replay a committed
    2253              :          * XLOG_DBASE_CREATE_FILE_COPY operation, which would cause us to lose
    2254              :          * any unlogged operations done in the new DB tablespace before the
    2255              :          * next checkpoint.
    2256              :          */
    2257           12 :         RequestCheckpoint(CHECKPOINT_FAST | CHECKPOINT_FORCE | CHECKPOINT_WAIT);
    2258              : 
    2259              :         /*
    2260              :          * Force synchronous commit, thus minimizing the window between
    2261              :          * copying the database files and committal of the transaction. If we
    2262              :          * crash before committing, we'll leave an orphaned set of files on
    2263              :          * disk, which is not fatal but not good either.
    2264              :          */
    2265           12 :         ForceSyncCommit();
    2266              : 
    2267              :         /*
    2268              :          * Close pg_database, but keep lock till commit.
    2269              :          */
    2270           12 :         table_close(pgdbrel, NoLock);
    2271              :     }
    2272           12 :     PG_END_ENSURE_ERROR_CLEANUP(movedb_failure_callback,
    2273              :                                 PointerGetDatum(&fparms));
    2274              : 
    2275              :     /*
    2276              :      * Commit the transaction so that the pg_database update is committed. If
    2277              :      * we crash while removing files, the database won't be corrupt, we'll
    2278              :      * just leave some orphaned files in the old directory.
    2279              :      *
    2280              :      * (This is OK because we know we aren't inside a transaction block.)
    2281              :      *
    2282              :      * XXX would it be safe/better to do this inside the ensure block?  Not
    2283              :      * convinced it's a good idea; consider elog just after the transaction
    2284              :      * really commits.
    2285              :      */
    2286           12 :     PopActiveSnapshot();
    2287           12 :     CommitTransactionCommand();
    2288              : 
    2289              :     /* Start new transaction for the remaining work; don't need a snapshot */
    2290           12 :     StartTransactionCommand();
    2291              : 
    2292              :     /*
    2293              :      * Remove files from the old tablespace
    2294              :      */
    2295           12 :     if (!rmtree(src_dbpath, true))
    2296            0 :         ereport(WARNING,
    2297              :                 (errmsg("some useless files may be left behind in old database directory \"%s\"",
    2298              :                         src_dbpath)));
    2299              : 
    2300              :     /*
    2301              :      * Record the filesystem change in XLOG
    2302              :      */
    2303              :     {
    2304              :         xl_dbase_drop_rec xlrec;
    2305              : 
    2306           12 :         xlrec.db_id = db_id;
    2307           12 :         xlrec.ntablespaces = 1;
    2308              : 
    2309           12 :         XLogBeginInsert();
    2310           12 :         XLogRegisterData(&xlrec, sizeof(xl_dbase_drop_rec));
    2311           12 :         XLogRegisterData(&src_tblspcoid, sizeof(Oid));
    2312              : 
    2313           12 :         (void) XLogInsert(RM_DBASE_ID,
    2314              :                           XLOG_DBASE_DROP | XLR_SPECIAL_REL_UPDATE);
    2315              :     }
    2316              : 
    2317              :     /* Now it's safe to release the database lock */
    2318           12 :     UnlockSharedObjectForSession(DatabaseRelationId, db_id, 0,
    2319              :                                  AccessExclusiveLock);
    2320              : 
    2321           12 :     pfree(src_dbpath);
    2322           12 :     pfree(dst_dbpath);
    2323              : }
    2324              : 
    2325              : /* Error cleanup callback for movedb */
    2326              : static void
    2327            0 : movedb_failure_callback(int code, Datum arg)
    2328              : {
    2329            0 :     movedb_failure_params *fparms = (movedb_failure_params *) DatumGetPointer(arg);
    2330              :     char       *dstpath;
    2331              : 
    2332              :     /* Get rid of anything we managed to copy to the target directory */
    2333            0 :     dstpath = GetDatabasePath(fparms->dest_dboid, fparms->dest_tsoid);
    2334              : 
    2335            0 :     (void) rmtree(dstpath, true);
    2336              : 
    2337            0 :     pfree(dstpath);
    2338            0 : }
    2339              : 
    2340              : /*
    2341              :  * Process options and call dropdb function.
    2342              :  */
    2343              : void
    2344           64 : DropDatabase(ParseState *pstate, DropdbStmt *stmt)
    2345              : {
    2346           64 :     bool        force = false;
    2347              :     ListCell   *lc;
    2348              : 
    2349           77 :     foreach(lc, stmt->options)
    2350              :     {
    2351           13 :         DefElem    *opt = (DefElem *) lfirst(lc);
    2352              : 
    2353           13 :         if (strcmp(opt->defname, "force") == 0)
    2354           13 :             force = true;
    2355              :         else
    2356            0 :             ereport(ERROR,
    2357              :                     (errcode(ERRCODE_SYNTAX_ERROR),
    2358              :                      errmsg("unrecognized %s option \"%s\"",
    2359              :                             "DROP DATABASE", opt->defname),
    2360              :                      parser_errposition(pstate, opt->location)));
    2361              :     }
    2362              : 
    2363           64 :     dropdb(stmt->dbname, stmt->missing_ok, force);
    2364           55 : }
    2365              : 
    2366              : /*
    2367              :  * ALTER DATABASE name ...
    2368              :  */
    2369              : Oid
    2370           46 : AlterDatabase(ParseState *pstate, AlterDatabaseStmt *stmt, bool isTopLevel)
    2371              : {
    2372              :     Relation    rel;
    2373              :     Oid         dboid;
    2374              :     HeapTuple   tuple,
    2375              :                 newtuple;
    2376              :     Form_pg_database datform;
    2377              :     ScanKeyData scankey;
    2378              :     SysScanDesc scan;
    2379              :     ListCell   *option;
    2380           46 :     bool        dbistemplate = false;
    2381           46 :     bool        dballowconnections = true;
    2382           46 :     int         dbconnlimit = DATCONNLIMIT_UNLIMITED;
    2383           46 :     DefElem    *distemplate = NULL;
    2384           46 :     DefElem    *dallowconnections = NULL;
    2385           46 :     DefElem    *dconnlimit = NULL;
    2386           46 :     DefElem    *dtablespace = NULL;
    2387           46 :     Datum       new_record[Natts_pg_database] = {0};
    2388           46 :     bool        new_record_nulls[Natts_pg_database] = {0};
    2389           46 :     bool        new_record_repl[Natts_pg_database] = {0};
    2390              : 
    2391              :     /* Extract options from the statement node tree */
    2392           92 :     foreach(option, stmt->options)
    2393              :     {
    2394           46 :         DefElem    *defel = (DefElem *) lfirst(option);
    2395              : 
    2396           46 :         if (strcmp(defel->defname, "is_template") == 0)
    2397              :         {
    2398           11 :             if (distemplate)
    2399            0 :                 errorConflictingDefElem(defel, pstate);
    2400           11 :             distemplate = defel;
    2401              :         }
    2402           35 :         else if (strcmp(defel->defname, "allow_connections") == 0)
    2403              :         {
    2404           19 :             if (dallowconnections)
    2405            0 :                 errorConflictingDefElem(defel, pstate);
    2406           19 :             dallowconnections = defel;
    2407              :         }
    2408           16 :         else if (strcmp(defel->defname, "connection_limit") == 0)
    2409              :         {
    2410            4 :             if (dconnlimit)
    2411            0 :                 errorConflictingDefElem(defel, pstate);
    2412            4 :             dconnlimit = defel;
    2413              :         }
    2414           12 :         else if (strcmp(defel->defname, "tablespace") == 0)
    2415              :         {
    2416           12 :             if (dtablespace)
    2417            0 :                 errorConflictingDefElem(defel, pstate);
    2418           12 :             dtablespace = defel;
    2419              :         }
    2420              :         else
    2421            0 :             ereport(ERROR,
    2422              :                     (errcode(ERRCODE_SYNTAX_ERROR),
    2423              :                      errmsg("option \"%s\" not recognized", defel->defname),
    2424              :                      parser_errposition(pstate, defel->location)));
    2425              :     }
    2426              : 
    2427           46 :     if (dtablespace)
    2428              :     {
    2429              :         /*
    2430              :          * While the SET TABLESPACE syntax doesn't allow any other options,
    2431              :          * somebody could write "WITH TABLESPACE ...".  Forbid any other
    2432              :          * options from being specified in that case.
    2433              :          */
    2434           12 :         if (list_length(stmt->options) != 1)
    2435            0 :             ereport(ERROR,
    2436              :                     (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
    2437              :                      errmsg("option \"%s\" cannot be specified with other options",
    2438              :                             dtablespace->defname),
    2439              :                      parser_errposition(pstate, dtablespace->location)));
    2440              :         /* this case isn't allowed within a transaction block */
    2441           12 :         PreventInTransactionBlock(isTopLevel, "ALTER DATABASE SET TABLESPACE");
    2442           12 :         movedb(stmt->dbname, defGetString(dtablespace));
    2443           12 :         return InvalidOid;
    2444              :     }
    2445              : 
    2446           34 :     if (distemplate && distemplate->arg)
    2447           11 :         dbistemplate = defGetBoolean(distemplate);
    2448           34 :     if (dallowconnections && dallowconnections->arg)
    2449           19 :         dballowconnections = defGetBoolean(dallowconnections);
    2450           34 :     if (dconnlimit && dconnlimit->arg)
    2451              :     {
    2452            4 :         dbconnlimit = defGetInt32(dconnlimit);
    2453            4 :         if (dbconnlimit < DATCONNLIMIT_UNLIMITED)
    2454            0 :             ereport(ERROR,
    2455              :                     (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
    2456              :                      errmsg("invalid connection limit: %d", dbconnlimit)));
    2457              :     }
    2458              : 
    2459              :     /*
    2460              :      * Get the old tuple.  We don't need a lock on the database per se,
    2461              :      * because we're not going to do anything that would mess up incoming
    2462              :      * connections.
    2463              :      */
    2464           34 :     rel = table_open(DatabaseRelationId, RowExclusiveLock);
    2465           34 :     ScanKeyInit(&scankey,
    2466              :                 Anum_pg_database_datname,
    2467              :                 BTEqualStrategyNumber, F_NAMEEQ,
    2468           34 :                 CStringGetDatum(stmt->dbname));
    2469           34 :     scan = systable_beginscan(rel, DatabaseNameIndexId, true,
    2470              :                               NULL, 1, &scankey);
    2471           34 :     tuple = systable_getnext(scan);
    2472           34 :     if (!HeapTupleIsValid(tuple))
    2473            0 :         ereport(ERROR,
    2474              :                 (errcode(ERRCODE_UNDEFINED_DATABASE),
    2475              :                  errmsg("database \"%s\" does not exist", stmt->dbname)));
    2476           34 :     LockTuple(rel, &tuple->t_self, InplaceUpdateTupleLock);
    2477              : 
    2478           34 :     datform = (Form_pg_database) GETSTRUCT(tuple);
    2479           34 :     dboid = datform->oid;
    2480              : 
    2481           34 :     if (database_is_invalid_form(datform))
    2482              :     {
    2483            1 :         ereport(FATAL,
    2484              :                 errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
    2485              :                 errmsg("cannot alter invalid database \"%s\"", stmt->dbname),
    2486              :                 errhint("Use DROP DATABASE to drop invalid databases."));
    2487              :     }
    2488              : 
    2489           33 :     if (!object_ownercheck(DatabaseRelationId, dboid, GetUserId()))
    2490            0 :         aclcheck_error(ACLCHECK_NOT_OWNER, OBJECT_DATABASE,
    2491            0 :                        stmt->dbname);
    2492              : 
    2493              :     /*
    2494              :      * In order to avoid getting locked out and having to go through
    2495              :      * standalone mode, we refuse to disallow connections to the database
    2496              :      * we're currently connected to.  Lockout can still happen with concurrent
    2497              :      * sessions but the likeliness of that is not high enough to worry about.
    2498              :      */
    2499           33 :     if (!dballowconnections && dboid == MyDatabaseId)
    2500            0 :         ereport(ERROR,
    2501              :                 (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
    2502              :                  errmsg("cannot disallow connections for current database")));
    2503              : 
    2504              :     /*
    2505              :      * Build an updated tuple, perusing the information just obtained
    2506              :      */
    2507           33 :     if (distemplate)
    2508              :     {
    2509           11 :         new_record[Anum_pg_database_datistemplate - 1] = BoolGetDatum(dbistemplate);
    2510           11 :         new_record_repl[Anum_pg_database_datistemplate - 1] = true;
    2511              :     }
    2512           33 :     if (dallowconnections)
    2513              :     {
    2514           19 :         new_record[Anum_pg_database_datallowconn - 1] = BoolGetDatum(dballowconnections);
    2515           19 :         new_record_repl[Anum_pg_database_datallowconn - 1] = true;
    2516              :     }
    2517           33 :     if (dconnlimit)
    2518              :     {
    2519            3 :         new_record[Anum_pg_database_datconnlimit - 1] = Int32GetDatum(dbconnlimit);
    2520            3 :         new_record_repl[Anum_pg_database_datconnlimit - 1] = true;
    2521              :     }
    2522              : 
    2523           33 :     newtuple = heap_modify_tuple(tuple, RelationGetDescr(rel), new_record,
    2524              :                                  new_record_nulls, new_record_repl);
    2525           33 :     CatalogTupleUpdate(rel, &tuple->t_self, newtuple);
    2526           33 :     UnlockTuple(rel, &tuple->t_self, InplaceUpdateTupleLock);
    2527              : 
    2528           33 :     InvokeObjectPostAlterHook(DatabaseRelationId, dboid, 0);
    2529              : 
    2530           33 :     systable_endscan(scan);
    2531              : 
    2532              :     /* Close pg_database, but keep lock till commit */
    2533           33 :     table_close(rel, NoLock);
    2534              : 
    2535           33 :     return dboid;
    2536              : }
    2537              : 
    2538              : 
    2539              : /*
    2540              :  * ALTER DATABASE name REFRESH COLLATION VERSION
    2541              :  */
    2542              : ObjectAddress
    2543            3 : AlterDatabaseRefreshColl(AlterDatabaseRefreshCollStmt *stmt)
    2544              : {
    2545              :     Relation    rel;
    2546              :     ScanKeyData scankey;
    2547              :     SysScanDesc scan;
    2548              :     Oid         db_id;
    2549              :     HeapTuple   tuple;
    2550              :     Form_pg_database datForm;
    2551              :     ObjectAddress address;
    2552              :     Datum       datum;
    2553              :     bool        isnull;
    2554              :     char       *oldversion;
    2555              :     char       *newversion;
    2556              : 
    2557            3 :     rel = table_open(DatabaseRelationId, RowExclusiveLock);
    2558            3 :     ScanKeyInit(&scankey,
    2559              :                 Anum_pg_database_datname,
    2560              :                 BTEqualStrategyNumber, F_NAMEEQ,
    2561            3 :                 CStringGetDatum(stmt->dbname));
    2562            3 :     scan = systable_beginscan(rel, DatabaseNameIndexId, true,
    2563              :                               NULL, 1, &scankey);
    2564            3 :     tuple = systable_getnext(scan);
    2565            3 :     if (!HeapTupleIsValid(tuple))
    2566            0 :         ereport(ERROR,
    2567              :                 (errcode(ERRCODE_UNDEFINED_DATABASE),
    2568              :                  errmsg("database \"%s\" does not exist", stmt->dbname)));
    2569              : 
    2570            3 :     datForm = (Form_pg_database) GETSTRUCT(tuple);
    2571            3 :     db_id = datForm->oid;
    2572              : 
    2573            3 :     if (!object_ownercheck(DatabaseRelationId, db_id, GetUserId()))
    2574            0 :         aclcheck_error(ACLCHECK_NOT_OWNER, OBJECT_DATABASE,
    2575            0 :                        stmt->dbname);
    2576            3 :     LockTuple(rel, &tuple->t_self, InplaceUpdateTupleLock);
    2577              : 
    2578            3 :     datum = heap_getattr(tuple, Anum_pg_database_datcollversion, RelationGetDescr(rel), &isnull);
    2579            3 :     oldversion = isnull ? NULL : TextDatumGetCString(datum);
    2580              : 
    2581            3 :     if (datForm->datlocprovider == COLLPROVIDER_LIBC)
    2582              :     {
    2583            2 :         datum = heap_getattr(tuple, Anum_pg_database_datcollate, RelationGetDescr(rel), &isnull);
    2584            2 :         if (isnull)
    2585            0 :             elog(ERROR, "unexpected null in pg_database");
    2586              :     }
    2587              :     else
    2588              :     {
    2589            1 :         datum = heap_getattr(tuple, Anum_pg_database_datlocale, RelationGetDescr(rel), &isnull);
    2590            1 :         if (isnull)
    2591            0 :             elog(ERROR, "unexpected null in pg_database");
    2592              :     }
    2593              : 
    2594            3 :     newversion = get_collation_actual_version(datForm->datlocprovider,
    2595            3 :                                               TextDatumGetCString(datum));
    2596              : 
    2597              :     /* cannot change from NULL to non-NULL or vice versa */
    2598            3 :     if ((!oldversion && newversion) || (oldversion && !newversion))
    2599            0 :         elog(ERROR, "invalid collation version change");
    2600            3 :     else if (oldversion && newversion && strcmp(newversion, oldversion) != 0)
    2601            0 :     {
    2602            0 :         bool        nulls[Natts_pg_database] = {0};
    2603            0 :         bool        replaces[Natts_pg_database] = {0};
    2604            0 :         Datum       values[Natts_pg_database] = {0};
    2605              :         HeapTuple   newtuple;
    2606              : 
    2607            0 :         ereport(NOTICE,
    2608              :                 (errmsg("changing version from %s to %s",
    2609              :                         oldversion, newversion)));
    2610              : 
    2611            0 :         values[Anum_pg_database_datcollversion - 1] = CStringGetTextDatum(newversion);
    2612            0 :         replaces[Anum_pg_database_datcollversion - 1] = true;
    2613              : 
    2614            0 :         newtuple = heap_modify_tuple(tuple, RelationGetDescr(rel),
    2615              :                                      values, nulls, replaces);
    2616            0 :         CatalogTupleUpdate(rel, &tuple->t_self, newtuple);
    2617            0 :         heap_freetuple(newtuple);
    2618              :     }
    2619              :     else
    2620            3 :         ereport(NOTICE,
    2621              :                 (errmsg("version has not changed")));
    2622            3 :     UnlockTuple(rel, &tuple->t_self, InplaceUpdateTupleLock);
    2623              : 
    2624            3 :     InvokeObjectPostAlterHook(DatabaseRelationId, db_id, 0);
    2625              : 
    2626            3 :     ObjectAddressSet(address, DatabaseRelationId, db_id);
    2627              : 
    2628            3 :     systable_endscan(scan);
    2629              : 
    2630            3 :     table_close(rel, NoLock);
    2631              : 
    2632            3 :     return address;
    2633              : }
    2634              : 
    2635              : 
    2636              : /*
    2637              :  * ALTER DATABASE name SET ...
    2638              :  */
    2639              : Oid
    2640          633 : AlterDatabaseSet(AlterDatabaseSetStmt *stmt)
    2641              : {
    2642          633 :     Oid         datid = get_database_oid(stmt->dbname, false);
    2643              : 
    2644              :     /*
    2645              :      * Obtain a lock on the database and make sure it didn't go away in the
    2646              :      * meantime.
    2647              :      */
    2648          633 :     shdepLockAndCheckObject(DatabaseRelationId, datid);
    2649              : 
    2650          633 :     if (!object_ownercheck(DatabaseRelationId, datid, GetUserId()))
    2651            0 :         aclcheck_error(ACLCHECK_NOT_OWNER, OBJECT_DATABASE,
    2652            0 :                        stmt->dbname);
    2653              : 
    2654          633 :     AlterSetting(datid, InvalidOid, stmt->setstmt);
    2655              : 
    2656          631 :     UnlockSharedObject(DatabaseRelationId, datid, 0, AccessShareLock);
    2657              : 
    2658          631 :     return datid;
    2659              : }
    2660              : 
    2661              : 
    2662              : /*
    2663              :  * ALTER DATABASE name OWNER TO newowner
    2664              :  */
    2665              : ObjectAddress
    2666           45 : AlterDatabaseOwner(const char *dbname, Oid newOwnerId)
    2667              : {
    2668              :     Oid         db_id;
    2669              :     HeapTuple   tuple;
    2670              :     Relation    rel;
    2671              :     ScanKeyData scankey;
    2672              :     SysScanDesc scan;
    2673              :     Form_pg_database datForm;
    2674              :     ObjectAddress address;
    2675              : 
    2676              :     /*
    2677              :      * Get the old tuple.  We don't need a lock on the database per se,
    2678              :      * because we're not going to do anything that would mess up incoming
    2679              :      * connections.
    2680              :      */
    2681           45 :     rel = table_open(DatabaseRelationId, RowExclusiveLock);
    2682           45 :     ScanKeyInit(&scankey,
    2683              :                 Anum_pg_database_datname,
    2684              :                 BTEqualStrategyNumber, F_NAMEEQ,
    2685              :                 CStringGetDatum(dbname));
    2686           45 :     scan = systable_beginscan(rel, DatabaseNameIndexId, true,
    2687              :                               NULL, 1, &scankey);
    2688           45 :     tuple = systable_getnext(scan);
    2689           45 :     if (!HeapTupleIsValid(tuple))
    2690            0 :         ereport(ERROR,
    2691              :                 (errcode(ERRCODE_UNDEFINED_DATABASE),
    2692              :                  errmsg("database \"%s\" does not exist", dbname)));
    2693              : 
    2694           45 :     datForm = (Form_pg_database) GETSTRUCT(tuple);
    2695           45 :     db_id = datForm->oid;
    2696              : 
    2697              :     /*
    2698              :      * If the new owner is the same as the existing owner, consider the
    2699              :      * command to have succeeded.  This is to be consistent with other
    2700              :      * objects.
    2701              :      */
    2702           45 :     if (datForm->datdba != newOwnerId)
    2703              :     {
    2704              :         Datum       repl_val[Natts_pg_database];
    2705           15 :         bool        repl_null[Natts_pg_database] = {0};
    2706           15 :         bool        repl_repl[Natts_pg_database] = {0};
    2707              :         Acl        *newAcl;
    2708              :         Datum       aclDatum;
    2709              :         bool        isNull;
    2710              :         HeapTuple   newtuple;
    2711              : 
    2712              :         /* Otherwise, must be owner of the existing object */
    2713           15 :         if (!object_ownercheck(DatabaseRelationId, db_id, GetUserId()))
    2714            0 :             aclcheck_error(ACLCHECK_NOT_OWNER, OBJECT_DATABASE,
    2715              :                            dbname);
    2716              : 
    2717              :         /* Must be able to become new owner */
    2718           15 :         check_can_set_role(GetUserId(), newOwnerId);
    2719              : 
    2720              :         /*
    2721              :          * must have createdb rights
    2722              :          *
    2723              :          * NOTE: This is different from other alter-owner checks in that the
    2724              :          * current user is checked for createdb privileges instead of the
    2725              :          * destination owner.  This is consistent with the CREATE case for
    2726              :          * databases.  Because superusers will always have this right, we need
    2727              :          * no special case for them.
    2728              :          */
    2729           15 :         if (!have_createdb_privilege())
    2730            0 :             ereport(ERROR,
    2731              :                     (errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
    2732              :                      errmsg("permission denied to change owner of database")));
    2733              : 
    2734           15 :         LockTuple(rel, &tuple->t_self, InplaceUpdateTupleLock);
    2735              : 
    2736           15 :         repl_repl[Anum_pg_database_datdba - 1] = true;
    2737           15 :         repl_val[Anum_pg_database_datdba - 1] = ObjectIdGetDatum(newOwnerId);
    2738              : 
    2739              :         /*
    2740              :          * Determine the modified ACL for the new owner.  This is only
    2741              :          * necessary when the ACL is non-null.
    2742              :          */
    2743           15 :         aclDatum = heap_getattr(tuple,
    2744              :                                 Anum_pg_database_datacl,
    2745              :                                 RelationGetDescr(rel),
    2746              :                                 &isNull);
    2747           15 :         if (!isNull)
    2748              :         {
    2749            0 :             newAcl = aclnewowner(DatumGetAclP(aclDatum),
    2750              :                                  datForm->datdba, newOwnerId);
    2751            0 :             repl_repl[Anum_pg_database_datacl - 1] = true;
    2752            0 :             repl_val[Anum_pg_database_datacl - 1] = PointerGetDatum(newAcl);
    2753              :         }
    2754              : 
    2755           15 :         newtuple = heap_modify_tuple(tuple, RelationGetDescr(rel), repl_val, repl_null, repl_repl);
    2756           15 :         CatalogTupleUpdate(rel, &newtuple->t_self, newtuple);
    2757           15 :         UnlockTuple(rel, &tuple->t_self, InplaceUpdateTupleLock);
    2758              : 
    2759           15 :         heap_freetuple(newtuple);
    2760              : 
    2761              :         /* Update owner dependency reference */
    2762           15 :         changeDependencyOnOwner(DatabaseRelationId, db_id, newOwnerId);
    2763              :     }
    2764              : 
    2765           45 :     InvokeObjectPostAlterHook(DatabaseRelationId, db_id, 0);
    2766              : 
    2767           45 :     ObjectAddressSet(address, DatabaseRelationId, db_id);
    2768              : 
    2769           45 :     systable_endscan(scan);
    2770              : 
    2771              :     /* Close pg_database, but keep lock till commit */
    2772           45 :     table_close(rel, NoLock);
    2773              : 
    2774           45 :     return address;
    2775              : }
    2776              : 
    2777              : 
    2778              : Datum
    2779           49 : pg_database_collation_actual_version(PG_FUNCTION_ARGS)
    2780              : {
    2781           49 :     Oid         dbid = PG_GETARG_OID(0);
    2782              :     HeapTuple   tp;
    2783              :     char        datlocprovider;
    2784              :     Datum       datum;
    2785              :     char       *version;
    2786              : 
    2787           49 :     tp = SearchSysCache1(DATABASEOID, ObjectIdGetDatum(dbid));
    2788           49 :     if (!HeapTupleIsValid(tp))
    2789            0 :         ereport(ERROR,
    2790              :                 (errcode(ERRCODE_UNDEFINED_OBJECT),
    2791              :                  errmsg("database with OID %u does not exist", dbid)));
    2792              : 
    2793           49 :     datlocprovider = ((Form_pg_database) GETSTRUCT(tp))->datlocprovider;
    2794              : 
    2795           49 :     if (datlocprovider == COLLPROVIDER_LIBC)
    2796           42 :         datum = SysCacheGetAttrNotNull(DATABASEOID, tp, Anum_pg_database_datcollate);
    2797              :     else
    2798            7 :         datum = SysCacheGetAttrNotNull(DATABASEOID, tp, Anum_pg_database_datlocale);
    2799              : 
    2800           49 :     version = get_collation_actual_version(datlocprovider,
    2801           49 :                                            TextDatumGetCString(datum));
    2802              : 
    2803           49 :     ReleaseSysCache(tp);
    2804              : 
    2805           49 :     if (version)
    2806           35 :         PG_RETURN_TEXT_P(cstring_to_text(version));
    2807              :     else
    2808           14 :         PG_RETURN_NULL();
    2809              : }
    2810              : 
    2811              : 
    2812              : /*
    2813              :  * Helper functions
    2814              :  */
    2815              : 
    2816              : /*
    2817              :  * Look up info about the database named "name".  If the database exists,
    2818              :  * obtain the specified lock type on it, fill in any of the remaining
    2819              :  * parameters that aren't NULL, and return true.  If no such database,
    2820              :  * return false.
    2821              :  */
    2822              : static bool
    2823          488 : get_db_info(const char *name, LOCKMODE lockmode,
    2824              :             Oid *dbIdP, Oid *ownerIdP,
    2825              :             int *encodingP, bool *dbIsTemplateP, bool *dbAllowConnP, bool *dbHasLoginEvtP,
    2826              :             TransactionId *dbFrozenXidP, MultiXactId *dbMinMultiP,
    2827              :             Oid *dbTablespace, char **dbCollate, char **dbCtype, char **dbLocale,
    2828              :             char **dbIcurules,
    2829              :             char *dbLocProvider,
    2830              :             char **dbCollversion)
    2831              : {
    2832          488 :     bool        result = false;
    2833              :     Relation    relation;
    2834              : 
    2835              :     Assert(name);
    2836              : 
    2837              :     /* Caller may wish to grab a better lock on pg_database beforehand... */
    2838          488 :     relation = table_open(DatabaseRelationId, AccessShareLock);
    2839              : 
    2840              :     /*
    2841              :      * Loop covers the rare case where the database is renamed before we can
    2842              :      * lock it.  We try again just in case we can find a new one of the same
    2843              :      * name.
    2844              :      */
    2845              :     for (;;)
    2846            0 :     {
    2847              :         ScanKeyData scanKey;
    2848              :         SysScanDesc scan;
    2849              :         HeapTuple   tuple;
    2850              :         Oid         dbOid;
    2851              : 
    2852              :         /*
    2853              :          * there's no syscache for database-indexed-by-name, so must do it the
    2854              :          * hard way
    2855              :          */
    2856          488 :         ScanKeyInit(&scanKey,
    2857              :                     Anum_pg_database_datname,
    2858              :                     BTEqualStrategyNumber, F_NAMEEQ,
    2859              :                     CStringGetDatum(name));
    2860              : 
    2861          488 :         scan = systable_beginscan(relation, DatabaseNameIndexId, true,
    2862              :                                   NULL, 1, &scanKey);
    2863              : 
    2864          488 :         tuple = systable_getnext(scan);
    2865              : 
    2866          488 :         if (!HeapTupleIsValid(tuple))
    2867              :         {
    2868              :             /* definitely no database of that name */
    2869           16 :             systable_endscan(scan);
    2870           16 :             break;
    2871              :         }
    2872              : 
    2873          472 :         dbOid = ((Form_pg_database) GETSTRUCT(tuple))->oid;
    2874              : 
    2875          472 :         systable_endscan(scan);
    2876              : 
    2877              :         /*
    2878              :          * Now that we have a database OID, we can try to lock the DB.
    2879              :          */
    2880          472 :         if (lockmode != NoLock)
    2881          472 :             LockSharedObject(DatabaseRelationId, dbOid, 0, lockmode);
    2882              : 
    2883              :         /*
    2884              :          * And now, re-fetch the tuple by OID.  If it's still there and still
    2885              :          * the same name, we win; else, drop the lock and loop back to try
    2886              :          * again.
    2887              :          */
    2888          472 :         tuple = SearchSysCache1(DATABASEOID, ObjectIdGetDatum(dbOid));
    2889          472 :         if (HeapTupleIsValid(tuple))
    2890              :         {
    2891          472 :             Form_pg_database dbform = (Form_pg_database) GETSTRUCT(tuple);
    2892              : 
    2893          472 :             if (strcmp(name, NameStr(dbform->datname)) == 0)
    2894              :             {
    2895              :                 Datum       datum;
    2896              :                 bool        isnull;
    2897              : 
    2898              :                 /* oid of the database */
    2899          472 :                 if (dbIdP)
    2900          472 :                     *dbIdP = dbOid;
    2901              :                 /* oid of the owner */
    2902          472 :                 if (ownerIdP)
    2903          405 :                     *ownerIdP = dbform->datdba;
    2904              :                 /* character encoding */
    2905          472 :                 if (encodingP)
    2906          405 :                     *encodingP = dbform->encoding;
    2907              :                 /* allowed as template? */
    2908          472 :                 if (dbIsTemplateP)
    2909          453 :                     *dbIsTemplateP = dbform->datistemplate;
    2910              :                 /* Has on login event trigger? */
    2911          472 :                 if (dbHasLoginEvtP)
    2912          405 :                     *dbHasLoginEvtP = dbform->dathasloginevt;
    2913              :                 /* allowing connections? */
    2914          472 :                 if (dbAllowConnP)
    2915          405 :                     *dbAllowConnP = dbform->datallowconn;
    2916              :                 /* limit of frozen XIDs */
    2917          472 :                 if (dbFrozenXidP)
    2918          405 :                     *dbFrozenXidP = dbform->datfrozenxid;
    2919              :                 /* minimum MultiXactId */
    2920          472 :                 if (dbMinMultiP)
    2921          405 :                     *dbMinMultiP = dbform->datminmxid;
    2922              :                 /* default tablespace for this database */
    2923          472 :                 if (dbTablespace)
    2924          417 :                     *dbTablespace = dbform->dattablespace;
    2925              :                 /* default locale settings for this database */
    2926          472 :                 if (dbLocProvider)
    2927          405 :                     *dbLocProvider = dbform->datlocprovider;
    2928          472 :                 if (dbCollate)
    2929              :                 {
    2930          405 :                     datum = SysCacheGetAttrNotNull(DATABASEOID, tuple, Anum_pg_database_datcollate);
    2931          405 :                     *dbCollate = TextDatumGetCString(datum);
    2932              :                 }
    2933          472 :                 if (dbCtype)
    2934              :                 {
    2935          405 :                     datum = SysCacheGetAttrNotNull(DATABASEOID, tuple, Anum_pg_database_datctype);
    2936          405 :                     *dbCtype = TextDatumGetCString(datum);
    2937              :                 }
    2938          472 :                 if (dbLocale)
    2939              :                 {
    2940          405 :                     datum = SysCacheGetAttr(DATABASEOID, tuple, Anum_pg_database_datlocale, &isnull);
    2941          405 :                     if (isnull)
    2942          376 :                         *dbLocale = NULL;
    2943              :                     else
    2944           29 :                         *dbLocale = TextDatumGetCString(datum);
    2945              :                 }
    2946          472 :                 if (dbIcurules)
    2947              :                 {
    2948          405 :                     datum = SysCacheGetAttr(DATABASEOID, tuple, Anum_pg_database_daticurules, &isnull);
    2949          405 :                     if (isnull)
    2950          405 :                         *dbIcurules = NULL;
    2951              :                     else
    2952            0 :                         *dbIcurules = TextDatumGetCString(datum);
    2953              :                 }
    2954          472 :                 if (dbCollversion)
    2955              :                 {
    2956          405 :                     datum = SysCacheGetAttr(DATABASEOID, tuple, Anum_pg_database_datcollversion, &isnull);
    2957          405 :                     if (isnull)
    2958          245 :                         *dbCollversion = NULL;
    2959              :                     else
    2960          160 :                         *dbCollversion = TextDatumGetCString(datum);
    2961              :                 }
    2962          472 :                 ReleaseSysCache(tuple);
    2963          472 :                 result = true;
    2964          472 :                 break;
    2965              :             }
    2966              :             /* can only get here if it was just renamed */
    2967            0 :             ReleaseSysCache(tuple);
    2968              :         }
    2969              : 
    2970            0 :         if (lockmode != NoLock)
    2971            0 :             UnlockSharedObject(DatabaseRelationId, dbOid, 0, lockmode);
    2972              :     }
    2973              : 
    2974          488 :     table_close(relation, AccessShareLock);
    2975              : 
    2976          488 :     return result;
    2977              : }
    2978              : 
    2979              : /* Check if current user has createdb privileges */
    2980              : bool
    2981          445 : have_createdb_privilege(void)
    2982              : {
    2983          445 :     bool        result = false;
    2984              :     HeapTuple   utup;
    2985              : 
    2986              :     /* Superusers can always do everything */
    2987          445 :     if (superuser())
    2988          427 :         return true;
    2989              : 
    2990           18 :     utup = SearchSysCache1(AUTHOID, ObjectIdGetDatum(GetUserId()));
    2991           18 :     if (HeapTupleIsValid(utup))
    2992              :     {
    2993           18 :         result = ((Form_pg_authid) GETSTRUCT(utup))->rolcreatedb;
    2994           18 :         ReleaseSysCache(utup);
    2995              :     }
    2996           18 :     return result;
    2997              : }
    2998              : 
    2999              : /*
    3000              :  * Remove tablespace directories
    3001              :  *
    3002              :  * We don't know what tablespaces db_id is using, so iterate through all
    3003              :  * tablespaces removing <tablespace>/db_id
    3004              :  */
    3005              : static void
    3006           49 : remove_dbtablespaces(Oid db_id)
    3007              : {
    3008              :     Relation    rel;
    3009              :     TableScanDesc scan;
    3010              :     HeapTuple   tuple;
    3011           49 :     List       *ltblspc = NIL;
    3012              :     ListCell   *cell;
    3013              :     int         ntblspc;
    3014              :     int         i;
    3015              :     Oid        *tablespace_ids;
    3016              : 
    3017           49 :     rel = table_open(TableSpaceRelationId, AccessShareLock);
    3018           49 :     scan = table_beginscan_catalog(rel, 0, NULL);
    3019          180 :     while ((tuple = heap_getnext(scan, ForwardScanDirection)) != NULL)
    3020              :     {
    3021          131 :         Form_pg_tablespace spcform = (Form_pg_tablespace) GETSTRUCT(tuple);
    3022          131 :         Oid         dsttablespace = spcform->oid;
    3023              :         char       *dstpath;
    3024              :         struct stat st;
    3025              : 
    3026              :         /* Don't mess with the global tablespace */
    3027          131 :         if (dsttablespace == GLOBALTABLESPACE_OID)
    3028           82 :             continue;
    3029              : 
    3030           82 :         dstpath = GetDatabasePath(db_id, dsttablespace);
    3031              : 
    3032           82 :         if (lstat(dstpath, &st) < 0 || !S_ISDIR(st.st_mode))
    3033              :         {
    3034              :             /* Assume we can ignore it */
    3035           33 :             pfree(dstpath);
    3036           33 :             continue;
    3037              :         }
    3038              : 
    3039           49 :         if (!rmtree(dstpath, true))
    3040            0 :             ereport(WARNING,
    3041              :                     (errmsg("some useless files may be left behind in old database directory \"%s\"",
    3042              :                             dstpath)));
    3043              : 
    3044           49 :         ltblspc = lappend_oid(ltblspc, dsttablespace);
    3045           49 :         pfree(dstpath);
    3046              :     }
    3047              : 
    3048           49 :     ntblspc = list_length(ltblspc);
    3049           49 :     if (ntblspc == 0)
    3050              :     {
    3051            0 :         table_endscan(scan);
    3052            0 :         table_close(rel, AccessShareLock);
    3053            0 :         return;
    3054              :     }
    3055              : 
    3056           49 :     tablespace_ids = (Oid *) palloc(ntblspc * sizeof(Oid));
    3057           49 :     i = 0;
    3058           98 :     foreach(cell, ltblspc)
    3059           49 :         tablespace_ids[i++] = lfirst_oid(cell);
    3060              : 
    3061              :     /* Record the filesystem change in XLOG */
    3062              :     {
    3063              :         xl_dbase_drop_rec xlrec;
    3064              : 
    3065           49 :         xlrec.db_id = db_id;
    3066           49 :         xlrec.ntablespaces = ntblspc;
    3067              : 
    3068           49 :         XLogBeginInsert();
    3069           49 :         XLogRegisterData(&xlrec, MinSizeOfDbaseDropRec);
    3070           49 :         XLogRegisterData(tablespace_ids, ntblspc * sizeof(Oid));
    3071              : 
    3072           49 :         (void) XLogInsert(RM_DBASE_ID,
    3073              :                           XLOG_DBASE_DROP | XLR_SPECIAL_REL_UPDATE);
    3074              :     }
    3075              : 
    3076           49 :     list_free(ltblspc);
    3077           49 :     pfree(tablespace_ids);
    3078              : 
    3079           49 :     table_endscan(scan);
    3080           49 :     table_close(rel, AccessShareLock);
    3081              : }
    3082              : 
    3083              : /*
    3084              :  * Check for existing files that conflict with a proposed new DB OID;
    3085              :  * return true if there are any
    3086              :  *
    3087              :  * If there were a subdirectory in any tablespace matching the proposed new
    3088              :  * OID, we'd get a create failure due to the duplicate name ... and then we'd
    3089              :  * try to remove that already-existing subdirectory during the cleanup in
    3090              :  * remove_dbtablespaces.  Nuking existing files seems like a bad idea, so
    3091              :  * instead we make this extra check before settling on the OID of the new
    3092              :  * database.  This exactly parallels what GetNewRelFileNumber() does for table
    3093              :  * relfilenumber values.
    3094              :  */
    3095              : static bool
    3096          391 : check_db_file_conflict(Oid db_id)
    3097              : {
    3098          391 :     bool        result = false;
    3099              :     Relation    rel;
    3100              :     TableScanDesc scan;
    3101              :     HeapTuple   tuple;
    3102              : 
    3103          391 :     rel = table_open(TableSpaceRelationId, AccessShareLock);
    3104          391 :     scan = table_beginscan_catalog(rel, 0, NULL);
    3105         1243 :     while ((tuple = heap_getnext(scan, ForwardScanDirection)) != NULL)
    3106              :     {
    3107          852 :         Form_pg_tablespace spcform = (Form_pg_tablespace) GETSTRUCT(tuple);
    3108          852 :         Oid         dsttablespace = spcform->oid;
    3109              :         char       *dstpath;
    3110              :         struct stat st;
    3111              : 
    3112              :         /* Don't mess with the global tablespace */
    3113          852 :         if (dsttablespace == GLOBALTABLESPACE_OID)
    3114          391 :             continue;
    3115              : 
    3116          461 :         dstpath = GetDatabasePath(db_id, dsttablespace);
    3117              : 
    3118          461 :         if (lstat(dstpath, &st) == 0)
    3119              :         {
    3120              :             /* Found a conflicting file (or directory, whatever) */
    3121            0 :             pfree(dstpath);
    3122            0 :             result = true;
    3123            0 :             break;
    3124              :         }
    3125              : 
    3126          461 :         pfree(dstpath);
    3127              :     }
    3128              : 
    3129          391 :     table_endscan(scan);
    3130          391 :     table_close(rel, AccessShareLock);
    3131              : 
    3132          391 :     return result;
    3133              : }
    3134              : 
    3135              : /*
    3136              :  * Issue a suitable errdetail message for a busy database
    3137              :  */
    3138              : static int
    3139            1 : errdetail_busy_db(int notherbackends, int npreparedxacts)
    3140              : {
    3141            1 :     if (notherbackends > 0 && npreparedxacts > 0)
    3142              : 
    3143              :         /*
    3144              :          * We don't deal with singular versus plural here, since gettext
    3145              :          * doesn't support multiple plurals in one string.
    3146              :          */
    3147            0 :         errdetail("There are %d other session(s) and %d prepared transaction(s) using the database.",
    3148              :                   notherbackends, npreparedxacts);
    3149            1 :     else if (notherbackends > 0)
    3150            1 :         errdetail_plural("There is %d other session using the database.",
    3151              :                          "There are %d other sessions using the database.",
    3152              :                          notherbackends,
    3153              :                          notherbackends);
    3154              :     else
    3155            0 :         errdetail_plural("There is %d prepared transaction using the database.",
    3156              :                          "There are %d prepared transactions using the database.",
    3157              :                          npreparedxacts,
    3158              :                          npreparedxacts);
    3159            1 :     return 0;                   /* just to keep ereport macro happy */
    3160              : }
    3161              : 
    3162              : /*
    3163              :  * get_database_oid - given a database name, look up the OID
    3164              :  *
    3165              :  * If missing_ok is false, throw an error if database name not found.  If
    3166              :  * true, just return InvalidOid.
    3167              :  */
    3168              : Oid
    3169         1544 : get_database_oid(const char *dbname, bool missing_ok)
    3170              : {
    3171              :     Relation    pg_database;
    3172              :     ScanKeyData entry[1];
    3173              :     SysScanDesc scan;
    3174              :     HeapTuple   dbtuple;
    3175              :     Oid         oid;
    3176              : 
    3177              :     /*
    3178              :      * There's no syscache for pg_database indexed by name, so we must look
    3179              :      * the hard way.
    3180              :      */
    3181         1544 :     pg_database = table_open(DatabaseRelationId, AccessShareLock);
    3182         1544 :     ScanKeyInit(&entry[0],
    3183              :                 Anum_pg_database_datname,
    3184              :                 BTEqualStrategyNumber, F_NAMEEQ,
    3185              :                 CStringGetDatum(dbname));
    3186         1544 :     scan = systable_beginscan(pg_database, DatabaseNameIndexId, true,
    3187              :                               NULL, 1, entry);
    3188              : 
    3189         1544 :     dbtuple = systable_getnext(scan);
    3190              : 
    3191              :     /* We assume that there can be at most one matching tuple */
    3192         1544 :     if (HeapTupleIsValid(dbtuple))
    3193         1127 :         oid = ((Form_pg_database) GETSTRUCT(dbtuple))->oid;
    3194              :     else
    3195          417 :         oid = InvalidOid;
    3196              : 
    3197         1544 :     systable_endscan(scan);
    3198         1544 :     table_close(pg_database, AccessShareLock);
    3199              : 
    3200         1544 :     if (!OidIsValid(oid) && !missing_ok)
    3201            3 :         ereport(ERROR,
    3202              :                 (errcode(ERRCODE_UNDEFINED_DATABASE),
    3203              :                  errmsg("database \"%s\" does not exist",
    3204              :                         dbname)));
    3205              : 
    3206         1541 :     return oid;
    3207              : }
    3208              : 
    3209              : 
    3210              : /*
    3211              :  * While dropping a database the pg_database row is marked invalid, but the
    3212              :  * catalog contents still exist. Connections to such a database are not
    3213              :  * allowed.
    3214              :  */
    3215              : bool
    3216        27688 : database_is_invalid_form(Form_pg_database datform)
    3217              : {
    3218        27688 :     return datform->datconnlimit == DATCONNLIMIT_INVALID_DB;
    3219              : }
    3220              : 
    3221              : 
    3222              : /*
    3223              :  * Convenience wrapper around database_is_invalid_form()
    3224              :  */
    3225              : bool
    3226          405 : database_is_invalid_oid(Oid dboid)
    3227              : {
    3228              :     HeapTuple   dbtup;
    3229              :     Form_pg_database dbform;
    3230              :     bool        invalid;
    3231              : 
    3232          405 :     dbtup = SearchSysCache1(DATABASEOID, ObjectIdGetDatum(dboid));
    3233          405 :     if (!HeapTupleIsValid(dbtup))
    3234            0 :         elog(ERROR, "cache lookup failed for database %u", dboid);
    3235          405 :     dbform = (Form_pg_database) GETSTRUCT(dbtup);
    3236              : 
    3237          405 :     invalid = database_is_invalid_form(dbform);
    3238              : 
    3239          405 :     ReleaseSysCache(dbtup);
    3240              : 
    3241          405 :     return invalid;
    3242              : }
    3243              : 
    3244              : 
    3245              : /*
    3246              :  * recovery_create_dbdir()
    3247              :  *
    3248              :  * During recovery, there's a case where we validly need to recover a missing
    3249              :  * tablespace directory so that recovery can continue.  This happens when
    3250              :  * recovery wants to create a database but the holding tablespace has been
    3251              :  * removed before the server stopped.  Since we expect that the directory will
    3252              :  * be gone before reaching recovery consistency, and we have no knowledge about
    3253              :  * the tablespace other than its OID here, we create a real directory under
    3254              :  * pg_tblspc here instead of restoring the symlink.
    3255              :  *
    3256              :  * If only_tblspc is true, then the requested directory must be in pg_tblspc/
    3257              :  */
    3258              : static void
    3259           23 : recovery_create_dbdir(char *path, bool only_tblspc)
    3260              : {
    3261              :     struct stat st;
    3262              : 
    3263              :     Assert(RecoveryInProgress());
    3264              : 
    3265           23 :     if (stat(path, &st) == 0)
    3266           23 :         return;
    3267              : 
    3268            0 :     if (only_tblspc && strstr(path, PG_TBLSPC_DIR_SLASH) == NULL)
    3269            0 :         elog(PANIC, "requested to created invalid directory: %s", path);
    3270              : 
    3271            0 :     if (reachedConsistency && !allow_in_place_tablespaces)
    3272            0 :         ereport(PANIC,
    3273              :                 errmsg("missing directory \"%s\"", path));
    3274              : 
    3275            0 :     elog(reachedConsistency ? WARNING : DEBUG1,
    3276              :          "creating missing directory: %s", path);
    3277              : 
    3278            0 :     if (pg_mkdir_p(path, pg_dir_create_mode) != 0)
    3279            0 :         ereport(PANIC,
    3280              :                 errmsg("could not create missing directory \"%s\": %m", path));
    3281              : }
    3282              : 
    3283              : 
    3284              : /*
    3285              :  * DATABASE resource manager's routines
    3286              :  */
    3287              : void
    3288           42 : dbase_redo(XLogReaderState *record)
    3289              : {
    3290           42 :     uint8       info = XLogRecGetInfo(record) & ~XLR_INFO_MASK;
    3291              : 
    3292              :     /* Backup blocks are not used in dbase records */
    3293              :     Assert(!XLogRecHasAnyBlockRefs(record));
    3294              : 
    3295           42 :     if (info == XLOG_DBASE_CREATE_FILE_COPY)
    3296              :     {
    3297            5 :         xl_dbase_create_file_copy_rec *xlrec =
    3298            5 :             (xl_dbase_create_file_copy_rec *) XLogRecGetData(record);
    3299              :         char       *src_path;
    3300              :         char       *dst_path;
    3301              :         char       *parent_path;
    3302              :         struct stat st;
    3303              : 
    3304            5 :         src_path = GetDatabasePath(xlrec->src_db_id, xlrec->src_tablespace_id);
    3305            5 :         dst_path = GetDatabasePath(xlrec->db_id, xlrec->tablespace_id);
    3306              : 
    3307              :         /*
    3308              :          * Our theory for replaying a CREATE is to forcibly drop the target
    3309              :          * subdirectory if present, then re-copy the source data. This may be
    3310              :          * more work than needed, but it is simple to implement.
    3311              :          */
    3312            5 :         if (stat(dst_path, &st) == 0 && S_ISDIR(st.st_mode))
    3313              :         {
    3314            0 :             if (!rmtree(dst_path, true))
    3315              :                 /* If this failed, copydir() below is going to error. */
    3316            0 :                 ereport(WARNING,
    3317              :                         (errmsg("some useless files may be left behind in old database directory \"%s\"",
    3318              :                                 dst_path)));
    3319              :         }
    3320              : 
    3321              :         /*
    3322              :          * If the parent of the target path doesn't exist, create it now. This
    3323              :          * enables us to create the target underneath later.
    3324              :          */
    3325            5 :         parent_path = pstrdup(dst_path);
    3326            5 :         get_parent_directory(parent_path);
    3327            5 :         if (stat(parent_path, &st) < 0)
    3328              :         {
    3329            0 :             if (errno != ENOENT)
    3330            0 :                 ereport(FATAL,
    3331              :                         errmsg("could not stat directory \"%s\": %m",
    3332              :                                dst_path));
    3333              : 
    3334              :             /* create the parent directory if needed and valid */
    3335            0 :             recovery_create_dbdir(parent_path, true);
    3336              :         }
    3337            5 :         pfree(parent_path);
    3338              : 
    3339              :         /*
    3340              :          * There's a case where the copy source directory is missing for the
    3341              :          * same reason above.  Create the empty source directory so that
    3342              :          * copydir below doesn't fail.  The directory will be dropped soon by
    3343              :          * recovery.
    3344              :          */
    3345            5 :         if (stat(src_path, &st) < 0 && errno == ENOENT)
    3346            0 :             recovery_create_dbdir(src_path, false);
    3347              : 
    3348              :         /*
    3349              :          * Force dirty buffers out to disk, to ensure source database is
    3350              :          * up-to-date for the copy.
    3351              :          */
    3352            5 :         FlushDatabaseBuffers(xlrec->src_db_id);
    3353              : 
    3354              :         /* Close all smgr fds in all backends. */
    3355            5 :         WaitForProcSignalBarrier(EmitProcSignalBarrier(PROCSIGNAL_BARRIER_SMGRRELEASE));
    3356              : 
    3357              :         /*
    3358              :          * Copy this subdirectory to the new location
    3359              :          *
    3360              :          * We don't need to copy subdirectories
    3361              :          */
    3362            5 :         copydir(src_path, dst_path, false);
    3363              : 
    3364            5 :         pfree(src_path);
    3365            5 :         pfree(dst_path);
    3366              :     }
    3367           37 :     else if (info == XLOG_DBASE_CREATE_WAL_LOG)
    3368              :     {
    3369           23 :         xl_dbase_create_wal_log_rec *xlrec =
    3370           23 :             (xl_dbase_create_wal_log_rec *) XLogRecGetData(record);
    3371              :         char       *dbpath;
    3372              :         char       *parent_path;
    3373              : 
    3374           23 :         dbpath = GetDatabasePath(xlrec->db_id, xlrec->tablespace_id);
    3375              : 
    3376              :         /* create the parent directory if needed and valid */
    3377           23 :         parent_path = pstrdup(dbpath);
    3378           23 :         get_parent_directory(parent_path);
    3379           23 :         recovery_create_dbdir(parent_path, true);
    3380           23 :         pfree(parent_path);
    3381              : 
    3382              :         /* Create the database directory with the version file. */
    3383           23 :         CreateDirAndVersionFile(dbpath, xlrec->db_id, xlrec->tablespace_id,
    3384              :                                 true);
    3385           23 :         pfree(dbpath);
    3386              :     }
    3387           14 :     else if (info == XLOG_DBASE_DROP)
    3388              :     {
    3389           14 :         xl_dbase_drop_rec *xlrec = (xl_dbase_drop_rec *) XLogRecGetData(record);
    3390              :         char       *dst_path;
    3391              :         int         i;
    3392              : 
    3393           14 :         if (InHotStandby)
    3394              :         {
    3395              :             /*
    3396              :              * Lock database while we resolve conflicts to ensure that
    3397              :              * InitPostgres() cannot fully re-execute concurrently. This
    3398              :              * avoids backends re-connecting automatically to same database,
    3399              :              * which can happen in some cases.
    3400              :              *
    3401              :              * This will lock out walsenders trying to connect to db-specific
    3402              :              * slots for logical decoding too, so it's safe for us to drop
    3403              :              * slots.
    3404              :              */
    3405           14 :             LockSharedObjectForSession(DatabaseRelationId, xlrec->db_id, 0, AccessExclusiveLock);
    3406           14 :             ResolveRecoveryConflictWithDatabase(xlrec->db_id);
    3407              :         }
    3408              : 
    3409              :         /* Drop any database-specific replication slots */
    3410           14 :         ReplicationSlotsDropDBSlots(xlrec->db_id);
    3411              : 
    3412              :         /* Drop pages for this database that are in the shared buffer cache */
    3413           14 :         DropDatabaseBuffers(xlrec->db_id);
    3414              : 
    3415              :         /* Also, clean out any fsync requests that might be pending in md.c */
    3416           14 :         ForgetDatabaseSyncRequests(xlrec->db_id);
    3417              : 
    3418              :         /* Clean out the xlog relcache too */
    3419           14 :         XLogDropDatabase(xlrec->db_id);
    3420              : 
    3421              :         /* Close all smgr fds in all backends. */
    3422           14 :         WaitForProcSignalBarrier(EmitProcSignalBarrier(PROCSIGNAL_BARRIER_SMGRRELEASE));
    3423              : 
    3424           28 :         for (i = 0; i < xlrec->ntablespaces; i++)
    3425              :         {
    3426           14 :             dst_path = GetDatabasePath(xlrec->db_id, xlrec->tablespace_ids[i]);
    3427              : 
    3428              :             /* And remove the physical files */
    3429           14 :             if (!rmtree(dst_path, true))
    3430            0 :                 ereport(WARNING,
    3431              :                         (errmsg("some useless files may be left behind in old database directory \"%s\"",
    3432              :                                 dst_path)));
    3433           14 :             pfree(dst_path);
    3434              :         }
    3435              : 
    3436           14 :         if (InHotStandby)
    3437              :         {
    3438              :             /*
    3439              :              * Release locks prior to commit. XXX There is a race condition
    3440              :              * here that may allow backends to reconnect, but the window for
    3441              :              * this is small because the gap between here and commit is mostly
    3442              :              * fairly small and it is unlikely that people will be dropping
    3443              :              * databases that we are trying to connect to anyway.
    3444              :              */
    3445           14 :             UnlockSharedObjectForSession(DatabaseRelationId, xlrec->db_id, 0, AccessExclusiveLock);
    3446              :         }
    3447              :     }
    3448              :     else
    3449            0 :         elog(PANIC, "dbase_redo: unknown op code %u", info);
    3450           42 : }
        

Generated by: LCOV version 2.0-1