LCOV - code coverage report
Current view: top level - src/backend/commands - dbcommands.c (source / functions) Hit Total Coverage
Test: PostgreSQL 13beta1 Lines: 432 674 64.1 %
Date: 2020-06-01 08:06:25 Functions: 14 19 73.7 %
Legend: Lines: hit not hit

          Line data    Source code
       1             : /*-------------------------------------------------------------------------
       2             :  *
       3             :  * dbcommands.c
       4             :  *      Database management commands (create/drop database).
       5             :  *
       6             :  * Note: database creation/destruction commands use exclusive locks on
       7             :  * the database objects (as expressed by LockSharedObject()) to avoid
       8             :  * stepping on each other's toes.  Formerly we used table-level locks
       9             :  * on pg_database, but that's too coarse-grained.
      10             :  *
      11             :  * Portions Copyright (c) 1996-2020, PostgreSQL Global Development Group
      12             :  * Portions Copyright (c) 1994, Regents of the University of California
      13             :  *
      14             :  *
      15             :  * IDENTIFICATION
      16             :  *    src/backend/commands/dbcommands.c
      17             :  *
      18             :  *-------------------------------------------------------------------------
      19             :  */
      20             : #include "postgres.h"
      21             : 
      22             : #include <fcntl.h>
      23             : #include <unistd.h>
      24             : #include <sys/stat.h>
      25             : 
      26             : #include "access/genam.h"
      27             : #include "access/heapam.h"
      28             : #include "access/htup_details.h"
      29             : #include "access/multixact.h"
      30             : #include "access/tableam.h"
      31             : #include "access/xact.h"
      32             : #include "access/xloginsert.h"
      33             : #include "access/xlogutils.h"
      34             : #include "catalog/catalog.h"
      35             : #include "catalog/dependency.h"
      36             : #include "catalog/indexing.h"
      37             : #include "catalog/objectaccess.h"
      38             : #include "catalog/pg_authid.h"
      39             : #include "catalog/pg_database.h"
      40             : #include "catalog/pg_db_role_setting.h"
      41             : #include "catalog/pg_subscription.h"
      42             : #include "catalog/pg_tablespace.h"
      43             : #include "commands/comment.h"
      44             : #include "commands/dbcommands.h"
      45             : #include "commands/dbcommands_xlog.h"
      46             : #include "commands/defrem.h"
      47             : #include "commands/seclabel.h"
      48             : #include "commands/tablespace.h"
      49             : #include "mb/pg_wchar.h"
      50             : #include "miscadmin.h"
      51             : #include "pgstat.h"
      52             : #include "postmaster/bgwriter.h"
      53             : #include "replication/slot.h"
      54             : #include "storage/copydir.h"
      55             : #include "storage/fd.h"
      56             : #include "storage/ipc.h"
      57             : #include "storage/lmgr.h"
      58             : #include "storage/md.h"
      59             : #include "storage/procarray.h"
      60             : #include "storage/smgr.h"
      61             : #include "utils/acl.h"
      62             : #include "utils/builtins.h"
      63             : #include "utils/fmgroids.h"
      64             : #include "utils/pg_locale.h"
      65             : #include "utils/snapmgr.h"
      66             : #include "utils/syscache.h"
      67             : 
      68             : typedef struct                  /* createdb() declares one of these as its local "fparms" */
      69             : {
      70             :     Oid         src_dboid;      /* OID of the source (template) database */
      71             :     Oid         dest_dboid;     /* OID of the database being created */
      72             : } createdb_failure_params;      /* NOTE(review): presumably the createdb_failure_callback argument -- confirm */
      73             : 
      74             : typedef struct                  /* OIDs describing a database move between tablespaces */
      75             : {
      76             :     Oid         dest_dboid;     /* OID of the database being moved */
      77             :     Oid         dest_tsoid;     /* OID of the tablespace it is being moved to */
      78             : } movedb_failure_params;        /* NOTE(review): presumably the movedb_failure_callback argument -- confirm */
      79             : 
      80             : /* prototypes for non-exported (file-local, static) functions */
      81             : static void createdb_failure_callback(int code, Datum arg);
      82             : static void movedb(const char *dbname, const char *tblspcname);
      83             : static void movedb_failure_callback(int code, Datum arg);
      84             : static bool get_db_info(const char *name, LOCKMODE lockmode,
      85             :                         Oid *dbIdP, Oid *ownerIdP,
      86             :                         int *encodingP, bool *dbIsTemplateP, bool *dbAllowConnP,
      87             :                         Oid *dbLastSysOidP, TransactionId *dbFrozenXidP,
      88             :                         MultiXactId *dbMinMultiP,
      89             :                         Oid *dbTablespace, char **dbCollate, char **dbCtype);
      90             : static bool have_createdb_privilege(void);
      91             : static void remove_dbtablespaces(Oid db_id);
      92             : static bool check_db_file_conflict(Oid db_id);
      93             : static int  errdetail_busy_db(int notherbackends, int npreparedxacts);
      94             : 
      95             : 
      96             : /*
      97             :  * CREATE DATABASE
      98             :  */
      99             : Oid
     100         960 : createdb(ParseState *pstate, const CreatedbStmt *stmt)
     101             : {
     102             :     TableScanDesc scan;
     103             :     Relation    rel;
     104             :     Oid         src_dboid;
     105             :     Oid         src_owner;
     106         960 :     int         src_encoding = -1;
     107         960 :     char       *src_collate = NULL;
     108         960 :     char       *src_ctype = NULL;
     109             :     bool        src_istemplate;
     110             :     bool        src_allowconn;
     111         960 :     Oid         src_lastsysoid = InvalidOid;
     112         960 :     TransactionId src_frozenxid = InvalidTransactionId;
     113         960 :     MultiXactId src_minmxid = InvalidMultiXactId;
     114             :     Oid         src_deftablespace;
     115             :     volatile Oid dst_deftablespace;
     116             :     Relation    pg_database_rel;
     117             :     HeapTuple   tuple;
     118             :     Datum       new_record[Natts_pg_database];
     119             :     bool        new_record_nulls[Natts_pg_database];
     120             :     Oid         dboid;
     121             :     Oid         datdba;
     122             :     ListCell   *option;
     123         960 :     DefElem    *dtablespacename = NULL;
     124         960 :     DefElem    *downer = NULL;
     125         960 :     DefElem    *dtemplate = NULL;
     126         960 :     DefElem    *dencoding = NULL;
     127         960 :     DefElem    *dlocale = NULL;
     128         960 :     DefElem    *dcollate = NULL;
     129         960 :     DefElem    *dctype = NULL;
     130         960 :     DefElem    *distemplate = NULL;
     131         960 :     DefElem    *dallowconnections = NULL;
     132         960 :     DefElem    *dconnlimit = NULL;
     133         960 :     char       *dbname = stmt->dbname;
     134         960 :     char       *dbowner = NULL;
     135         960 :     const char *dbtemplate = NULL;
     136         960 :     char       *dbcollate = NULL;
     137         960 :     char       *dbctype = NULL;
     138             :     char       *canonname;
     139         960 :     int         encoding = -1;
     140         960 :     bool        dbistemplate = false;
     141         960 :     bool        dballowconnections = true;
     142         960 :     int         dbconnlimit = -1;
     143             :     int         notherbackends;
     144             :     int         npreparedxacts;
     145             :     createdb_failure_params fparms;
     146             : 
     147             :     /* Extract options from the statement node tree */
     148        1930 :     foreach(option, stmt->options)
     149             :     {
     150         970 :         DefElem    *defel = (DefElem *) lfirst(option);
     151             : 
     152         970 :         if (strcmp(defel->defname, "tablespace") == 0)
     153             :         {
     154           0 :             if (dtablespacename)
     155           0 :                 ereport(ERROR,
     156             :                         (errcode(ERRCODE_SYNTAX_ERROR),
     157             :                          errmsg("conflicting or redundant options"),
     158             :                          parser_errposition(pstate, defel->location)));
     159           0 :             dtablespacename = defel;
     160             :         }
     161         970 :         else if (strcmp(defel->defname, "owner") == 0)
     162             :         {
     163           0 :             if (downer)
     164           0 :                 ereport(ERROR,
     165             :                         (errcode(ERRCODE_SYNTAX_ERROR),
     166             :                          errmsg("conflicting or redundant options"),
     167             :                          parser_errposition(pstate, defel->location)));
     168           0 :             downer = defel;
     169             :         }
     170         970 :         else if (strcmp(defel->defname, "template") == 0)
     171             :         {
     172         172 :             if (dtemplate)
     173           0 :                 ereport(ERROR,
     174             :                         (errcode(ERRCODE_SYNTAX_ERROR),
     175             :                          errmsg("conflicting or redundant options"),
     176             :                          parser_errposition(pstate, defel->location)));
     177         172 :             dtemplate = defel;
     178             :         }
     179         798 :         else if (strcmp(defel->defname, "encoding") == 0)
     180             :         {
     181          42 :             if (dencoding)
     182           0 :                 ereport(ERROR,
     183             :                         (errcode(ERRCODE_SYNTAX_ERROR),
     184             :                          errmsg("conflicting or redundant options"),
     185             :                          parser_errposition(pstate, defel->location)));
     186          42 :             dencoding = defel;
     187             :         }
     188         756 :         else if (strcmp(defel->defname, "locale") == 0)
     189             :         {
     190          32 :             if (dlocale)
     191           0 :                 ereport(ERROR,
     192             :                         (errcode(ERRCODE_SYNTAX_ERROR),
     193             :                          errmsg("conflicting or redundant options"),
     194             :                          parser_errposition(pstate, defel->location)));
     195          32 :             dlocale = defel;
     196             :         }
     197         724 :         else if (strcmp(defel->defname, "lc_collate") == 0)
     198             :         {
     199           4 :             if (dcollate)
     200           0 :                 ereport(ERROR,
     201             :                         (errcode(ERRCODE_SYNTAX_ERROR),
     202             :                          errmsg("conflicting or redundant options"),
     203             :                          parser_errposition(pstate, defel->location)));
     204           4 :             dcollate = defel;
     205             :         }
     206         720 :         else if (strcmp(defel->defname, "lc_ctype") == 0)
     207             :         {
     208           4 :             if (dctype)
     209           0 :                 ereport(ERROR,
     210             :                         (errcode(ERRCODE_SYNTAX_ERROR),
     211             :                          errmsg("conflicting or redundant options"),
     212             :                          parser_errposition(pstate, defel->location)));
     213           4 :             dctype = defel;
     214             :         }
     215         716 :         else if (strcmp(defel->defname, "is_template") == 0)
     216             :         {
     217         358 :             if (distemplate)
     218           0 :                 ereport(ERROR,
     219             :                         (errcode(ERRCODE_SYNTAX_ERROR),
     220             :                          errmsg("conflicting or redundant options"),
     221             :                          parser_errposition(pstate, defel->location)));
     222         358 :             distemplate = defel;
     223             :         }
     224         358 :         else if (strcmp(defel->defname, "allow_connections") == 0)
     225             :         {
     226         358 :             if (dallowconnections)
     227           0 :                 ereport(ERROR,
     228             :                         (errcode(ERRCODE_SYNTAX_ERROR),
     229             :                          errmsg("conflicting or redundant options"),
     230             :                          parser_errposition(pstate, defel->location)));
     231         358 :             dallowconnections = defel;
     232             :         }
     233           0 :         else if (strcmp(defel->defname, "connection_limit") == 0)
     234             :         {
     235           0 :             if (dconnlimit)
     236           0 :                 ereport(ERROR,
     237             :                         (errcode(ERRCODE_SYNTAX_ERROR),
     238             :                          errmsg("conflicting or redundant options"),
     239             :                          parser_errposition(pstate, defel->location)));
     240           0 :             dconnlimit = defel;
     241             :         }
     242           0 :         else if (strcmp(defel->defname, "location") == 0)
     243             :         {
     244           0 :             ereport(WARNING,
     245             :                     (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
     246             :                      errmsg("LOCATION is not supported anymore"),
     247             :                      errhint("Consider using tablespaces instead."),
     248             :                      parser_errposition(pstate, defel->location)));
     249             :         }
     250             :         else
     251           0 :             ereport(ERROR,
     252             :                     (errcode(ERRCODE_SYNTAX_ERROR),
     253             :                      errmsg("option \"%s\" not recognized", defel->defname),
     254             :                      parser_errposition(pstate, defel->location)));
     255             :     }
     256             : 
     257         960 :     if (dlocale && (dcollate || dctype))
     258           0 :         ereport(ERROR,
     259             :                 (errcode(ERRCODE_SYNTAX_ERROR),
     260             :                  errmsg("conflicting or redundant options"),
     261             :                  errdetail("LOCALE cannot be specified together with LC_COLLATE or LC_CTYPE.")));
     262             : 
     263         960 :     if (downer && downer->arg)
     264           0 :         dbowner = defGetString(downer);
     265         960 :     if (dtemplate && dtemplate->arg)
     266         172 :         dbtemplate = defGetString(dtemplate);
     267         960 :     if (dencoding && dencoding->arg)
     268             :     {
     269             :         const char *encoding_name;
     270             : 
     271          42 :         if (IsA(dencoding->arg, Integer))
     272             :         {
     273           0 :             encoding = defGetInt32(dencoding);
     274           0 :             encoding_name = pg_encoding_to_char(encoding);
     275           0 :             if (strcmp(encoding_name, "") == 0 ||
     276           0 :                 pg_valid_server_encoding(encoding_name) < 0)
     277           0 :                 ereport(ERROR,
     278             :                         (errcode(ERRCODE_UNDEFINED_OBJECT),
     279             :                          errmsg("%d is not a valid encoding code",
     280             :                                 encoding),
     281             :                          parser_errposition(pstate, dencoding->location)));
     282             :         }
     283             :         else
     284             :         {
     285          42 :             encoding_name = defGetString(dencoding);
     286          42 :             encoding = pg_valid_server_encoding(encoding_name);
     287          42 :             if (encoding < 0)
     288           0 :                 ereport(ERROR,
     289             :                         (errcode(ERRCODE_UNDEFINED_OBJECT),
     290             :                          errmsg("%s is not a valid encoding name",
     291             :                                 encoding_name),
     292             :                          parser_errposition(pstate, dencoding->location)));
     293             :         }
     294             :     }
     295         960 :     if (dlocale && dlocale->arg)
     296             :     {
     297          32 :         dbcollate = defGetString(dlocale);
     298          32 :         dbctype = defGetString(dlocale);
     299             :     }
     300         960 :     if (dcollate && dcollate->arg)
     301           4 :         dbcollate = defGetString(dcollate);
     302         960 :     if (dctype && dctype->arg)
     303           4 :         dbctype = defGetString(dctype);
     304         960 :     if (distemplate && distemplate->arg)
     305         358 :         dbistemplate = defGetBoolean(distemplate);
     306         960 :     if (dallowconnections && dallowconnections->arg)
     307         358 :         dballowconnections = defGetBoolean(dallowconnections);
     308         960 :     if (dconnlimit && dconnlimit->arg)
     309             :     {
     310           0 :         dbconnlimit = defGetInt32(dconnlimit);
     311           0 :         if (dbconnlimit < -1)
     312           0 :             ereport(ERROR,
     313             :                     (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
     314             :                      errmsg("invalid connection limit: %d", dbconnlimit)));
     315             :     }
     316             : 
     317             :     /* obtain OID of proposed owner */
     318         960 :     if (dbowner)
     319           0 :         datdba = get_role_oid(dbowner, false);
     320             :     else
     321         960 :         datdba = GetUserId();
     322             : 
     323             :     /*
     324             :      * To create a database, must have createdb privilege and must be able to
     325             :      * become the target role (this does not imply that the target role itself
     326             :      * must have createdb privilege).  The latter provision guards against
     327             :      * "giveaway" attacks.  Note that a superuser will always have both of
     328             :      * these privileges a fortiori.
     329             :      */
     330         960 :     if (!have_createdb_privilege())
     331           0 :         ereport(ERROR,
     332             :                 (errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
     333             :                  errmsg("permission denied to create database")));
     334             : 
     335         960 :     check_is_member_of_role(GetUserId(), datdba);
     336             : 
     337             :     /*
     338             :      * Lookup database (template) to be cloned, and obtain share lock on it.
     339             :      * ShareLock allows two CREATE DATABASEs to work from the same template
     340             :      * concurrently, while ensuring no one is busy dropping it in parallel
     341             :      * (which would be Very Bad since we'd likely get an incomplete copy
     342             :      * without knowing it).  This also prevents any new connections from being
     343             :      * made to the source until we finish copying it, so we can be sure it
     344             :      * won't change underneath us.
     345             :      */
     346         960 :     if (!dbtemplate)
     347         788 :         dbtemplate = "template1"; /* Default template database name */
     348             : 
     349         960 :     if (!get_db_info(dbtemplate, ShareLock,
     350             :                      &src_dboid, &src_owner, &src_encoding,
     351             :                      &src_istemplate, &src_allowconn, &src_lastsysoid,
     352             :                      &src_frozenxid, &src_minmxid, &src_deftablespace,
     353             :                      &src_collate, &src_ctype))
     354           0 :         ereport(ERROR,
     355             :                 (errcode(ERRCODE_UNDEFINED_DATABASE),
     356             :                  errmsg("template database \"%s\" does not exist",
     357             :                         dbtemplate)));
     358             : 
     359             :     /*
     360             :      * Permission check: to copy a DB that's not marked datistemplate, you
     361             :      * must be superuser or the owner thereof.
     362             :      */
     363         960 :     if (!src_istemplate)
     364             :     {
     365           0 :         if (!pg_database_ownercheck(src_dboid, GetUserId()))
     366           0 :             ereport(ERROR,
     367             :                     (errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
     368             :                      errmsg("permission denied to copy database \"%s\"",
     369             :                             dbtemplate)));
     370             :     }
     371             : 
     372             :     /* If encoding or locales are defaulted, use source's setting */
     373         960 :     if (encoding < 0)
     374         918 :         encoding = src_encoding;
     375         960 :     if (dbcollate == NULL)
     376         924 :         dbcollate = src_collate;
     377         960 :     if (dbctype == NULL)
     378         924 :         dbctype = src_ctype;
     379             : 
     380             :     /* Some encodings are client only */
     381         960 :     if (!PG_VALID_BE_ENCODING(encoding))
     382           0 :         ereport(ERROR,
     383             :                 (errcode(ERRCODE_WRONG_OBJECT_TYPE),
     384             :                  errmsg("invalid server encoding %d", encoding)));
     385             : 
     386             :     /* Check that the chosen locales are valid, and get canonical spellings */
     387         960 :     if (!check_locale(LC_COLLATE, dbcollate, &canonname))
     388           0 :         ereport(ERROR,
     389             :                 (errcode(ERRCODE_WRONG_OBJECT_TYPE),
     390             :                  errmsg("invalid locale name: \"%s\"", dbcollate)));
     391         960 :     dbcollate = canonname;
     392         960 :     if (!check_locale(LC_CTYPE, dbctype, &canonname))
     393           0 :         ereport(ERROR,
     394             :                 (errcode(ERRCODE_WRONG_OBJECT_TYPE),
     395             :                  errmsg("invalid locale name: \"%s\"", dbctype)));
     396         960 :     dbctype = canonname;
     397             : 
     398         960 :     check_encoding_locale_matches(encoding, dbcollate, dbctype);
     399             : 
     400             :     /*
     401             :      * Check that the new encoding and locale settings match the source
     402             :      * database.  We insist on this because we simply copy the source data ---
     403             :      * any non-ASCII data would be wrongly encoded, and any indexes sorted
     404             :      * according to the source locale would be wrong.
     405             :      *
     406             :      * However, we assume that template0 doesn't contain any non-ASCII data
     407             :      * nor any indexes that depend on collation or ctype, so template0 can be
     408             :      * used as template for creating a database with any encoding or locale.
     409             :      */
     410         960 :     if (strcmp(dbtemplate, "template0") != 0)
     411             :     {
     412         788 :         if (encoding != src_encoding)
     413           0 :             ereport(ERROR,
     414             :                     (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
     415             :                      errmsg("new encoding (%s) is incompatible with the encoding of the template database (%s)",
     416             :                             pg_encoding_to_char(encoding),
     417             :                             pg_encoding_to_char(src_encoding)),
     418             :                      errhint("Use the same encoding as in the template database, or use template0 as template.")));
     419             : 
     420         788 :         if (strcmp(dbcollate, src_collate) != 0)
     421           0 :             ereport(ERROR,
     422             :                     (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
     423             :                      errmsg("new collation (%s) is incompatible with the collation of the template database (%s)",
     424             :                             dbcollate, src_collate),
     425             :                      errhint("Use the same collation as in the template database, or use template0 as template.")));
     426             : 
     427         788 :         if (strcmp(dbctype, src_ctype) != 0)
     428           0 :             ereport(ERROR,
     429             :                     (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
     430             :                      errmsg("new LC_CTYPE (%s) is incompatible with the LC_CTYPE of the template database (%s)",
     431             :                             dbctype, src_ctype),
     432             :                      errhint("Use the same LC_CTYPE as in the template database, or use template0 as template.")));
     433             :     }
     434             : 
     435             :     /* Resolve default tablespace for new database */
     436         960 :     if (dtablespacename && dtablespacename->arg)
     437           0 :     {
     438             :         char       *tablespacename;
     439             :         AclResult   aclresult;
     440             : 
     441           0 :         tablespacename = defGetString(dtablespacename);
     442           0 :         dst_deftablespace = get_tablespace_oid(tablespacename, false);
     443             :         /* check permissions */
     444           0 :         aclresult = pg_tablespace_aclcheck(dst_deftablespace, GetUserId(),
     445             :                                            ACL_CREATE);
     446           0 :         if (aclresult != ACLCHECK_OK)
     447           0 :             aclcheck_error(aclresult, OBJECT_TABLESPACE,
     448             :                            tablespacename);
     449             : 
     450             :         /* pg_global must never be the default tablespace */
     451           0 :         if (dst_deftablespace == GLOBALTABLESPACE_OID)
     452           0 :             ereport(ERROR,
     453             :                     (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
     454             :                      errmsg("pg_global cannot be used as default tablespace")));
     455             : 
     456             :         /*
     457             :          * If we are trying to change the default tablespace of the template,
     458             :          * we require that the template not have any files in the new default
     459             :          * tablespace.  This is necessary because otherwise the copied
     460             :          * database would contain pg_class rows that refer to its default
     461             :          * tablespace both explicitly (by OID) and implicitly (as zero), which
     462             :          * would cause problems.  For example another CREATE DATABASE using
     463             :          * the copied database as template, and trying to change its default
     464             :          * tablespace again, would yield outright incorrect results (it would
     465             :          * improperly move tables to the new default tablespace that should
     466             :          * stay in the same tablespace).
     467             :          */
     468           0 :         if (dst_deftablespace != src_deftablespace)
     469             :         {
     470             :             char       *srcpath;
     471             :             struct stat st;
     472             : 
     473           0 :             srcpath = GetDatabasePath(src_dboid, dst_deftablespace);
     474             : 
     475           0 :             if (stat(srcpath, &st) == 0 &&
     476           0 :                 S_ISDIR(st.st_mode) &&
     477           0 :                 !directory_is_empty(srcpath))
     478           0 :                 ereport(ERROR,
     479             :                         (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
     480             :                          errmsg("cannot assign new default tablespace \"%s\"",
     481             :                                 tablespacename),
     482             :                          errdetail("There is a conflict because database \"%s\" already has some tables in this tablespace.",
     483             :                                    dbtemplate)));
     484           0 :             pfree(srcpath);
     485             :         }
     486             :     }
     487             :     else
     488             :     {
     489             :         /* Use template database's default tablespace */
     490         960 :         dst_deftablespace = src_deftablespace;
     491             :         /* Note there is no additional permission check in this path */
     492             :     }
     493             : 
     494             :     /*
     495             :      * If built with appropriate switch, whine when regression-testing
     496             :      * conventions for database names are violated.  But don't complain during
     497             :      * initdb.
     498             :      */
     499             : #ifdef ENFORCE_REGRESSION_TEST_NAME_RESTRICTIONS
     500             :     if (IsUnderPostmaster && strstr(dbname, "regression") == NULL)
     501             :         elog(WARNING, "databases created by regression test cases should have names including \"regression\"");
     502             : #endif
     503             : 
     504             :     /*
     505             :      * Check for db name conflict.  This is just to give a more friendly error
     506             :      * message than "unique index violation".  There's a race condition but
     507             :      * we're willing to accept the less friendly message in that case.
     508             :      */
     509         960 :     if (OidIsValid(get_database_oid(dbname, true)))
     510           2 :         ereport(ERROR,
     511             :                 (errcode(ERRCODE_DUPLICATE_DATABASE),
     512             :                  errmsg("database \"%s\" already exists", dbname)));
     513             : 
     514             :     /*
     515             :      * The source DB can't have any active backends, except this one
     516             :      * (exception is to allow CREATE DB while connected to template1).
     517             :      * Otherwise we might copy inconsistent data.
     518             :      *
     519             :      * This should be last among the basic error checks, because it involves
     520             :      * potential waiting; we may as well throw an error first if we're gonna
     521             :      * throw one.
     522             :      */
     523         958 :     if (CountOtherDBBackends(src_dboid, &notherbackends, &npreparedxacts))
     524           0 :         ereport(ERROR,
     525             :                 (errcode(ERRCODE_OBJECT_IN_USE),
     526             :                  errmsg("source database \"%s\" is being accessed by other users",
     527             :                         dbtemplate),
     528             :                  errdetail_busy_db(notherbackends, npreparedxacts)));
     529             : 
     530             :     /*
     531             :      * Select an OID for the new database, checking that it doesn't have a
     532             :      * filename conflict with anything already existing in the tablespace
     533             :      * directories.
     534             :      */
     535         958 :     pg_database_rel = table_open(DatabaseRelationId, RowExclusiveLock);
     536             : 
     537             :     do
     538             :     {
     539         958 :         dboid = GetNewOidWithIndex(pg_database_rel, DatabaseOidIndexId,
     540             :                                    Anum_pg_database_oid);
     541         958 :     } while (check_db_file_conflict(dboid));
     542             : 
     543             :     /*
     544             :      * Insert a new tuple into pg_database.  This establishes our ownership of
     545             :      * the new database name (anyone else trying to insert the same name will
     546             :      * block on the unique index, and fail after we commit).
     547             :      */
     548             : 
     549             :     /* Form tuple */
     550       14370 :     MemSet(new_record, 0, sizeof(new_record));
     551         958 :     MemSet(new_record_nulls, false, sizeof(new_record_nulls));
     552             : 
     553         958 :     new_record[Anum_pg_database_oid - 1] = ObjectIdGetDatum(dboid);
     554         958 :     new_record[Anum_pg_database_datname - 1] =
     555         958 :         DirectFunctionCall1(namein, CStringGetDatum(dbname));
     556         958 :     new_record[Anum_pg_database_datdba - 1] = ObjectIdGetDatum(datdba);
     557         958 :     new_record[Anum_pg_database_encoding - 1] = Int32GetDatum(encoding);
     558         958 :     new_record[Anum_pg_database_datcollate - 1] =
     559         958 :         DirectFunctionCall1(namein, CStringGetDatum(dbcollate));
     560         958 :     new_record[Anum_pg_database_datctype - 1] =
     561         958 :         DirectFunctionCall1(namein, CStringGetDatum(dbctype));
     562         958 :     new_record[Anum_pg_database_datistemplate - 1] = BoolGetDatum(dbistemplate);
     563         958 :     new_record[Anum_pg_database_datallowconn - 1] = BoolGetDatum(dballowconnections);
     564         958 :     new_record[Anum_pg_database_datconnlimit - 1] = Int32GetDatum(dbconnlimit);
     565         958 :     new_record[Anum_pg_database_datlastsysoid - 1] = ObjectIdGetDatum(src_lastsysoid);
     566         958 :     new_record[Anum_pg_database_datfrozenxid - 1] = TransactionIdGetDatum(src_frozenxid);
     567         958 :     new_record[Anum_pg_database_datminmxid - 1] = TransactionIdGetDatum(src_minmxid);
     568         958 :     new_record[Anum_pg_database_dattablespace - 1] = ObjectIdGetDatum(dst_deftablespace);
     569             : 
     570             :     /*
     571             :      * We deliberately set datacl to default (NULL), rather than copying it
     572             :      * from the template database.  Copying it would be a bad idea when the
     573             :      * owner is not the same as the template's owner.
     574             :      */
     575         958 :     new_record_nulls[Anum_pg_database_datacl - 1] = true;
     576             : 
     577         958 :     tuple = heap_form_tuple(RelationGetDescr(pg_database_rel),
     578             :                             new_record, new_record_nulls);
     579             : 
     580         958 :     CatalogTupleInsert(pg_database_rel, tuple);
     581             : 
     582             :     /*
     583             :      * Now generate additional catalog entries associated with the new DB
     584             :      */
     585             : 
     586             :     /* Register owner dependency */
     587         958 :     recordDependencyOnOwner(DatabaseRelationId, dboid, datdba);
     588             : 
     589             :     /* Create pg_shdepend entries for objects within database */
     590         958 :     copyTemplateDependencies(src_dboid, dboid);
     591             : 
     592             :     /* Post creation hook for new database */
     593         958 :     InvokeObjectPostCreateHook(DatabaseRelationId, dboid, 0);
     594             : 
     595             :     /*
     596             :      * Force a checkpoint before starting the copy. This will force all dirty
     597             :      * buffers, including those of unlogged tables, out to disk, to ensure
     598             :      * source database is up-to-date on disk for the copy.
     599             :      * FlushDatabaseBuffers() would suffice for that, but we also want to
     600             :      * process any pending unlink requests. Otherwise, if a checkpoint
     601             :      * happened while we're copying files, a file might be deleted just when
     602             :      * we're about to copy it, causing the lstat() call in copydir() to fail
     603             :      * with ENOENT.
     604             :      */
     605         958 :     RequestCheckpoint(CHECKPOINT_IMMEDIATE | CHECKPOINT_FORCE | CHECKPOINT_WAIT
     606             :                       | CHECKPOINT_FLUSH_ALL);
     607             : 
     608             :     /*
     609             :      * Once we start copying subdirectories, we need to be able to clean 'em
     610             :      * up if we fail.  Use an ENSURE block to make sure this happens.  (This
     611             :      * is not a 100% solution, because of the possibility of failure during
     612             :      * transaction commit after we leave this routine, but it should handle
     613             :      * most scenarios.)
     614             :      */
     615         958 :     fparms.src_dboid = src_dboid;
     616         958 :     fparms.dest_dboid = dboid;
     617         958 :     PG_ENSURE_ERROR_CLEANUP(createdb_failure_callback,
     618             :                             PointerGetDatum(&fparms));
     619             :     {
     620             :         /*
     621             :          * Iterate through all tablespaces of the template database, and copy
     622             :          * each one to the new database.
     623             :          */
     624         958 :         rel = table_open(TableSpaceRelationId, AccessShareLock);
     625         958 :         scan = table_beginscan_catalog(rel, 0, NULL);
     626        2874 :         while ((tuple = heap_getnext(scan, ForwardScanDirection)) != NULL)
     627             :         {
     628        1916 :             Form_pg_tablespace spaceform = (Form_pg_tablespace) GETSTRUCT(tuple);
     629        1916 :             Oid         srctablespace = spaceform->oid;
     630             :             Oid         dsttablespace;
     631             :             char       *srcpath;
     632             :             char       *dstpath;
     633             :             struct stat st;
     634             : 
     635             :             /* No need to copy global tablespace */
     636        1916 :             if (srctablespace == GLOBALTABLESPACE_OID)
     637         958 :                 continue;
     638             : 
     639         958 :             srcpath = GetDatabasePath(src_dboid, srctablespace);
     640             : 
     641         958 :             if (stat(srcpath, &st) < 0 || !S_ISDIR(st.st_mode) ||
     642         958 :                 directory_is_empty(srcpath))
     643             :             {
     644             :                 /* Assume we can ignore it */
     645           0 :                 pfree(srcpath);
     646           0 :                 continue;
     647             :             }
     648             : 
     649         958 :             if (srctablespace == src_deftablespace)
     650         958 :                 dsttablespace = dst_deftablespace;
     651             :             else
     652           0 :                 dsttablespace = srctablespace;
     653             : 
     654         958 :             dstpath = GetDatabasePath(dboid, dsttablespace);
     655             : 
     656             :             /*
     657             :              * Copy this subdirectory to the new location
     658             :              *
     659             :              * We don't need to copy subdirectories
     660             :              */
     661         958 :             copydir(srcpath, dstpath, false);
     662             : 
     663             :             /* Record the filesystem change in XLOG */
     664             :             {
     665             :                 xl_dbase_create_rec xlrec;
     666             : 
     667         958 :                 xlrec.db_id = dboid;
     668         958 :                 xlrec.tablespace_id = dsttablespace;
     669         958 :                 xlrec.src_db_id = src_dboid;
     670         958 :                 xlrec.src_tablespace_id = srctablespace;
     671             : 
     672         958 :                 XLogBeginInsert();
     673         958 :                 XLogRegisterData((char *) &xlrec, sizeof(xl_dbase_create_rec));
     674             : 
     675         958 :                 (void) XLogInsert(RM_DBASE_ID,
     676             :                                   XLOG_DBASE_CREATE | XLR_SPECIAL_REL_UPDATE);
     677             :             }
     678             :         }
     679         958 :         table_endscan(scan);
     680         958 :         table_close(rel, AccessShareLock);
     681             : 
     682             :         /*
     683             :          * We force a checkpoint before committing.  This effectively means
     684             :          * that committed XLOG_DBASE_CREATE operations will never need to be
     685             :          * replayed (at least not in ordinary crash recovery; we still have to
     686             :          * make the XLOG entry for the benefit of PITR operations). This
     687             :          * avoids two nasty scenarios:
     688             :          *
     689             :          * #1: When PITR is off, we don't XLOG the contents of newly created
     690             :          * indexes; therefore the drop-and-recreate-whole-directory behavior
     691             :          * of DBASE_CREATE replay would lose such indexes.
     692             :          *
     693             :          * #2: Since we have to recopy the source database during DBASE_CREATE
     694             :          * replay, we run the risk of copying changes in it that were
     695             :          * committed after the original CREATE DATABASE command but before the
     696             :          * system crash that led to the replay.  This is at least unexpected
     697             :          * and at worst could lead to inconsistencies, eg duplicate table
     698             :          * names.
     699             :          *
     700             :          * (Both of these were real bugs in releases 8.0 through 8.0.3.)
     701             :          *
     702             :          * In PITR replay, the first of these isn't an issue, and the second
     703             :          * is only a risk if the CREATE DATABASE and subsequent template
     704             :          * database change both occur while a base backup is being taken.
     705             :          * There doesn't seem to be much we can do about that except document
     706             :          * it as a limitation.
     707             :          *
     708             :          * Perhaps if we ever implement CREATE DATABASE in a less cheesy way,
     709             :          * we can avoid this.
     710             :          */
     711         958 :         RequestCheckpoint(CHECKPOINT_IMMEDIATE | CHECKPOINT_FORCE | CHECKPOINT_WAIT);
     712             : 
     713             :         /*
     714             :          * Close pg_database, but keep lock till commit.
     715             :          */
     716         958 :         table_close(pg_database_rel, NoLock);
     717             : 
     718             :         /*
     719             :          * Force synchronous commit, thus minimizing the window between
     720             :          * creation of the database files and committal of the transaction. If
     721             :          * we crash before committing, we'll have a DB that's taking up disk
     722             :          * space but is not in pg_database, which is not good.
     723             :          */
     724         958 :         ForceSyncCommit();
     725             :     }
     726        1916 :     PG_END_ENSURE_ERROR_CLEANUP(createdb_failure_callback,
     727             :                                 PointerGetDatum(&fparms));
     728             : 
     729         958 :     return dboid;
     730             : }
     731             : 
     732             : /*
     733             :  * Check whether chosen encoding matches chosen locale settings.  This
     734             :  * restriction is necessary because libc's locale-specific code usually
     735             :  * fails when presented with data in an encoding it's not expecting. We
     736             :  * allow mismatch in four cases:
     737             :  *
     738             :  * 1. locale encoding = SQL_ASCII, which means that the locale is C/POSIX
     739             :  * which works with any encoding.
     740             :  *
     741             :  * 2. locale encoding = -1, which means that we couldn't determine the
     742             :  * locale's encoding and have to trust the user to get it right.
     743             :  *
     744             :  * 3. selected encoding is UTF8 and platform is win32. This is because
     745             :  * UTF8 is a pseudo codepage that is supported in all locales since it's
     746             :  * converted to UTF16 before being used.
     747             :  *
     748             :  * 4. selected encoding is SQL_ASCII, but only if you're a superuser. This
     749             :  * is risky but we have historically allowed it --- notably, the
     750             :  * regression tests require it.
     751             :  *
     752             :  * Note: if you change this policy, fix initdb to match.
     753             :  */
     754             : void
     755         964 : check_encoding_locale_matches(int encoding, const char *collate, const char *ctype)
     756             : {
     757         964 :     int         ctype_encoding = pg_get_encoding_from_locale(ctype, true);
     758         964 :     int         collate_encoding = pg_get_encoding_from_locale(collate, true);
     759             : 
     760         972 :     if (!(ctype_encoding == encoding ||
     761           8 :           ctype_encoding == PG_SQL_ASCII ||
     762             :           ctype_encoding == -1 ||
     763             : #ifdef WIN32
     764             :           encoding == PG_UTF8 ||
     765             : #endif
     766           8 :           (encoding == PG_SQL_ASCII && superuser())))
     767           0 :         ereport(ERROR,
     768             :                 (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
     769             :                  errmsg("encoding \"%s\" does not match locale \"%s\"",
     770             :                         pg_encoding_to_char(encoding),
     771             :                         ctype),
     772             :                  errdetail("The chosen LC_CTYPE setting requires encoding \"%s\".",
     773             :                            pg_encoding_to_char(ctype_encoding))));
     774             : 
     775         972 :     if (!(collate_encoding == encoding ||
     776           8 :           collate_encoding == PG_SQL_ASCII ||
     777             :           collate_encoding == -1 ||
     778             : #ifdef WIN32
     779             :           encoding == PG_UTF8 ||
     780             : #endif
     781           8 :           (encoding == PG_SQL_ASCII && superuser())))
     782           0 :         ereport(ERROR,
     783             :                 (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
     784             :                  errmsg("encoding \"%s\" does not match locale \"%s\"",
     785             :                         pg_encoding_to_char(encoding),
     786             :                         collate),
     787             :                  errdetail("The chosen LC_COLLATE setting requires encoding \"%s\".",
     788             :                            pg_encoding_to_char(collate_encoding))));
     789         964 : }
     790             : 
     791             : /* Error cleanup callback for createdb */
     792             : static void
     793           0 : createdb_failure_callback(int code, Datum arg)
     794             : {
     795           0 :     createdb_failure_params *fparms = (createdb_failure_params *) DatumGetPointer(arg);
     796             : 
     797             :     /*
     798             :      * Release lock on source database before doing recursive remove. This is
     799             :      * not essential but it seems desirable to release the lock as soon as
     800             :      * possible.
     801             :      */
     802           0 :     UnlockSharedObject(DatabaseRelationId, fparms->src_dboid, 0, ShareLock);
     803             : 
     804             :     /* Throw away any successfully copied subdirectories */
     805           0 :     remove_dbtablespaces(fparms->dest_dboid);
     806           0 : }
     807             : 
     808             : 
     809             : /*
     810             :  * DROP DATABASE
     811             :  */
     812             : void
     813          40 : dropdb(const char *dbname, bool missing_ok, bool force)
     814             : {
     815             :     Oid         db_id;
     816             :     bool        db_istemplate;
     817             :     Relation    pgdbrel;
     818             :     HeapTuple   tup;
     819             :     int         notherbackends;
     820             :     int         npreparedxacts;
     821             :     int         nslots,
     822             :                 nslots_active;
     823             :     int         nsubscriptions;
     824             : 
     825             :     /*
     826             :      * Look up the target database's OID, and get exclusive lock on it. We
     827             :      * need this to ensure that no new backend starts up in the target
     828             :      * database while we are deleting it (see postinit.c), and that no one is
     829             :      * using it as a CREATE DATABASE template or trying to delete it for
     830             :      * themselves.
     831             :      */
     832          40 :     pgdbrel = table_open(DatabaseRelationId, RowExclusiveLock);
     833             : 
     834          40 :     if (!get_db_info(dbname, AccessExclusiveLock, &db_id, NULL, NULL,
     835             :                      &db_istemplate, NULL, NULL, NULL, NULL, NULL, NULL, NULL))
     836             :     {
     837          20 :         if (!missing_ok)
     838             :         {
     839          10 :             ereport(ERROR,
     840             :                     (errcode(ERRCODE_UNDEFINED_DATABASE),
     841             :                      errmsg("database \"%s\" does not exist", dbname)));
     842             :         }
     843             :         else
     844             :         {
     845             :             /* Close pg_database, release the lock, since we changed nothing */
     846          10 :             table_close(pgdbrel, RowExclusiveLock);
     847          10 :             ereport(NOTICE,
     848             :                     (errmsg("database \"%s\" does not exist, skipping",
     849             :                             dbname)));
     850          10 :             return;
     851             :         }
     852             :     }
     853             : 
     854             :     /*
     855             :      * Permission checks
     856             :      */
     857          20 :     if (!pg_database_ownercheck(db_id, GetUserId()))
     858           0 :         aclcheck_error(ACLCHECK_NOT_OWNER, OBJECT_DATABASE,
     859             :                        dbname);
     860             : 
     861             :     /* DROP hook for the database being removed */
     862          20 :     InvokeObjectDropHook(DatabaseRelationId, db_id, 0);
     863             : 
     864             :     /*
     865             :      * Disallow dropping a DB that is marked istemplate.  This is just to
     866             :      * prevent people from accidentally dropping template0 or template1; they
     867             :      * can do so if they're really determined ...
     868             :      */
     869          20 :     if (db_istemplate)
     870           0 :         ereport(ERROR,
     871             :                 (errcode(ERRCODE_WRONG_OBJECT_TYPE),
     872             :                  errmsg("cannot drop a template database")));
     873             : 
     874             :     /* Obviously can't drop my own database */
     875          20 :     if (db_id == MyDatabaseId)
     876           0 :         ereport(ERROR,
     877             :                 (errcode(ERRCODE_OBJECT_IN_USE),
     878             :                  errmsg("cannot drop the currently open database")));
     879             : 
     880             :     /*
     881             :      * Check whether there are active logical slots that refer to the
     882             :      * to-be-dropped database. The database lock we are holding prevents the
     883             :      * creation of new slots using the database or existing slots becoming
     884             :      * active.
     885             :      */
     886          20 :     (void) ReplicationSlotsCountDBSlots(db_id, &nslots, &nslots_active);
     887          20 :     if (nslots_active)
     888             :     {
     889           2 :         ereport(ERROR,
     890             :                 (errcode(ERRCODE_OBJECT_IN_USE),
     891             :                  errmsg("database \"%s\" is used by an active logical replication slot",
     892             :                         dbname),
     893             :                  errdetail_plural("There is %d active slot.",
     894             :                                   "There are %d active slots.",
     895             :                                   nslots_active, nslots_active)));
     896             :     }
     897             : 
     898             :     /*
     899             :      * Check if there are subscriptions defined in the target database.
     900             :      *
     901             :      * We can't drop them automatically because they might be holding
     902             :      * resources in other databases/instances.
     903             :      */
     904          18 :     if ((nsubscriptions = CountDBSubscriptions(db_id)) > 0)
     905           0 :         ereport(ERROR,
     906             :                 (errcode(ERRCODE_OBJECT_IN_USE),
     907             :                  errmsg("database \"%s\" is being used by logical replication subscription",
     908             :                         dbname),
     909             :                  errdetail_plural("There is %d subscription.",
     910             :                                   "There are %d subscriptions.",
     911             :                                   nsubscriptions, nsubscriptions)));
     912             : 
     913             : 
     914             :     /*
     915             :      * Attempt to terminate all existing connections to the target database if
     916             :      * the user has requested to do so.
     917             :      */
     918          18 :     if (force)
     919           2 :         TerminateOtherDBBackends(db_id);
     920             : 
     921             :     /*
     922             :      * Check for other backends in the target database.  (Because we hold the
     923             :      * database lock, no new ones can start after this.)
     924             :      *
     925             :      * As in CREATE DATABASE, check this after other error conditions.
     926             :      */
     927          18 :     if (CountOtherDBBackends(db_id, &notherbackends, &npreparedxacts))
     928           0 :         ereport(ERROR,
     929             :                 (errcode(ERRCODE_OBJECT_IN_USE),
     930             :                  errmsg("database \"%s\" is being accessed by other users",
     931             :                         dbname),
     932             :                  errdetail_busy_db(notherbackends, npreparedxacts)));
     933             : 
     934             :     /*
     935             :      * Remove the database's tuple from pg_database.
     936             :      */
     937          18 :     tup = SearchSysCache1(DATABASEOID, ObjectIdGetDatum(db_id));
     938          18 :     if (!HeapTupleIsValid(tup))
     939           0 :         elog(ERROR, "cache lookup failed for database %u", db_id);
     940             : 
     941          18 :     CatalogTupleDelete(pgdbrel, &tup->t_self);
     942             : 
     943          18 :     ReleaseSysCache(tup);
     944             : 
     945             :     /*
     946             :      * Delete any comments or security labels associated with the database.
     947             :      */
     948          18 :     DeleteSharedComments(db_id, DatabaseRelationId);
     949          18 :     DeleteSharedSecurityLabel(db_id, DatabaseRelationId);
     950             : 
     951             :     /*
     952             :      * Remove settings associated with this database
     953             :      */
     954          18 :     DropSetting(db_id, InvalidOid);
     955             : 
     956             :     /*
     957             :      * Remove shared dependency references for the database.
     958             :      */
     959          18 :     dropDatabaseDependencies(db_id);
     960             : 
     961             :     /*
     962             :      * Drop db-specific replication slots.
     963             :      */
     964          18 :     ReplicationSlotsDropDBSlots(db_id);
     965             : 
     966             :     /*
     967             :      * Drop pages for this database that are in the shared buffer cache. This
     968             :      * is important to ensure that no remaining backend tries to write out a
     969             :      * dirty buffer to the dead database later...
     970             :      */
     971          18 :     DropDatabaseBuffers(db_id);
     972             : 
     973             :     /*
     974             :      * Tell the stats collector to forget it immediately, too.
     975             :      */
     976          18 :     pgstat_drop_database(db_id);
     977             : 
     978             :     /*
     979             :      * Tell checkpointer to forget any pending fsync and unlink requests for
     980             :      * files in the database; else the fsyncs will fail at next checkpoint, or
     981             :      * worse, it will delete files that belong to a newly created database
     982             :      * with the same OID.
     983             :      */
     984          18 :     ForgetDatabaseSyncRequests(db_id);
     985             : 
     986             :     /*
     987             :      * Force a checkpoint to make sure the checkpointer has received the
     988             :      * message sent by ForgetDatabaseSyncRequests. On Windows, this also
     989             :      * ensures that background procs don't hold any open files, which would
     990             :      * cause rmdir() to fail.
     991             :      */
     992          18 :     RequestCheckpoint(CHECKPOINT_IMMEDIATE | CHECKPOINT_FORCE | CHECKPOINT_WAIT);
     993             : 
     994             :     /*
     995             :      * Remove all tablespace subdirs belonging to the database.
     996             :      */
     997          18 :     remove_dbtablespaces(db_id);
     998             : 
     999             :     /*
    1000             :      * Close pg_database, but keep lock till commit.
    1001             :      */
    1002          18 :     table_close(pgdbrel, NoLock);
    1003             : 
    1004             :     /*
    1005             :      * Force synchronous commit, thus minimizing the window between removal of
    1006             :      * the database files and committal of the transaction. If we crash before
    1007             :      * committing, we'll have a DB that's gone on disk but still there
    1008             :      * according to pg_database, which is not good.
    1009             :      */
    1010          18 :     ForceSyncCommit();
    1011             : }
    1012             : 
    1013             : 
    1014             : /*
    1015             :  * Rename database "oldname" to "newname"; returns the database's ObjectAddress
    1016             :  */
    1017             : ObjectAddress
    1018           0 : RenameDatabase(const char *oldname, const char *newname)
    1019             : {
    1020             :     Oid         db_id;
    1021             :     HeapTuple   newtup;
    1022             :     Relation    rel;
    1023             :     int         notherbackends;
    1024             :     int         npreparedxacts;
    1025             :     ObjectAddress address;
    1026             : 
    1027             :     /*
    1028             :      * Look up the target database's OID, and get exclusive lock on it. We
    1029             :      * need this for the same reasons as DROP DATABASE.
    1030             :      */
    1031           0 :     rel = table_open(DatabaseRelationId, RowExclusiveLock);
    1032             : 
    1033           0 :     if (!get_db_info(oldname, AccessExclusiveLock, &db_id, NULL, NULL,
    1034             :                      NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL))
    1035           0 :         ereport(ERROR,
    1036             :                 (errcode(ERRCODE_UNDEFINED_DATABASE),
    1037             :                  errmsg("database \"%s\" does not exist", oldname)));
    1038             : 
    1039             :     /* must be owner */
    1040           0 :     if (!pg_database_ownercheck(db_id, GetUserId()))
    1041           0 :         aclcheck_error(ACLCHECK_NOT_OWNER, OBJECT_DATABASE,
    1042             :                        oldname);
    1043             : 
    1044             :     /* must have createdb rights */
    1045           0 :     if (!have_createdb_privilege())
    1046           0 :         ereport(ERROR,
    1047             :                 (errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
    1048             :                  errmsg("permission denied to rename database")));
    1049             : 
    1050             :     /*
    1051             :      * If built with appropriate switch, whine when regression-testing
    1052             :      * conventions for database names are violated.
    1053             :      */
    1054             : #ifdef ENFORCE_REGRESSION_TEST_NAME_RESTRICTIONS
    1055             :     if (strstr(newname, "regression") == NULL)
    1056             :         elog(WARNING, "databases created by regression test cases should have names including \"regression\"");
    1057             : #endif
    1058             : 
    1059             :     /*
    1060             :      * Make sure the new name doesn't exist.  See notes for same error in
    1061             :      * CREATE DATABASE.
    1062             :      */
    1063           0 :     if (OidIsValid(get_database_oid(newname, true)))
    1064           0 :         ereport(ERROR,
    1065             :                 (errcode(ERRCODE_DUPLICATE_DATABASE),
    1066             :                  errmsg("database \"%s\" already exists", newname)));
    1067             : 
    1068             :     /*
    1069             :      * XXX Client applications probably store the current database somewhere,
    1070             :      * so renaming it could cause confusion.  On the other hand, there may not
    1071             :      * be an actual problem besides a little confusion, so think about this
    1072             :      * and decide.
    1073             :      */
    1074           0 :     if (db_id == MyDatabaseId)
    1075           0 :         ereport(ERROR,
    1076             :                 (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
    1077             :                  errmsg("current database cannot be renamed")));
    1078             : 
    1079             :     /*
    1080             :      * Make sure the database does not have active sessions.  This is the same
    1081             :      * concern as above, but applied to other sessions.
    1082             :      *
    1083             :      * As in CREATE DATABASE, check this after other error conditions.
    1084             :      */
    1085           0 :     if (CountOtherDBBackends(db_id, &notherbackends, &npreparedxacts))
    1086           0 :         ereport(ERROR,
    1087             :                 (errcode(ERRCODE_OBJECT_IN_USE),
    1088             :                  errmsg("database \"%s\" is being accessed by other users",
    1089             :                         oldname),
    1090             :                  errdetail_busy_db(notherbackends, npreparedxacts)));
    1091             : 
    1092             :     /* rename: overwrite datname in a copy of the pg_database tuple */
    1093           0 :     newtup = SearchSysCacheCopy1(DATABASEOID, ObjectIdGetDatum(db_id));
    1094           0 :     if (!HeapTupleIsValid(newtup))
    1095           0 :         elog(ERROR, "cache lookup failed for database %u", db_id);
    1096           0 :     namestrcpy(&(((Form_pg_database) GETSTRUCT(newtup))->datname), newname);
    1097           0 :     CatalogTupleUpdate(rel, &newtup->t_self, newtup);
    1098             : 
    1099           0 :     InvokeObjectPostAlterHook(DatabaseRelationId, db_id, 0);
    1100             : 
    1101           0 :     ObjectAddressSet(address, DatabaseRelationId, db_id);
    1102             : 
    1103             :     /*
    1104             :      * Close pg_database, but keep lock till commit.
    1105             :      */
    1106           0 :     table_close(rel, NoLock);
    1107             : 
    1108           0 :     return address;
    1109             : }
    1110             : 
    1111             : 
    1112             : /*
    1113             :  * ALTER DATABASE SET TABLESPACE: copy the database's directory to the new tablespace, update pg_database, then remove the old directory
    1114             :  */
    1115             : static void
    1116           0 : movedb(const char *dbname, const char *tblspcname)
    1117             : {
    1118             :     Oid         db_id;
    1119             :     Relation    pgdbrel;
    1120             :     int         notherbackends;
    1121             :     int         npreparedxacts;
    1122             :     HeapTuple   oldtuple,
    1123             :                 newtuple;
    1124             :     Oid         src_tblspcoid,
    1125             :                 dst_tblspcoid;
    1126             :     Datum       new_record[Natts_pg_database];
    1127             :     bool        new_record_nulls[Natts_pg_database];
    1128             :     bool        new_record_repl[Natts_pg_database];
    1129             :     ScanKeyData scankey;
    1130             :     SysScanDesc sysscan;
    1131             :     AclResult   aclresult;
    1132             :     char       *src_dbpath;
    1133             :     char       *dst_dbpath;
    1134             :     DIR        *dstdir;
    1135             :     struct dirent *xlde;
    1136             :     movedb_failure_params fparms;
    1137             : 
    1138             :     /*
    1139             :      * Look up the target database's OID, and get exclusive lock on it. We
    1140             :      * need this to ensure that no new backend starts up in the database while
    1141             :      * we are moving it, and that no one is using it as a CREATE DATABASE
    1142             :      * template or trying to delete it.
    1143             :      */
    1144           0 :     pgdbrel = table_open(DatabaseRelationId, RowExclusiveLock);
    1145             : 
    1146           0 :     if (!get_db_info(dbname, AccessExclusiveLock, &db_id, NULL, NULL,
    1147             :                      NULL, NULL, NULL, NULL, NULL, &src_tblspcoid, NULL, NULL))
    1148           0 :         ereport(ERROR,
    1149             :                 (errcode(ERRCODE_UNDEFINED_DATABASE),
    1150             :                  errmsg("database \"%s\" does not exist", dbname)));
    1151             : 
    1152             :     /*
    1153             :      * We actually need a session lock, so that the lock will persist across
    1154             :      * the commit/restart below.  (We could almost get away with letting the
    1155             :      * lock be released at commit, except that someone could try to move
    1156             :      * relations of the DB back into the old directory while we rmtree() it.)
    1157             :      */
    1158           0 :     LockSharedObjectForSession(DatabaseRelationId, db_id, 0,
    1159             :                                AccessExclusiveLock);
    1160             : 
    1161             :     /*
    1162             :      * Permission checks: must own the database
    1163             :      */
    1164           0 :     if (!pg_database_ownercheck(db_id, GetUserId()))
    1165           0 :         aclcheck_error(ACLCHECK_NOT_OWNER, OBJECT_DATABASE,
    1166             :                        dbname);
    1167             : 
    1168             :     /*
    1169             :      * Obviously can't move the tables of my own database
    1170             :      */
    1171           0 :     if (db_id == MyDatabaseId)
    1172           0 :         ereport(ERROR,
    1173             :                 (errcode(ERRCODE_OBJECT_IN_USE),
    1174             :                  errmsg("cannot change the tablespace of the currently open database")));
    1175             : 
    1176             :     /*
    1177             :      * Get the destination tablespace's OID
    1178             :      */
    1179           0 :     dst_tblspcoid = get_tablespace_oid(tblspcname, false);
    1180             : 
    1181             :     /*
    1182             :      * Permission checks: need CREATE privilege on the destination tablespace
    1183             :      */
    1184           0 :     aclresult = pg_tablespace_aclcheck(dst_tblspcoid, GetUserId(),
    1185             :                                        ACL_CREATE);
    1186           0 :     if (aclresult != ACLCHECK_OK)
    1187           0 :         aclcheck_error(aclresult, OBJECT_TABLESPACE,
    1188             :                        tblspcname);
    1189             : 
    1190             :     /*
    1191             :      * pg_global must never be the default tablespace
    1192             :      */
    1193           0 :     if (dst_tblspcoid == GLOBALTABLESPACE_OID)
    1194           0 :         ereport(ERROR,
    1195             :                 (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
    1196             :                  errmsg("pg_global cannot be used as default tablespace")));
    1197             : 
    1198             :     /*
    1199             :      * No-op if same tablespace; release the session lock taken above
    1200             :      */
    1201           0 :     if (src_tblspcoid == dst_tblspcoid)
    1202             :     {
    1203           0 :         table_close(pgdbrel, NoLock);
    1204           0 :         UnlockSharedObjectForSession(DatabaseRelationId, db_id, 0,
    1205             :                                      AccessExclusiveLock);
    1206           0 :         return;
    1207             :     }
    1208             : 
    1209             :     /*
    1210             :      * Check for other backends in the target database.  (Because we hold the
    1211             :      * database lock, no new ones can start after this.)
    1212             :      *
    1213             :      * As in CREATE DATABASE, check this after other error conditions.
    1214             :      */
    1215           0 :     if (CountOtherDBBackends(db_id, &notherbackends, &npreparedxacts))
    1216           0 :         ereport(ERROR,
    1217             :                 (errcode(ERRCODE_OBJECT_IN_USE),
    1218             :                  errmsg("database \"%s\" is being accessed by other users",
    1219             :                         dbname),
    1220             :                  errdetail_busy_db(notherbackends, npreparedxacts)));
    1221             : 
    1222             :     /*
    1223             :      * Get old and new database paths
    1224             :      */
    1225           0 :     src_dbpath = GetDatabasePath(db_id, src_tblspcoid);
    1226           0 :     dst_dbpath = GetDatabasePath(db_id, dst_tblspcoid);
    1227             : 
    1228             :     /*
    1229             :      * Force a checkpoint before proceeding. This will force all dirty
    1230             :      * buffers, including those of unlogged tables, out to disk, to ensure
    1231             :      * source database is up-to-date on disk for the copy.
    1232             :      * FlushDatabaseBuffers() would suffice for that, but we also want to
    1233             :      * process any pending unlink requests. Otherwise, the check for existing
    1234             :      * files in the target directory might fail unnecessarily, not to mention
    1235             :      * that the copy might fail due to source files getting deleted under it.
    1236             :      * On Windows, this also ensures that background procs don't hold any open
    1237             :      * files, which would cause rmdir() to fail.
    1238             :      */
    1239           0 :     RequestCheckpoint(CHECKPOINT_IMMEDIATE | CHECKPOINT_FORCE | CHECKPOINT_WAIT
    1240             :                       | CHECKPOINT_FLUSH_ALL);
    1241             : 
    1242             :     /*
    1243             :      * Now drop all buffers holding data of the target database; they should
    1244             :      * no longer be dirty so DropDatabaseBuffers is safe.
    1245             :      *
    1246             :      * It might seem that we could just let these buffers age out of shared
    1247             :      * buffers naturally, since they should not get referenced anymore.  The
    1248             :      * problem with that is that if the user later moves the database back to
    1249             :      * its original tablespace, any still-surviving buffers would appear to
    1250             :      * contain valid data again --- but they'd be missing any changes made in
    1251             :      * the database while it was in the new tablespace.  In any case, freeing
    1252             :      * buffers that should never be used again seems worth the cycles.
    1253             :      *
    1254             :      * Note: it'd be sufficient to get rid of buffers matching db_id and
    1255             :      * src_tblspcoid, but bufmgr.c presently provides no API for that.
    1256             :      */
    1257           0 :     DropDatabaseBuffers(db_id);
    1258             : 
    1259             :     /*
    1260             :      * Check for existence of files in the target directory, i.e., objects of
    1261             :      * this database that are already in the target tablespace.  We can't
    1262             :      * allow the move in such a case, because we would need to change those
    1263             :      * relations' pg_class.reltablespace entries to zero, and we don't have
    1264             :      * access to the DB's pg_class to do so.
    1265             :      */
    1266           0 :     dstdir = AllocateDir(dst_dbpath);
    1267           0 :     if (dstdir != NULL)
    1268             :     {
    1269           0 :         while ((xlde = ReadDir(dstdir, dst_dbpath)) != NULL)
    1270             :         {
    1271           0 :             if (strcmp(xlde->d_name, ".") == 0 ||
    1272           0 :                 strcmp(xlde->d_name, "..") == 0)
    1273           0 :                 continue;
    1274             : 
    1275           0 :             ereport(ERROR,
    1276             :                     (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
    1277             :                      errmsg("some relations of database \"%s\" are already in tablespace \"%s\"",
    1278             :                             dbname, tblspcname),
    1279             :                      errhint("You must move them back to the database's default tablespace before using this command.")));
    1280             :         }
    1281             : 
    1282           0 :         FreeDir(dstdir);
    1283             : 
    1284             :         /*
    1285             :          * The directory exists but is empty. We must remove it before using
    1286             :          * the copydir function.
    1287             :          */
    1288           0 :         if (rmdir(dst_dbpath) != 0)
    1289           0 :             elog(ERROR, "could not remove directory \"%s\": %m",
    1290             :                  dst_dbpath);
    1291             :     }
    1292             : 
    1293             :     /*
    1294             :      * Use an ENSURE block to make sure we remove the debris if the copy fails
    1295             :      * (eg, due to out-of-disk-space).  This is not a 100% solution, because
    1296             :      * of the possibility of failure during transaction commit, but it should
    1297             :      * handle most scenarios.
    1298             :      */
    1299           0 :     fparms.dest_dboid = db_id;
    1300           0 :     fparms.dest_tsoid = dst_tblspcoid;
    1301           0 :     PG_ENSURE_ERROR_CLEANUP(movedb_failure_callback,
    1302             :                             PointerGetDatum(&fparms));
    1303             :     {
    1304             :         /*
    1305             :          * Copy files from the old tablespace to the new one
    1306             :          */
    1307           0 :         copydir(src_dbpath, dst_dbpath, false);
    1308             : 
    1309             :         /*
    1310             :          * Record the directory copy in XLOG (as an XLOG_DBASE_CREATE record)
    1311             :          */
    1312             :         {
    1313             :             xl_dbase_create_rec xlrec;
    1314             : 
    1315           0 :             xlrec.db_id = db_id;
    1316           0 :             xlrec.tablespace_id = dst_tblspcoid;
    1317           0 :             xlrec.src_db_id = db_id;
    1318           0 :             xlrec.src_tablespace_id = src_tblspcoid;
    1319             : 
    1320           0 :             XLogBeginInsert();
    1321           0 :             XLogRegisterData((char *) &xlrec, sizeof(xl_dbase_create_rec));
    1322             : 
    1323           0 :             (void) XLogInsert(RM_DBASE_ID,
    1324             :                               XLOG_DBASE_CREATE | XLR_SPECIAL_REL_UPDATE);
    1325             :         }
    1326             : 
    1327             :         /*
    1328             :          * Update the database's pg_database tuple
    1329             :          */
    1330           0 :         ScanKeyInit(&scankey,
    1331             :                     Anum_pg_database_datname,
    1332             :                     BTEqualStrategyNumber, F_NAMEEQ,
    1333             :                     CStringGetDatum(dbname));
    1334           0 :         sysscan = systable_beginscan(pgdbrel, DatabaseNameIndexId, true,
    1335             :                                      NULL, 1, &scankey);
    1336           0 :         oldtuple = systable_getnext(sysscan);
    1337           0 :         if (!HeapTupleIsValid(oldtuple))    /* shouldn't happen... */
    1338           0 :             ereport(ERROR,
    1339             :                     (errcode(ERRCODE_UNDEFINED_DATABASE),
    1340             :                      errmsg("database \"%s\" does not exist", dbname)));
    1341             : 
    1342           0 :         MemSet(new_record, 0, sizeof(new_record));
    1343           0 :         MemSet(new_record_nulls, false, sizeof(new_record_nulls));
    1344           0 :         MemSet(new_record_repl, false, sizeof(new_record_repl));
    1345             : 
    1346           0 :         new_record[Anum_pg_database_dattablespace - 1] = ObjectIdGetDatum(dst_tblspcoid);
    1347           0 :         new_record_repl[Anum_pg_database_dattablespace - 1] = true;
    1348             : 
    1349           0 :         newtuple = heap_modify_tuple(oldtuple, RelationGetDescr(pgdbrel),
    1350             :                                      new_record,
    1351             :                                      new_record_nulls, new_record_repl);
    1352           0 :         CatalogTupleUpdate(pgdbrel, &oldtuple->t_self, newtuple);
    1353             : 
    1354           0 :         InvokeObjectPostAlterHook(DatabaseRelationId, db_id, 0);
    1355             : 
    1356           0 :         systable_endscan(sysscan);
    1357             : 
    1358             :         /*
    1359             :          * Force another checkpoint here.  As in CREATE DATABASE, this is to
    1360             :          * ensure that we don't have to replay a committed XLOG_DBASE_CREATE
    1361             :          * operation, which would cause us to lose any unlogged operations
    1362             :          * done in the new DB tablespace before the next checkpoint.
    1363             :          */
    1364           0 :         RequestCheckpoint(CHECKPOINT_IMMEDIATE | CHECKPOINT_FORCE | CHECKPOINT_WAIT);
    1365             : 
    1366             :         /*
    1367             :          * Force synchronous commit, thus minimizing the window between
    1368             :          * copying the database files and committal of the transaction. If we
    1369             :          * crash before committing, we'll leave an orphaned set of files on
    1370             :          * disk, which is not fatal but not good either.
    1371             :          */
    1372           0 :         ForceSyncCommit();
    1373             : 
    1374             :         /*
    1375             :          * Close pg_database, but keep lock till commit.
    1376             :          */
    1377           0 :         table_close(pgdbrel, NoLock);
    1378             :     }
    1379           0 :     PG_END_ENSURE_ERROR_CLEANUP(movedb_failure_callback,
    1380             :                                 PointerGetDatum(&fparms));
    1381             : 
    1382             :     /*
    1383             :      * Commit the transaction so that the pg_database update is committed. If
    1384             :      * we crash while removing files, the database won't be corrupt, we'll
    1385             :      * just leave some orphaned files in the old directory.
    1386             :      *
    1387             :      * (This is OK because we know we aren't inside a transaction block.)
    1388             :      *
    1389             :      * XXX would it be safe/better to do this inside the ensure block?  Not
    1390             :      * convinced it's a good idea; consider elog just after the transaction
    1391             :      * really commits.
    1392             :      */
    1393           0 :     PopActiveSnapshot();
    1394           0 :     CommitTransactionCommand();
    1395             : 
    1396             :     /* Start new transaction for the remaining work; don't need a snapshot */
    1397           0 :     StartTransactionCommand();
    1398             : 
    1399             :     /*
    1400             :      * Remove files from the old tablespace
    1401             :      */
    1402           0 :     if (!rmtree(src_dbpath, true))
    1403           0 :         ereport(WARNING,
    1404             :                 (errmsg("some useless files may be left behind in old database directory \"%s\"",
    1405             :                         src_dbpath)));
    1406             : 
    1407             :     /*
    1408             :      * Record removal of the old directory in XLOG (as an XLOG_DBASE_DROP record)
    1409             :      */
    1410             :     {
    1411             :         xl_dbase_drop_rec xlrec;
    1412             : 
    1413           0 :         xlrec.db_id = db_id;
    1414           0 :         xlrec.ntablespaces = 1;
    1415             : 
    1416           0 :         XLogBeginInsert();
    1417           0 :         XLogRegisterData((char *) &xlrec, sizeof(xl_dbase_drop_rec));
    1418           0 :         XLogRegisterData((char *) &src_tblspcoid, sizeof(Oid));
    1419             : 
    1420           0 :         (void) XLogInsert(RM_DBASE_ID,
    1421             :                           XLOG_DBASE_DROP | XLR_SPECIAL_REL_UPDATE);
    1422             :     }
    1423             : 
    1424             :     /* Now it's safe to release the database lock */
    1425           0 :     UnlockSharedObjectForSession(DatabaseRelationId, db_id, 0,
    1426             :                                  AccessExclusiveLock);
    1427             : }
    1428             : 
    1429             : /* Error cleanup callback for movedb; arg points to a movedb_failure_params */
    1430             : static void
    1431           0 : movedb_failure_callback(int code, Datum arg)
    1432             : {
    1433           0 :     movedb_failure_params *fparms = (movedb_failure_params *) DatumGetPointer(arg);
    1434             :     char       *dstpath;
    1435             : 
    1436             :     /* Get rid of anything we managed to copy to the target directory */
    1437           0 :     dstpath = GetDatabasePath(fparms->dest_dboid, fparms->dest_tsoid);
    1438             : 
    1439           0 :     (void) rmtree(dstpath, true);
    1440           0 : }
    1441             : 
    1442             : /*
    1443             :  * Process options (only "force" is recognized) and call dropdb function.
    1444             :  */
    1445             : void
    1446          40 : DropDatabase(ParseState *pstate, DropdbStmt *stmt)
    1447             : {
    1448          40 :     bool        force = false;
    1449             :     ListCell   *lc;
    1450             : 
    1451          58 :     foreach(lc, stmt->options)
    1452             :     {
    1453          18 :         DefElem    *opt = (DefElem *) lfirst(lc);
    1454             : 
    1455          18 :         if (strcmp(opt->defname, "force") == 0)
    1456          18 :             force = true;
    1457             :         else
    1458           0 :             ereport(ERROR,
    1459             :                     (errcode(ERRCODE_SYNTAX_ERROR),
    1460             :                      errmsg("unrecognized DROP DATABASE option \"%s\"", opt->defname),
    1461             :                      parser_errposition(pstate, opt->location)));
    1462             :     }
    1463             : 
    1464          40 :     dropdb(stmt->dbname, stmt->missing_ok, force);
    1465          28 : }
    1466             : 
    1467             : /*
    1468             :  * ALTER DATABASE name ...
    1469             :  */
    1470             : Oid
    1471           6 : AlterDatabase(ParseState *pstate, AlterDatabaseStmt *stmt, bool isTopLevel)
    1472             : {
    1473             :     Relation    rel;
    1474             :     Oid         dboid;
    1475             :     HeapTuple   tuple,
    1476             :                 newtuple;
    1477             :     Form_pg_database datform;
    1478             :     ScanKeyData scankey;
    1479             :     SysScanDesc scan;
    1480             :     ListCell   *option;
    1481           6 :     bool        dbistemplate = false;
    1482           6 :     bool        dballowconnections = true;
    1483           6 :     int         dbconnlimit = -1;
    1484           6 :     DefElem    *distemplate = NULL;
    1485           6 :     DefElem    *dallowconnections = NULL;
    1486           6 :     DefElem    *dconnlimit = NULL;
    1487           6 :     DefElem    *dtablespace = NULL;
    1488             :     Datum       new_record[Natts_pg_database];
    1489             :     bool        new_record_nulls[Natts_pg_database];
    1490             :     bool        new_record_repl[Natts_pg_database];
    1491             : 
    1492             :     /* Extract options from the statement node tree */
    1493          12 :     foreach(option, stmt->options)
    1494             :     {
    1495           6 :         DefElem    *defel = (DefElem *) lfirst(option);
    1496             : 
    1497           6 :         if (strcmp(defel->defname, "is_template") == 0)
    1498             :         {
    1499           2 :             if (distemplate)
    1500           0 :                 ereport(ERROR,
    1501             :                         (errcode(ERRCODE_SYNTAX_ERROR),
    1502             :                          errmsg("conflicting or redundant options"),
    1503             :                          parser_errposition(pstate, defel->location)));
    1504           2 :             distemplate = defel;
    1505             :         }
    1506           4 :         else if (strcmp(defel->defname, "allow_connections") == 0)
    1507             :         {
    1508           4 :             if (dallowconnections)
    1509           0 :                 ereport(ERROR,
    1510             :                         (errcode(ERRCODE_SYNTAX_ERROR),
    1511             :                          errmsg("conflicting or redundant options"),
    1512             :                          parser_errposition(pstate, defel->location)));
    1513           4 :             dallowconnections = defel;
    1514             :         }
    1515           0 :         else if (strcmp(defel->defname, "connection_limit") == 0)
    1516             :         {
    1517           0 :             if (dconnlimit)
    1518           0 :                 ereport(ERROR,
    1519             :                         (errcode(ERRCODE_SYNTAX_ERROR),
    1520             :                          errmsg("conflicting or redundant options"),
    1521             :                          parser_errposition(pstate, defel->location)));
    1522           0 :             dconnlimit = defel;
    1523             :         }
    1524           0 :         else if (strcmp(defel->defname, "tablespace") == 0)
    1525             :         {
    1526           0 :             if (dtablespace)
    1527           0 :                 ereport(ERROR,
    1528             :                         (errcode(ERRCODE_SYNTAX_ERROR),
    1529             :                          errmsg("conflicting or redundant options"),
    1530             :                          parser_errposition(pstate, defel->location)));
    1531           0 :             dtablespace = defel;
    1532             :         }
    1533             :         else
    1534           0 :             ereport(ERROR,
    1535             :                     (errcode(ERRCODE_SYNTAX_ERROR),
    1536             :                      errmsg("option \"%s\" not recognized", defel->defname),
    1537             :                      parser_errposition(pstate, defel->location)));
    1538             :     }
    1539             : 
    1540           6 :     if (dtablespace)
    1541             :     {
    1542             :         /*
    1543             :          * While the SET TABLESPACE syntax doesn't allow any other options,
    1544             :          * somebody could write "WITH TABLESPACE ...".  Forbid any other
    1545             :          * options from being specified in that case.
    1546             :          */
    1547           0 :         if (list_length(stmt->options) != 1)
    1548           0 :             ereport(ERROR,
    1549             :                     (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
    1550             :                      errmsg("option \"%s\" cannot be specified with other options",
    1551             :                             dtablespace->defname),
    1552             :                      parser_errposition(pstate, dtablespace->location)));
    1553             :         /* this case isn't allowed within a transaction block */
    1554           0 :         PreventInTransactionBlock(isTopLevel, "ALTER DATABASE SET TABLESPACE");
    1555           0 :         movedb(stmt->dbname, defGetString(dtablespace));
    1556           0 :         return InvalidOid;
    1557             :     }
    1558             : 
    1559           6 :     if (distemplate && distemplate->arg)
    1560           2 :         dbistemplate = defGetBoolean(distemplate);
    1561           6 :     if (dallowconnections && dallowconnections->arg)
    1562           4 :         dballowconnections = defGetBoolean(dallowconnections);
    1563           6 :     if (dconnlimit && dconnlimit->arg)
    1564             :     {
    1565           0 :         dbconnlimit = defGetInt32(dconnlimit);
    1566           0 :         if (dbconnlimit < -1)
    1567           0 :             ereport(ERROR,
    1568             :                     (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
    1569             :                      errmsg("invalid connection limit: %d", dbconnlimit)));
    1570             :     }
    1571             : 
    1572             :     /*
    1573             :      * Get the old tuple.  We don't need a lock on the database per se,
    1574             :      * because we're not going to do anything that would mess up incoming
    1575             :      * connections.
    1576             :      */
    1577           6 :     rel = table_open(DatabaseRelationId, RowExclusiveLock);
    1578           6 :     ScanKeyInit(&scankey,
    1579             :                 Anum_pg_database_datname,
    1580             :                 BTEqualStrategyNumber, F_NAMEEQ,
    1581           6 :                 CStringGetDatum(stmt->dbname));
    1582           6 :     scan = systable_beginscan(rel, DatabaseNameIndexId, true,
    1583             :                               NULL, 1, &scankey);
    1584           6 :     tuple = systable_getnext(scan);
    1585           6 :     if (!HeapTupleIsValid(tuple))
    1586           0 :         ereport(ERROR,
    1587             :                 (errcode(ERRCODE_UNDEFINED_DATABASE),
    1588             :                  errmsg("database \"%s\" does not exist", stmt->dbname)));
    1589             : 
    1590           6 :     datform = (Form_pg_database) GETSTRUCT(tuple);
    1591           6 :     dboid = datform->oid;
    1592             : 
    1593           6 :     if (!pg_database_ownercheck(dboid, GetUserId()))
    1594           0 :         aclcheck_error(ACLCHECK_NOT_OWNER, OBJECT_DATABASE,
    1595           0 :                        stmt->dbname);
    1596             : 
    1597             :     /*
    1598             :      * In order to avoid getting locked out and having to go through
    1599             :      * standalone mode, we refuse to disallow connections to the database
    1600             :      * we're currently connected to.  Lockout can still happen with concurrent
    1601             :      * sessions but the likeliness of that is not high enough to worry about.
    1602             :      */
    1603           6 :     if (!dballowconnections && dboid == MyDatabaseId)
    1604           0 :         ereport(ERROR,
    1605             :                 (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
    1606             :                  errmsg("cannot disallow connections for current database")));
    1607             : 
    1608             :     /*
    1609             :      * Build an updated tuple, perusing the information just obtained
    1610             :      */
    1611          90 :     MemSet(new_record, 0, sizeof(new_record));
    1612           6 :     MemSet(new_record_nulls, false, sizeof(new_record_nulls));
    1613           6 :     MemSet(new_record_repl, false, sizeof(new_record_repl));
    1614             : 
    1615           6 :     if (distemplate)
    1616             :     {
    1617           2 :         new_record[Anum_pg_database_datistemplate - 1] = BoolGetDatum(dbistemplate);
    1618           2 :         new_record_repl[Anum_pg_database_datistemplate - 1] = true;
    1619             :     }
    1620           6 :     if (dallowconnections)
    1621             :     {
    1622           4 :         new_record[Anum_pg_database_datallowconn - 1] = BoolGetDatum(dballowconnections);
    1623           4 :         new_record_repl[Anum_pg_database_datallowconn - 1] = true;
    1624             :     }
    1625           6 :     if (dconnlimit)
    1626             :     {
    1627           0 :         new_record[Anum_pg_database_datconnlimit - 1] = Int32GetDatum(dbconnlimit);
    1628           0 :         new_record_repl[Anum_pg_database_datconnlimit - 1] = true;
    1629             :     }
    1630             : 
    1631           6 :     newtuple = heap_modify_tuple(tuple, RelationGetDescr(rel), new_record,
    1632             :                                  new_record_nulls, new_record_repl);
    1633           6 :     CatalogTupleUpdate(rel, &tuple->t_self, newtuple);
    1634             : 
    1635           6 :     InvokeObjectPostAlterHook(DatabaseRelationId, dboid, 0);
    1636             : 
    1637           6 :     systable_endscan(scan);
    1638             : 
    1639             :     /* Close pg_database, but keep lock till commit */
    1640           6 :     table_close(rel, NoLock);
    1641             : 
    1642           6 :     return dboid;
    1643             : }
    1644             : 
    1645             : 
    1646             : /*
    1647             :  * ALTER DATABASE name SET ...
    1648             :  */
    1649             : Oid
    1650         840 : AlterDatabaseSet(AlterDatabaseSetStmt *stmt)
    1651             : {
    1652         840 :     Oid         datid = get_database_oid(stmt->dbname, false);
    1653             : 
    1654             :     /*
    1655             :      * Obtain a lock on the database and make sure it didn't go away in the
    1656             :      * meantime.
    1657             :      */
    1658         840 :     shdepLockAndCheckObject(DatabaseRelationId, datid);
    1659             : 
    1660         840 :     if (!pg_database_ownercheck(datid, GetUserId()))
    1661           0 :         aclcheck_error(ACLCHECK_NOT_OWNER, OBJECT_DATABASE,
    1662           0 :                        stmt->dbname);
    1663             : 
    1664         840 :     AlterSetting(datid, InvalidOid, stmt->setstmt);
    1665             : 
    1666         840 :     UnlockSharedObject(DatabaseRelationId, datid, 0, AccessShareLock);
    1667             : 
    1668         840 :     return datid;
    1669             : }
    1670             : 
    1671             : 
    1672             : /*
    1673             :  * ALTER DATABASE name OWNER TO newowner
    1674             :  */
    1675             : ObjectAddress
    1676          30 : AlterDatabaseOwner(const char *dbname, Oid newOwnerId)
    1677             : {
    1678             :     Oid         db_id;
    1679             :     HeapTuple   tuple;
    1680             :     Relation    rel;
    1681             :     ScanKeyData scankey;
    1682             :     SysScanDesc scan;
    1683             :     Form_pg_database datForm;
    1684             :     ObjectAddress address;
    1685             : 
    1686             :     /*
    1687             :      * Get the old tuple.  We don't need a lock on the database per se,
    1688             :      * because we're not going to do anything that would mess up incoming
    1689             :      * connections.
    1690             :      */
    1691          30 :     rel = table_open(DatabaseRelationId, RowExclusiveLock);
    1692          30 :     ScanKeyInit(&scankey,
    1693             :                 Anum_pg_database_datname,
    1694             :                 BTEqualStrategyNumber, F_NAMEEQ,
    1695             :                 CStringGetDatum(dbname));
    1696          30 :     scan = systable_beginscan(rel, DatabaseNameIndexId, true,
    1697             :                               NULL, 1, &scankey);
    1698          30 :     tuple = systable_getnext(scan);
    1699          30 :     if (!HeapTupleIsValid(tuple))
    1700           0 :         ereport(ERROR,
    1701             :                 (errcode(ERRCODE_UNDEFINED_DATABASE),
    1702             :                  errmsg("database \"%s\" does not exist", dbname)));
    1703             : 
    1704          30 :     datForm = (Form_pg_database) GETSTRUCT(tuple);
    1705          30 :     db_id = datForm->oid;
    1706             : 
    1707             :     /*
    1708             :      * If the new owner is the same as the existing owner, consider the
    1709             :      * command to have succeeded.  This is to be consistent with other
    1710             :      * objects.
    1711             :      */
    1712          30 :     if (datForm->datdba != newOwnerId)
    1713             :     {
    1714             :         Datum       repl_val[Natts_pg_database];
    1715             :         bool        repl_null[Natts_pg_database];
    1716             :         bool        repl_repl[Natts_pg_database];
    1717             :         Acl        *newAcl;
    1718             :         Datum       aclDatum;
    1719             :         bool        isNull;
    1720             :         HeapTuple   newtuple;
    1721             : 
    1722             :         /* Otherwise, must be owner of the existing object */
    1723          18 :         if (!pg_database_ownercheck(db_id, GetUserId()))
    1724           0 :             aclcheck_error(ACLCHECK_NOT_OWNER, OBJECT_DATABASE,
    1725             :                            dbname);
    1726             : 
    1727             :         /* Must be able to become new owner */
    1728          18 :         check_is_member_of_role(GetUserId(), newOwnerId);
    1729             : 
    1730             :         /*
    1731             :          * must have createdb rights
    1732             :          *
    1733             :          * NOTE: This is different from other alter-owner checks in that the
    1734             :          * current user is checked for createdb privileges instead of the
    1735             :          * destination owner.  This is consistent with the CREATE case for
    1736             :          * databases.  Because superusers will always have this right, we need
    1737             :          * no special case for them.
    1738             :          */
    1739          18 :         if (!have_createdb_privilege())
    1740           0 :             ereport(ERROR,
    1741             :                     (errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
    1742             :                      errmsg("permission denied to change owner of database")));
    1743             : 
    1744          18 :         memset(repl_null, false, sizeof(repl_null));
    1745          18 :         memset(repl_repl, false, sizeof(repl_repl));
    1746             : 
    1747          18 :         repl_repl[Anum_pg_database_datdba - 1] = true;
    1748          18 :         repl_val[Anum_pg_database_datdba - 1] = ObjectIdGetDatum(newOwnerId);
    1749             : 
    1750             :         /*
    1751             :          * Determine the modified ACL for the new owner.  This is only
    1752             :          * necessary when the ACL is non-null.
    1753             :          */
    1754          18 :         aclDatum = heap_getattr(tuple,
    1755             :                                 Anum_pg_database_datacl,
    1756             :                                 RelationGetDescr(rel),
    1757             :                                 &isNull);
    1758          18 :         if (!isNull)
    1759             :         {
    1760           0 :             newAcl = aclnewowner(DatumGetAclP(aclDatum),
    1761             :                                  datForm->datdba, newOwnerId);
    1762           0 :             repl_repl[Anum_pg_database_datacl - 1] = true;
    1763           0 :             repl_val[Anum_pg_database_datacl - 1] = PointerGetDatum(newAcl);
    1764             :         }
    1765             : 
    1766          18 :         newtuple = heap_modify_tuple(tuple, RelationGetDescr(rel), repl_val, repl_null, repl_repl);
    1767          18 :         CatalogTupleUpdate(rel, &newtuple->t_self, newtuple);
    1768             : 
    1769          18 :         heap_freetuple(newtuple);
    1770             : 
    1771             :         /* Update owner dependency reference */
    1772          18 :         changeDependencyOnOwner(DatabaseRelationId, db_id, newOwnerId);
    1773             :     }
    1774             : 
    1775          30 :     InvokeObjectPostAlterHook(DatabaseRelationId, db_id, 0);
    1776             : 
    1777          30 :     ObjectAddressSet(address, DatabaseRelationId, db_id);
    1778             : 
    1779          30 :     systable_endscan(scan);
    1780             : 
    1781             :     /* Close pg_database, but keep lock till commit */
    1782          30 :     table_close(rel, NoLock);
    1783             : 
    1784          30 :     return address;
    1785             : }
    1786             : 
    1787             : 
    1788             : /*
    1789             :  * Helper functions
    1790             :  */
    1791             : 
    1792             : /*
    1793             :  * Look up info about the database named "name".  If the database exists,
    1794             :  * obtain the specified lock type on it, fill in any of the remaining
    1795             :  * parameters that aren't NULL, and return true.  If no such database,
    1796             :  * return false.
    1797             :  */
    1798             : static bool
    1799        1000 : get_db_info(const char *name, LOCKMODE lockmode,
    1800             :             Oid *dbIdP, Oid *ownerIdP,
    1801             :             int *encodingP, bool *dbIsTemplateP, bool *dbAllowConnP,
    1802             :             Oid *dbLastSysOidP, TransactionId *dbFrozenXidP,
    1803             :             MultiXactId *dbMinMultiP,
    1804             :             Oid *dbTablespace, char **dbCollate, char **dbCtype)
    1805             : {
    1806        1000 :     bool        result = false;
    1807             :     Relation    relation;
    1808             : 
    1809             :     AssertArg(name);
    1810             : 
    1811             :     /* Caller may wish to grab a better lock on pg_database beforehand... */
    1812        1000 :     relation = table_open(DatabaseRelationId, AccessShareLock);
    1813             : 
    1814             :     /*
    1815             :      * Loop covers the rare case where the database is renamed before we can
    1816             :      * lock it.  We try again just in case we can find a new one of the same
    1817             :      * name.
    1818             :      */
    1819             :     for (;;)
    1820           0 :     {
    1821             :         ScanKeyData scanKey;
    1822             :         SysScanDesc scan;
    1823             :         HeapTuple   tuple;
    1824             :         Oid         dbOid;
    1825             : 
    1826             :         /*
    1827             :          * there's no syscache for database-indexed-by-name, so must do it the
    1828             :          * hard way
    1829             :          */
    1830        1000 :         ScanKeyInit(&scanKey,
    1831             :                     Anum_pg_database_datname,
    1832             :                     BTEqualStrategyNumber, F_NAMEEQ,
    1833             :                     CStringGetDatum(name));
    1834             : 
    1835        1000 :         scan = systable_beginscan(relation, DatabaseNameIndexId, true,
    1836             :                                   NULL, 1, &scanKey);
    1837             : 
    1838        1000 :         tuple = systable_getnext(scan);
    1839             : 
    1840        1000 :         if (!HeapTupleIsValid(tuple))
    1841             :         {
    1842             :             /* definitely no database of that name */
    1843          20 :             systable_endscan(scan);
    1844          20 :             break;
    1845             :         }
    1846             : 
    1847         980 :         dbOid = ((Form_pg_database) GETSTRUCT(tuple))->oid;
    1848             : 
    1849         980 :         systable_endscan(scan);
    1850             : 
    1851             :         /*
    1852             :          * Now that we have a database OID, we can try to lock the DB.
    1853             :          */
    1854         980 :         if (lockmode != NoLock)
    1855         980 :             LockSharedObject(DatabaseRelationId, dbOid, 0, lockmode);
    1856             : 
    1857             :         /*
    1858             :          * And now, re-fetch the tuple by OID.  If it's still there and still
    1859             :          * the same name, we win; else, drop the lock and loop back to try
    1860             :          * again.
    1861             :          */
    1862         980 :         tuple = SearchSysCache1(DATABASEOID, ObjectIdGetDatum(dbOid));
    1863         980 :         if (HeapTupleIsValid(tuple))
    1864             :         {
    1865         980 :             Form_pg_database dbform = (Form_pg_database) GETSTRUCT(tuple);
    1866             : 
    1867         980 :             if (strcmp(name, NameStr(dbform->datname)) == 0)
    1868             :             {
    1869             :                 /* oid of the database */
    1870         980 :                 if (dbIdP)
    1871         980 :                     *dbIdP = dbOid;
    1872             :                 /* oid of the owner */
    1873         980 :                 if (ownerIdP)
    1874         960 :                     *ownerIdP = dbform->datdba;
    1875             :                 /* character encoding */
    1876         980 :                 if (encodingP)
    1877         960 :                     *encodingP = dbform->encoding;
    1878             :                 /* allowed as template? */
    1879         980 :                 if (dbIsTemplateP)
    1880         980 :                     *dbIsTemplateP = dbform->datistemplate;
    1881             :                 /* allowing connections? */
    1882         980 :                 if (dbAllowConnP)
    1883         960 :                     *dbAllowConnP = dbform->datallowconn;
    1884             :                 /* last system OID used in database */
    1885         980 :                 if (dbLastSysOidP)
    1886         960 :                     *dbLastSysOidP = dbform->datlastsysoid;
    1887             :                 /* limit of frozen XIDs */
    1888         980 :                 if (dbFrozenXidP)
    1889         960 :                     *dbFrozenXidP = dbform->datfrozenxid;
    1890             :                 /* minimum MultiXactId */
    1891         980 :                 if (dbMinMultiP)
    1892         960 :                     *dbMinMultiP = dbform->datminmxid;
    1893             :                 /* default tablespace for this database */
    1894         980 :                 if (dbTablespace)
    1895         960 :                     *dbTablespace = dbform->dattablespace;
    1896             :                 /* default locale settings for this database */
    1897         980 :                 if (dbCollate)
    1898         960 :                     *dbCollate = pstrdup(NameStr(dbform->datcollate));
    1899         980 :                 if (dbCtype)
    1900         960 :                     *dbCtype = pstrdup(NameStr(dbform->datctype));
    1901         980 :                 ReleaseSysCache(tuple);
    1902         980 :                 result = true;
    1903         980 :                 break;
    1904             :             }
    1905             :             /* can only get here if it was just renamed */
    1906           0 :             ReleaseSysCache(tuple);
    1907             :         }
    1908             : 
    1909           0 :         if (lockmode != NoLock)
    1910           0 :             UnlockSharedObject(DatabaseRelationId, dbOid, 0, lockmode);
    1911             :     }
    1912             : 
    1913        1000 :     table_close(relation, AccessShareLock);
    1914             : 
    1915        1000 :     return result;
    1916             : }
    1917             : 
    1918             : /* Check if current user has createdb privileges */
    1919             : static bool
    1920         978 : have_createdb_privilege(void)
    1921             : {
    1922         978 :     bool        result = false;
    1923             :     HeapTuple   utup;
    1924             : 
    1925             :     /* Superusers can always do everything */
    1926         978 :     if (superuser())
    1927         978 :         return true;
    1928             : 
    1929           0 :     utup = SearchSysCache1(AUTHOID, ObjectIdGetDatum(GetUserId()));
    1930           0 :     if (HeapTupleIsValid(utup))
    1931             :     {
    1932           0 :         result = ((Form_pg_authid) GETSTRUCT(utup))->rolcreatedb;
    1933           0 :         ReleaseSysCache(utup);
    1934             :     }
    1935           0 :     return result;
    1936             : }
    1937             : 
    1938             : /*
    1939             :  * Remove tablespace directories
    1940             :  *
    1941             :  * We don't know what tablespaces db_id is using, so iterate through all
    1942             :  * tablespaces removing <tablespace>/db_id
    1943             :  */
    1944             : static void
    1945          18 : remove_dbtablespaces(Oid db_id)
    1946             : {
    1947             :     Relation    rel;
    1948             :     TableScanDesc scan;
    1949             :     HeapTuple   tuple;
    1950          18 :     List       *ltblspc = NIL;
    1951             :     ListCell   *cell;
    1952             :     int         ntblspc;
    1953             :     int         i;
    1954             :     Oid        *tablespace_ids;
    1955             : 
    1956          18 :     rel = table_open(TableSpaceRelationId, AccessShareLock);
    1957          18 :     scan = table_beginscan_catalog(rel, 0, NULL);
    1958          54 :     while ((tuple = heap_getnext(scan, ForwardScanDirection)) != NULL)
    1959             :     {
    1960          36 :         Form_pg_tablespace spcform = (Form_pg_tablespace) GETSTRUCT(tuple);
    1961          36 :         Oid         dsttablespace = spcform->oid;
    1962             :         char       *dstpath;
    1963             :         struct stat st;
    1964             : 
    1965             :         /* Don't mess with the global tablespace */
    1966          36 :         if (dsttablespace == GLOBALTABLESPACE_OID)
    1967          18 :             continue;
    1968             : 
    1969          18 :         dstpath = GetDatabasePath(db_id, dsttablespace);
    1970             : 
    1971          18 :         if (lstat(dstpath, &st) < 0 || !S_ISDIR(st.st_mode))
    1972             :         {
    1973             :             /* Assume we can ignore it */
    1974           0 :             pfree(dstpath);
    1975           0 :             continue;
    1976             :         }
    1977             : 
    1978          18 :         if (!rmtree(dstpath, true))
    1979           0 :             ereport(WARNING,
    1980             :                     (errmsg("some useless files may be left behind in old database directory \"%s\"",
    1981             :                             dstpath)));
    1982             : 
    1983          18 :         ltblspc = lappend_oid(ltblspc, dsttablespace);
    1984          18 :         pfree(dstpath);
    1985             :     }
    1986             : 
    1987          18 :     ntblspc = list_length(ltblspc);
    1988          18 :     if (ntblspc == 0)
    1989             :     {
    1990           0 :         table_endscan(scan);
    1991           0 :         table_close(rel, AccessShareLock);
    1992           0 :         return;
    1993             :     }
    1994             : 
    1995          18 :     tablespace_ids = (Oid *) palloc(ntblspc * sizeof(Oid));
    1996          18 :     i = 0;
    1997          36 :     foreach(cell, ltblspc)
    1998          18 :         tablespace_ids[i++] = lfirst_oid(cell);
    1999             : 
    2000             :     /* Record the filesystem change in XLOG */
    2001             :     {
    2002             :         xl_dbase_drop_rec xlrec;
    2003             : 
    2004          18 :         xlrec.db_id = db_id;
    2005          18 :         xlrec.ntablespaces = ntblspc;
    2006             : 
    2007          18 :         XLogBeginInsert();
    2008          18 :         XLogRegisterData((char *) &xlrec, MinSizeOfDbaseDropRec);
    2009          18 :         XLogRegisterData((char *) tablespace_ids, ntblspc * sizeof(Oid));
    2010             : 
    2011          18 :         (void) XLogInsert(RM_DBASE_ID,
    2012             :                           XLOG_DBASE_DROP | XLR_SPECIAL_REL_UPDATE);
    2013             :     }
    2014             : 
    2015          18 :     list_free(ltblspc);
    2016          18 :     pfree(tablespace_ids);
    2017             : 
    2018          18 :     table_endscan(scan);
    2019          18 :     table_close(rel, AccessShareLock);
    2020             : }
    2021             : 
    2022             : /*
    2023             :  * Check for existing files that conflict with a proposed new DB OID;
    2024             :  * return true if there are any
    2025             :  *
    2026             :  * If there were a subdirectory in any tablespace matching the proposed new
    2027             :  * OID, we'd get a create failure due to the duplicate name ... and then we'd
    2028             :  * try to remove that already-existing subdirectory during the cleanup in
    2029             :  * remove_dbtablespaces.  Nuking existing files seems like a bad idea, so
    2030             :  * instead we make this extra check before settling on the OID of the new
    2031             :  * database.  This exactly parallels what GetNewRelFileNode() does for table
    2032             :  * relfilenode values.
    2033             :  */
    2034             : static bool
    2035         958 : check_db_file_conflict(Oid db_id)
    2036             : {
    2037         958 :     bool        result = false;
    2038             :     Relation    rel;
    2039             :     TableScanDesc scan;
    2040             :     HeapTuple   tuple;
    2041             : 
    2042         958 :     rel = table_open(TableSpaceRelationId, AccessShareLock);
    2043         958 :     scan = table_beginscan_catalog(rel, 0, NULL);
    2044        2874 :     while ((tuple = heap_getnext(scan, ForwardScanDirection)) != NULL)
    2045             :     {
    2046        1916 :         Form_pg_tablespace spcform = (Form_pg_tablespace) GETSTRUCT(tuple);
    2047        1916 :         Oid         dsttablespace = spcform->oid;
    2048             :         char       *dstpath;
    2049             :         struct stat st;
    2050             : 
    2051             :         /* Don't mess with the global tablespace */
    2052        1916 :         if (dsttablespace == GLOBALTABLESPACE_OID)
    2053         958 :             continue;
    2054             : 
    2055         958 :         dstpath = GetDatabasePath(db_id, dsttablespace);
    2056             : 
    2057         958 :         if (lstat(dstpath, &st) == 0)
    2058             :         {
    2059             :             /* Found a conflicting file (or directory, whatever) */
    2060           0 :             pfree(dstpath);
    2061           0 :             result = true;
    2062           0 :             break;
    2063             :         }
    2064             : 
    2065         958 :         pfree(dstpath);
    2066             :     }
    2067             : 
    2068         958 :     table_endscan(scan);
    2069         958 :     table_close(rel, AccessShareLock);
    2070             : 
    2071         958 :     return result;
    2072             : }
    2073             : 
    2074             : /*
    2075             :  * Issue a suitable errdetail message for a busy database
    2076             :  */
    2077             : static int
    2078           0 : errdetail_busy_db(int notherbackends, int npreparedxacts)
    2079             : {
    2080           0 :     if (notherbackends > 0 && npreparedxacts > 0)
    2081             : 
    2082             :         /*
    2083             :          * We don't deal with singular versus plural here, since gettext
    2084             :          * doesn't support multiple plurals in one string.
    2085             :          */
    2086           0 :         errdetail("There are %d other session(s) and %d prepared transaction(s) using the database.",
    2087             :                   notherbackends, npreparedxacts);
    2088           0 :     else if (notherbackends > 0)
    2089           0 :         errdetail_plural("There is %d other session using the database.",
    2090             :                          "There are %d other sessions using the database.",
    2091             :                          notherbackends,
    2092             :                          notherbackends);
    2093             :     else
    2094           0 :         errdetail_plural("There is %d prepared transaction using the database.",
    2095             :                          "There are %d prepared transactions using the database.",
    2096             :                          npreparedxacts,
    2097             :                          npreparedxacts);
    2098           0 :     return 0;                   /* just to keep ereport macro happy */
    2099             : }
    2100             : 
    2101             : /*
    2102             :  * get_database_oid - given a database name, look up the OID
    2103             :  *
    2104             :  * If missing_ok is false, throw an error if database name not found.  If
    2105             :  * true, just return InvalidOid.
    2106             :  */
    2107             : Oid
    2108        3998 : get_database_oid(const char *dbname, bool missing_ok)
    2109             : {
    2110             :     Relation    pg_database;
    2111             :     ScanKeyData entry[1];
    2112             :     SysScanDesc scan;
    2113             :     HeapTuple   dbtuple;
    2114             :     Oid         oid;
    2115             : 
    2116             :     /*
    2117             :      * There's no syscache for pg_database indexed by name, so we must look
    2118             :      * the hard way.
    2119             :      */
    2120        3998 :     pg_database = table_open(DatabaseRelationId, AccessShareLock);
    2121        3998 :     ScanKeyInit(&entry[0],
    2122             :                 Anum_pg_database_datname,
    2123             :                 BTEqualStrategyNumber, F_NAMEEQ,
    2124             :                 CStringGetDatum(dbname));
    2125        3998 :     scan = systable_beginscan(pg_database, DatabaseNameIndexId, true,
    2126             :                               NULL, 1, entry);
    2127             : 
    2128        3998 :     dbtuple = systable_getnext(scan);
    2129             : 
    2130             :     /* We assume that there can be at most one matching tuple */
    2131        3998 :     if (HeapTupleIsValid(dbtuple))
    2132        3036 :         oid = ((Form_pg_database) GETSTRUCT(dbtuple))->oid;
    2133             :     else
    2134         962 :         oid = InvalidOid;
    2135             : 
    2136        3998 :     systable_endscan(scan);
    2137        3998 :     table_close(pg_database, AccessShareLock);
    2138             : 
    2139        3998 :     if (!OidIsValid(oid) && !missing_ok)
    2140           4 :         ereport(ERROR,
    2141             :                 (errcode(ERRCODE_UNDEFINED_DATABASE),
    2142             :                  errmsg("database \"%s\" does not exist",
    2143             :                         dbname)));
    2144             : 
    2145        3994 :     return oid;
    2146             : }
    2147             : 
    2148             : 
    2149             : /*
    2150             :  * get_database_name - given a database OID, look up the name
    2151             :  *
    2152             :  * Returns a palloc'd string, or NULL if no such database.
    2153             :  */
    2154             : char *
    2155        4698 : get_database_name(Oid dbid)
    2156             : {
    2157             :     HeapTuple   dbtuple;
    2158             :     char       *result;
    2159             : 
    2160        4698 :     dbtuple = SearchSysCache1(DATABASEOID, ObjectIdGetDatum(dbid));
    2161        4698 :     if (HeapTupleIsValid(dbtuple))
    2162             :     {
    2163        4698 :         result = pstrdup(NameStr(((Form_pg_database) GETSTRUCT(dbtuple))->datname));
    2164        4698 :         ReleaseSysCache(dbtuple);
    2165             :     }
    2166             :     else
    2167           0 :         result = NULL;
    2168             : 
    2169        4698 :     return result;
    2170             : }
    2171             : 
    2172             : /*
    2173             :  * DATABASE resource manager's routines
    2174             :  */
    2175             : void
    2176           6 : dbase_redo(XLogReaderState *record)
    2177             : {
    2178           6 :     uint8       info = XLogRecGetInfo(record) & ~XLR_INFO_MASK;
    2179             : 
    2180             :     /* Backup blocks are not used in dbase records */
    2181             :     Assert(!XLogRecHasAnyBlockRefs(record));
    2182             : 
    2183           6 :     if (info == XLOG_DBASE_CREATE)
    2184             :     {
    2185           4 :         xl_dbase_create_rec *xlrec = (xl_dbase_create_rec *) XLogRecGetData(record);
    2186             :         char       *src_path;
    2187             :         char       *dst_path;
    2188             :         struct stat st;
    2189             : 
    2190           4 :         src_path = GetDatabasePath(xlrec->src_db_id, xlrec->src_tablespace_id);
    2191           4 :         dst_path = GetDatabasePath(xlrec->db_id, xlrec->tablespace_id);
    2192             : 
    2193             :         /*
    2194             :          * Our theory for replaying a CREATE is to forcibly drop the target
    2195             :          * subdirectory if present, then re-copy the source data. This may be
    2196             :          * more work than needed, but it is simple to implement.
    2197             :          */
    2198           4 :         if (stat(dst_path, &st) == 0 && S_ISDIR(st.st_mode))
    2199             :         {
    2200           0 :             if (!rmtree(dst_path, true))
    2201             :                 /* If this failed, copydir() below is going to error. */
    2202           0 :                 ereport(WARNING,
    2203             :                         (errmsg("some useless files may be left behind in old database directory \"%s\"",
    2204             :                                 dst_path)));
    2205             :         }
    2206             : 
    2207             :         /*
    2208             :          * Force dirty buffers out to disk, to ensure source database is
    2209             :          * up-to-date for the copy.
    2210             :          */
    2211           4 :         FlushDatabaseBuffers(xlrec->src_db_id);
    2212             : 
    2213             :         /*
    2214             :          * Copy this subdirectory to the new location
    2215             :          *
    2216             :          * We don't need to copy subdirectories
    2217             :          */
    2218           4 :         copydir(src_path, dst_path, false);
    2219             :     }
    2220           2 :     else if (info == XLOG_DBASE_DROP)
    2221             :     {
    2222           2 :         xl_dbase_drop_rec *xlrec = (xl_dbase_drop_rec *) XLogRecGetData(record);
    2223             :         char       *dst_path;
    2224             :         int         i;
    2225             : 
    2226           2 :         if (InHotStandby)
    2227             :         {
    2228             :             /*
    2229             :              * Lock database while we resolve conflicts to ensure that
    2230             :              * InitPostgres() cannot fully re-execute concurrently. This
    2231             :              * avoids backends re-connecting automatically to same database,
    2232             :              * which can happen in some cases.
    2233             :              *
    2234             :              * This will lock out walsenders trying to connect to db-specific
    2235             :              * slots for logical decoding too, so it's safe for us to drop
    2236             :              * slots.
    2237             :              */
    2238           2 :             LockSharedObjectForSession(DatabaseRelationId, xlrec->db_id, 0, AccessExclusiveLock);
    2239           2 :             ResolveRecoveryConflictWithDatabase(xlrec->db_id);
    2240             :         }
    2241             : 
    2242             :         /* Drop any database-specific replication slots */
    2243           2 :         ReplicationSlotsDropDBSlots(xlrec->db_id);
    2244             : 
    2245             :         /* Drop pages for this database that are in the shared buffer cache */
    2246           2 :         DropDatabaseBuffers(xlrec->db_id);
    2247             : 
    2248             :         /* Also, clean out any fsync requests that might be pending in md.c */
    2249           2 :         ForgetDatabaseSyncRequests(xlrec->db_id);
    2250             : 
    2251             :         /* Clean out the xlog relcache too */
    2252           2 :         XLogDropDatabase(xlrec->db_id);
    2253             : 
    2254           4 :         for (i = 0; i < xlrec->ntablespaces; i++)
    2255             :         {
    2256           2 :             dst_path = GetDatabasePath(xlrec->db_id, xlrec->tablespace_ids[i]);
    2257             : 
    2258             :             /* And remove the physical files */
    2259           2 :             if (!rmtree(dst_path, true))
    2260           0 :                 ereport(WARNING,
    2261             :                         (errmsg("some useless files may be left behind in old database directory \"%s\"",
    2262             :                                 dst_path)));
    2263           2 :             pfree(dst_path);
    2264             :         }
    2265             : 
    2266           2 :         if (InHotStandby)
    2267             :         {
    2268             :             /*
    2269             :              * Release locks prior to commit. XXX There is a race condition
    2270             :              * here that may allow backends to reconnect, but the window for
    2271             :              * this is small because the gap between here and commit is mostly
    2272             :              * fairly small and it is unlikely that people will be dropping
    2273             :              * databases that we are trying to connect to anyway.
    2274             :              */
    2275           2 :             UnlockSharedObjectForSession(DatabaseRelationId, xlrec->db_id, 0, AccessExclusiveLock);
    2276             :         }
    2277             :     }
    2278             :     else
    2279           0 :         elog(PANIC, "dbase_redo: unknown op code %u", info);
    2280           6 : }

Generated by: LCOV version 1.13