Line data Source code
1 : /*-------------------------------------------------------------------------
2 : *
3 : * dbcommands.c
4 : * Database management commands (create/drop database).
5 : *
6 : * Note: database creation/destruction commands use exclusive locks on
7 : * the database objects (as expressed by LockSharedObject()) to avoid
8 : * stepping on each others' toes. Formerly we used table-level locks
9 : * on pg_database, but that's too coarse-grained.
10 : *
11 : * Portions Copyright (c) 1996-2024, PostgreSQL Global Development Group
12 : * Portions Copyright (c) 1994, Regents of the University of California
13 : *
14 : *
15 : * IDENTIFICATION
16 : * src/backend/commands/dbcommands.c
17 : *
18 : *-------------------------------------------------------------------------
19 : */
20 : #include "postgres.h"
21 :
22 : #include <fcntl.h>
23 : #include <unistd.h>
24 : #include <sys/stat.h>
25 :
26 : #include "access/genam.h"
27 : #include "access/heapam.h"
28 : #include "access/htup_details.h"
29 : #include "access/multixact.h"
30 : #include "access/tableam.h"
31 : #include "access/xact.h"
32 : #include "access/xloginsert.h"
33 : #include "access/xlogrecovery.h"
34 : #include "access/xlogutils.h"
35 : #include "catalog/catalog.h"
36 : #include "catalog/dependency.h"
37 : #include "catalog/indexing.h"
38 : #include "catalog/objectaccess.h"
39 : #include "catalog/pg_authid.h"
40 : #include "catalog/pg_collation.h"
41 : #include "catalog/pg_database.h"
42 : #include "catalog/pg_db_role_setting.h"
43 : #include "catalog/pg_subscription.h"
44 : #include "catalog/pg_tablespace.h"
45 : #include "commands/comment.h"
46 : #include "commands/dbcommands.h"
47 : #include "commands/dbcommands_xlog.h"
48 : #include "commands/defrem.h"
49 : #include "commands/seclabel.h"
50 : #include "commands/tablespace.h"
51 : #include "common/file_perm.h"
52 : #include "mb/pg_wchar.h"
53 : #include "miscadmin.h"
54 : #include "pgstat.h"
55 : #include "postmaster/bgwriter.h"
56 : #include "replication/slot.h"
57 : #include "storage/copydir.h"
58 : #include "storage/fd.h"
59 : #include "storage/ipc.h"
60 : #include "storage/lmgr.h"
61 : #include "storage/md.h"
62 : #include "storage/procarray.h"
63 : #include "storage/smgr.h"
64 : #include "utils/acl.h"
65 : #include "utils/builtins.h"
66 : #include "utils/fmgroids.h"
67 : #include "utils/pg_locale.h"
68 : #include "utils/relmapper.h"
69 : #include "utils/snapmgr.h"
70 : #include "utils/syscache.h"
71 :
/*
 * Strategy used by CREATE DATABASE to populate the new database.
 *
 * CREATEDB_WAL_LOG copies the database at the block level and WAL-logs each
 * copied block.
 *
 * CREATEDB_FILE_COPY performs a file-system-level copy of the database and
 * logs a single record for each tablespace copied.  To make this safe, it
 * also triggers checkpoints before and after the operation.
 */
typedef enum CreateDBStrategy
{
	CREATEDB_WAL_LOG,			/* block-level copy, each block WAL-logged */
	CREATEDB_FILE_COPY,			/* file-level copy, one record per tablespace */
} CreateDBStrategy;
87 :
88 : typedef struct
89 : {
90 : Oid src_dboid; /* source (template) DB */
91 : Oid dest_dboid; /* DB we are trying to create */
92 : CreateDBStrategy strategy; /* create db strategy */
93 : } createdb_failure_params;
94 :
95 : typedef struct
96 : {
97 : Oid dest_dboid; /* DB we are trying to move */
98 : Oid dest_tsoid; /* tablespace we are trying to move to */
99 : } movedb_failure_params;
100 :
101 : /*
102 : * Information about a relation to be copied when creating a database.
103 : */
104 : typedef struct CreateDBRelInfo
105 : {
106 : RelFileLocator rlocator; /* physical relation identifier */
107 : Oid reloid; /* relation oid */
108 : bool permanent; /* relation is permanent or unlogged */
109 : } CreateDBRelInfo;
110 :
111 :
112 : /* non-export function prototypes */
113 : static void createdb_failure_callback(int code, Datum arg);
114 : static void movedb(const char *dbname, const char *tblspcname);
115 : static void movedb_failure_callback(int code, Datum arg);
116 : static bool get_db_info(const char *name, LOCKMODE lockmode,
117 : Oid *dbIdP, Oid *ownerIdP,
118 : int *encodingP, bool *dbIsTemplateP, bool *dbAllowConnP, bool *dbHasLoginEvtP,
119 : TransactionId *dbFrozenXidP, MultiXactId *dbMinMultiP,
120 : Oid *dbTablespace, char **dbCollate, char **dbCtype, char **dbLocale,
121 : char **dbIcurules,
122 : char *dbLocProvider,
123 : char **dbCollversion);
124 : static void remove_dbtablespaces(Oid db_id);
125 : static bool check_db_file_conflict(Oid db_id);
126 : static int errdetail_busy_db(int notherbackends, int npreparedxacts);
127 : static void CreateDatabaseUsingWalLog(Oid src_dboid, Oid dst_dboid, Oid src_tsid,
128 : Oid dst_tsid);
129 : static List *ScanSourceDatabasePgClass(Oid tbid, Oid dbid, char *srcpath);
130 : static List *ScanSourceDatabasePgClassPage(Page page, Buffer buf, Oid tbid,
131 : Oid dbid, char *srcpath,
132 : List *rlocatorlist, Snapshot snapshot);
133 : static CreateDBRelInfo *ScanSourceDatabasePgClassTuple(HeapTupleData *tuple,
134 : Oid tbid, Oid dbid,
135 : char *srcpath);
136 : static void CreateDirAndVersionFile(char *dbpath, Oid dbid, Oid tsid,
137 : bool isRedo);
138 : static void CreateDatabaseUsingFileCopy(Oid src_dboid, Oid dst_dboid,
139 : Oid src_tsid, Oid dst_tsid);
140 : static void recovery_create_dbdir(char *path, bool only_tblspc);
141 :
142 : /*
143 : * Create a new database using the WAL_LOG strategy.
144 : *
145 : * Each copied block is separately written to the write-ahead log.
146 : */
147 : static void
148 434 : CreateDatabaseUsingWalLog(Oid src_dboid, Oid dst_dboid,
149 : Oid src_tsid, Oid dst_tsid)
150 : {
151 : char *srcpath;
152 : char *dstpath;
153 434 : List *rlocatorlist = NULL;
154 : ListCell *cell;
155 : LockRelId srcrelid;
156 : LockRelId dstrelid;
157 : RelFileLocator srcrlocator;
158 : RelFileLocator dstrlocator;
159 : CreateDBRelInfo *relinfo;
160 :
161 : /* Get source and destination database paths. */
162 434 : srcpath = GetDatabasePath(src_dboid, src_tsid);
163 434 : dstpath = GetDatabasePath(dst_dboid, dst_tsid);
164 :
165 : /* Create database directory and write PG_VERSION file. */
166 434 : CreateDirAndVersionFile(dstpath, dst_dboid, dst_tsid, false);
167 :
168 : /* Copy relmap file from source database to the destination database. */
169 434 : RelationMapCopy(dst_dboid, dst_tsid, srcpath, dstpath);
170 :
171 : /* Get list of relfilelocators to copy from the source database. */
172 434 : rlocatorlist = ScanSourceDatabasePgClass(src_tsid, src_dboid, srcpath);
173 : Assert(rlocatorlist != NIL);
174 :
175 : /*
176 : * Database IDs will be the same for all relations so set them before
177 : * entering the loop.
178 : */
179 434 : srcrelid.dbId = src_dboid;
180 434 : dstrelid.dbId = dst_dboid;
181 :
182 : /* Loop over our list of relfilelocators and copy each one. */
183 96814 : foreach(cell, rlocatorlist)
184 : {
185 96380 : relinfo = lfirst(cell);
186 96380 : srcrlocator = relinfo->rlocator;
187 :
188 : /*
189 : * If the relation is from the source db's default tablespace then we
190 : * need to create it in the destination db's default tablespace.
191 : * Otherwise, we need to create in the same tablespace as it is in the
192 : * source database.
193 : */
194 96380 : if (srcrlocator.spcOid == src_tsid)
195 96380 : dstrlocator.spcOid = dst_tsid;
196 : else
197 0 : dstrlocator.spcOid = srcrlocator.spcOid;
198 :
199 96380 : dstrlocator.dbOid = dst_dboid;
200 96380 : dstrlocator.relNumber = srcrlocator.relNumber;
201 :
202 : /*
203 : * Acquire locks on source and target relations before copying.
204 : *
205 : * We typically do not read relation data into shared_buffers without
206 : * holding a relation lock. It's unclear what could go wrong if we
207 : * skipped it in this case, because nobody can be modifying either the
208 : * source or destination database at this point, and we have locks on
209 : * both databases, too, but let's take the conservative route.
210 : */
211 96380 : dstrelid.relId = srcrelid.relId = relinfo->reloid;
212 96380 : LockRelationId(&srcrelid, AccessShareLock);
213 96380 : LockRelationId(&dstrelid, AccessShareLock);
214 :
215 : /* Copy relation storage from source to the destination. */
216 96380 : CreateAndCopyRelationData(srcrlocator, dstrlocator, relinfo->permanent);
217 :
218 : /* Release the relation locks. */
219 96380 : UnlockRelationId(&srcrelid, AccessShareLock);
220 96380 : UnlockRelationId(&dstrelid, AccessShareLock);
221 : }
222 :
223 434 : pfree(srcpath);
224 434 : pfree(dstpath);
225 434 : list_free_deep(rlocatorlist);
226 434 : }
227 :
228 : /*
229 : * Scan the pg_class table in the source database to identify the relations
230 : * that need to be copied to the destination database.
231 : *
232 : * This is an exception to the usual rule that cross-database access is
233 : * not possible. We can make it work here because we know that there are no
234 : * connections to the source database and (since there can't be prepared
235 : * transactions touching that database) no in-doubt tuples either. This
236 : * means that we don't need to worry about pruning removing anything from
237 : * under us, and we don't need to be too picky about our snapshot either.
238 : * As long as it sees all previously-committed XIDs as committed and all
239 : * aborted XIDs as aborted, we should be fine: nothing else is possible
240 : * here.
241 : *
242 : * We can't rely on the relcache for anything here, because that only knows
243 : * about the database to which we are connected, and can't handle access to
244 : * other databases. That also means we can't rely on the heap scan
245 : * infrastructure, which would be a bad idea anyway since it might try
246 : * to do things like HOT pruning which we definitely can't do safely in
247 : * a database to which we're not even connected.
248 : */
249 : static List *
250 434 : ScanSourceDatabasePgClass(Oid tbid, Oid dbid, char *srcpath)
251 : {
252 : RelFileLocator rlocator;
253 : BlockNumber nblocks;
254 : BlockNumber blkno;
255 : Buffer buf;
256 : RelFileNumber relfilenumber;
257 : Page page;
258 434 : List *rlocatorlist = NIL;
259 : LockRelId relid;
260 : Snapshot snapshot;
261 : SMgrRelation smgr;
262 : BufferAccessStrategy bstrategy;
263 :
264 : /* Get pg_class relfilenumber. */
265 434 : relfilenumber = RelationMapOidToFilenumberForDatabase(srcpath,
266 : RelationRelationId);
267 :
268 : /* Don't read data into shared_buffers without holding a relation lock. */
269 434 : relid.dbId = dbid;
270 434 : relid.relId = RelationRelationId;
271 434 : LockRelationId(&relid, AccessShareLock);
272 :
273 : /* Prepare a RelFileLocator for the pg_class relation. */
274 434 : rlocator.spcOid = tbid;
275 434 : rlocator.dbOid = dbid;
276 434 : rlocator.relNumber = relfilenumber;
277 :
278 434 : smgr = smgropen(rlocator, INVALID_PROC_NUMBER);
279 434 : nblocks = smgrnblocks(smgr, MAIN_FORKNUM);
280 434 : smgrclose(smgr);
281 :
282 : /* Use a buffer access strategy since this is a bulk read operation. */
283 434 : bstrategy = GetAccessStrategy(BAS_BULKREAD);
284 :
285 : /*
286 : * As explained in the function header comments, we need a snapshot that
287 : * will see all committed transactions as committed, and our transaction
288 : * snapshot - or the active snapshot - might not be new enough for that,
289 : * but the return value of GetLatestSnapshot() should work fine.
290 : */
291 434 : snapshot = GetLatestSnapshot();
292 :
293 : /* Process the relation block by block. */
294 6550 : for (blkno = 0; blkno < nblocks; blkno++)
295 : {
296 6116 : CHECK_FOR_INTERRUPTS();
297 :
298 6116 : buf = ReadBufferWithoutRelcache(rlocator, MAIN_FORKNUM, blkno,
299 : RBM_NORMAL, bstrategy, true);
300 :
301 6116 : LockBuffer(buf, BUFFER_LOCK_SHARE);
302 6116 : page = BufferGetPage(buf);
303 6116 : if (PageIsNew(page) || PageIsEmpty(page))
304 : {
305 0 : UnlockReleaseBuffer(buf);
306 0 : continue;
307 : }
308 :
309 : /* Append relevant pg_class tuples for current page to rlocatorlist. */
310 6116 : rlocatorlist = ScanSourceDatabasePgClassPage(page, buf, tbid, dbid,
311 : srcpath, rlocatorlist,
312 : snapshot);
313 :
314 6116 : UnlockReleaseBuffer(buf);
315 : }
316 :
317 : /* Release relation lock. */
318 434 : UnlockRelationId(&relid, AccessShareLock);
319 :
320 434 : return rlocatorlist;
321 : }
322 :
323 : /*
324 : * Scan one page of the source database's pg_class relation and add relevant
325 : * entries to rlocatorlist. The return value is the updated list.
326 : */
327 : static List *
328 6116 : ScanSourceDatabasePgClassPage(Page page, Buffer buf, Oid tbid, Oid dbid,
329 : char *srcpath, List *rlocatorlist,
330 : Snapshot snapshot)
331 : {
332 6116 : BlockNumber blkno = BufferGetBlockNumber(buf);
333 : OffsetNumber offnum;
334 : OffsetNumber maxoff;
335 : HeapTupleData tuple;
336 :
337 6116 : maxoff = PageGetMaxOffsetNumber(page);
338 :
339 : /* Loop over offsets. */
340 313318 : for (offnum = FirstOffsetNumber;
341 : offnum <= maxoff;
342 307202 : offnum = OffsetNumberNext(offnum))
343 : {
344 : ItemId itemid;
345 :
346 307202 : itemid = PageGetItemId(page, offnum);
347 :
348 : /* Nothing to do if slot is empty or already dead. */
349 307202 : if (!ItemIdIsUsed(itemid) || ItemIdIsDead(itemid) ||
350 219642 : ItemIdIsRedirected(itemid))
351 124870 : continue;
352 :
353 : Assert(ItemIdIsNormal(itemid));
354 182332 : ItemPointerSet(&(tuple.t_self), blkno, offnum);
355 :
356 : /* Initialize a HeapTupleData structure. */
357 182332 : tuple.t_data = (HeapTupleHeader) PageGetItem(page, itemid);
358 182332 : tuple.t_len = ItemIdGetLength(itemid);
359 182332 : tuple.t_tableOid = RelationRelationId;
360 :
361 : /* Skip tuples that are not visible to this snapshot. */
362 182332 : if (HeapTupleSatisfiesVisibility(&tuple, snapshot, buf))
363 : {
364 : CreateDBRelInfo *relinfo;
365 :
366 : /*
367 : * ScanSourceDatabasePgClassTuple is in charge of constructing a
368 : * CreateDBRelInfo object for this tuple, but can also decide that
369 : * this tuple isn't something we need to copy. If we do need to
370 : * copy the relation, add it to the list.
371 : */
372 180142 : relinfo = ScanSourceDatabasePgClassTuple(&tuple, tbid, dbid,
373 : srcpath);
374 180142 : if (relinfo != NULL)
375 96380 : rlocatorlist = lappend(rlocatorlist, relinfo);
376 : }
377 : }
378 :
379 6116 : return rlocatorlist;
380 : }
381 :
382 : /*
383 : * Decide whether a certain pg_class tuple represents something that
384 : * needs to be copied from the source database to the destination database,
385 : * and if so, construct a CreateDBRelInfo for it.
386 : *
387 : * Visibility checks are handled by the caller, so our job here is just
388 : * to assess the data stored in the tuple.
389 : */
390 : CreateDBRelInfo *
391 180142 : ScanSourceDatabasePgClassTuple(HeapTupleData *tuple, Oid tbid, Oid dbid,
392 : char *srcpath)
393 : {
394 : CreateDBRelInfo *relinfo;
395 : Form_pg_class classForm;
396 180142 : RelFileNumber relfilenumber = InvalidRelFileNumber;
397 :
398 180142 : classForm = (Form_pg_class) GETSTRUCT(tuple);
399 :
400 : /*
401 : * Return NULL if this object does not need to be copied.
402 : *
403 : * Shared objects don't need to be copied, because they are shared.
404 : * Objects without storage can't be copied, because there's nothing to
405 : * copy. Temporary relations don't need to be copied either, because they
406 : * are inaccessible outside of the session that created them, which must
407 : * be gone already, and couldn't connect to a different database if it
408 : * still existed. autovacuum will eventually remove the pg_class entries
409 : * as well.
410 : */
411 180142 : if (classForm->reltablespace == GLOBALTABLESPACE_OID ||
412 158442 : !RELKIND_HAS_STORAGE(classForm->relkind) ||
413 96380 : classForm->relpersistence == RELPERSISTENCE_TEMP)
414 83762 : return NULL;
415 :
416 : /*
417 : * If relfilenumber is valid then directly use it. Otherwise, consult the
418 : * relmap.
419 : */
420 96380 : if (RelFileNumberIsValid(classForm->relfilenode))
421 89002 : relfilenumber = classForm->relfilenode;
422 : else
423 7378 : relfilenumber = RelationMapOidToFilenumberForDatabase(srcpath,
424 : classForm->oid);
425 :
426 : /* We must have a valid relfilenumber. */
427 96380 : if (!RelFileNumberIsValid(relfilenumber))
428 0 : elog(ERROR, "relation with OID %u does not have a valid relfilenumber",
429 : classForm->oid);
430 :
431 : /* Prepare a rel info element and add it to the list. */
432 96380 : relinfo = (CreateDBRelInfo *) palloc(sizeof(CreateDBRelInfo));
433 96380 : if (OidIsValid(classForm->reltablespace))
434 0 : relinfo->rlocator.spcOid = classForm->reltablespace;
435 : else
436 96380 : relinfo->rlocator.spcOid = tbid;
437 :
438 96380 : relinfo->rlocator.dbOid = dbid;
439 96380 : relinfo->rlocator.relNumber = relfilenumber;
440 96380 : relinfo->reloid = classForm->oid;
441 :
442 : /* Temporary relations were rejected above. */
443 : Assert(classForm->relpersistence != RELPERSISTENCE_TEMP);
444 96380 : relinfo->permanent =
445 96380 : (classForm->relpersistence == RELPERSISTENCE_PERMANENT) ? true : false;
446 :
447 96380 : return relinfo;
448 : }
449 :
450 : /*
451 : * Create database directory and write out the PG_VERSION file in the database
452 : * path. If isRedo is true, it's okay for the database directory to exist
453 : * already.
454 : */
455 : static void
456 474 : CreateDirAndVersionFile(char *dbpath, Oid dbid, Oid tsid, bool isRedo)
457 : {
458 : int fd;
459 : int nbytes;
460 : char versionfile[MAXPGPATH];
461 : char buf[16];
462 :
463 : /*
464 : * Note that we don't have to copy version data from the source database;
465 : * there's only one legal value.
466 : */
467 474 : sprintf(buf, "%s\n", PG_MAJORVERSION);
468 474 : nbytes = strlen(PG_MAJORVERSION) + 1;
469 :
470 : /* Create database directory. */
471 474 : if (MakePGDirectory(dbpath) < 0)
472 : {
473 : /* Failure other than already exists or not in WAL replay? */
474 16 : if (errno != EEXIST || !isRedo)
475 0 : ereport(ERROR,
476 : (errcode_for_file_access(),
477 : errmsg("could not create directory \"%s\": %m", dbpath)));
478 : }
479 :
480 : /*
481 : * Create PG_VERSION file in the database path. If the file already
482 : * exists and we are in WAL replay then try again to open it in write
483 : * mode.
484 : */
485 474 : snprintf(versionfile, sizeof(versionfile), "%s/%s", dbpath, "PG_VERSION");
486 :
487 474 : fd = OpenTransientFile(versionfile, O_WRONLY | O_CREAT | O_EXCL | PG_BINARY);
488 474 : if (fd < 0 && errno == EEXIST && isRedo)
489 16 : fd = OpenTransientFile(versionfile, O_WRONLY | O_TRUNC | PG_BINARY);
490 :
491 474 : if (fd < 0)
492 0 : ereport(ERROR,
493 : (errcode_for_file_access(),
494 : errmsg("could not create file \"%s\": %m", versionfile)));
495 :
496 : /* Write PG_MAJORVERSION in the PG_VERSION file. */
497 474 : pgstat_report_wait_start(WAIT_EVENT_VERSION_FILE_WRITE);
498 474 : errno = 0;
499 474 : if ((int) write(fd, buf, nbytes) != nbytes)
500 : {
501 : /* If write didn't set errno, assume problem is no disk space. */
502 0 : if (errno == 0)
503 0 : errno = ENOSPC;
504 0 : ereport(ERROR,
505 : (errcode_for_file_access(),
506 : errmsg("could not write to file \"%s\": %m", versionfile)));
507 : }
508 474 : pgstat_report_wait_end();
509 :
510 474 : pgstat_report_wait_start(WAIT_EVENT_VERSION_FILE_SYNC);
511 474 : if (pg_fsync(fd) != 0)
512 0 : ereport(data_sync_elevel(ERROR),
513 : (errcode_for_file_access(),
514 : errmsg("could not fsync file \"%s\": %m", versionfile)));
515 474 : fsync_fname(dbpath, true);
516 474 : pgstat_report_wait_end();
517 :
518 : /* Close the version file. */
519 474 : CloseTransientFile(fd);
520 :
521 : /* If we are not in WAL replay then write the WAL. */
522 474 : if (!isRedo)
523 : {
524 : xl_dbase_create_wal_log_rec xlrec;
525 :
526 434 : START_CRIT_SECTION();
527 :
528 434 : xlrec.db_id = dbid;
529 434 : xlrec.tablespace_id = tsid;
530 :
531 434 : XLogBeginInsert();
532 434 : XLogRegisterData((char *) (&xlrec),
533 : sizeof(xl_dbase_create_wal_log_rec));
534 :
535 434 : (void) XLogInsert(RM_DBASE_ID, XLOG_DBASE_CREATE_WAL_LOG);
536 :
537 434 : END_CRIT_SECTION();
538 : }
539 474 : }
540 :
541 : /*
542 : * Create a new database using the FILE_COPY strategy.
543 : *
544 : * Copy each tablespace at the filesystem level, and log a single WAL record
545 : * for each tablespace copied. This requires a checkpoint before and after the
546 : * copy, which may be expensive, but it does greatly reduce WAL generation
547 : * if the copied database is large.
548 : */
549 : static void
550 172 : CreateDatabaseUsingFileCopy(Oid src_dboid, Oid dst_dboid, Oid src_tsid,
551 : Oid dst_tsid)
552 : {
553 : TableScanDesc scan;
554 : Relation rel;
555 : HeapTuple tuple;
556 :
557 : /*
558 : * Force a checkpoint before starting the copy. This will force all dirty
559 : * buffers, including those of unlogged tables, out to disk, to ensure
560 : * source database is up-to-date on disk for the copy.
561 : * FlushDatabaseBuffers() would suffice for that, but we also want to
562 : * process any pending unlink requests. Otherwise, if a checkpoint
563 : * happened while we're copying files, a file might be deleted just when
564 : * we're about to copy it, causing the lstat() call in copydir() to fail
565 : * with ENOENT.
566 : */
567 172 : RequestCheckpoint(CHECKPOINT_IMMEDIATE | CHECKPOINT_FORCE |
568 : CHECKPOINT_WAIT | CHECKPOINT_FLUSH_ALL);
569 :
570 : /*
571 : * Iterate through all tablespaces of the template database, and copy each
572 : * one to the new database.
573 : */
574 172 : rel = table_open(TableSpaceRelationId, AccessShareLock);
575 172 : scan = table_beginscan_catalog(rel, 0, NULL);
576 552 : while ((tuple = heap_getnext(scan, ForwardScanDirection)) != NULL)
577 : {
578 380 : Form_pg_tablespace spaceform = (Form_pg_tablespace) GETSTRUCT(tuple);
579 380 : Oid srctablespace = spaceform->oid;
580 : Oid dsttablespace;
581 : char *srcpath;
582 : char *dstpath;
583 : struct stat st;
584 :
585 : /* No need to copy global tablespace */
586 380 : if (srctablespace == GLOBALTABLESPACE_OID)
587 208 : continue;
588 :
589 208 : srcpath = GetDatabasePath(src_dboid, srctablespace);
590 :
591 380 : if (stat(srcpath, &st) < 0 || !S_ISDIR(st.st_mode) ||
592 172 : directory_is_empty(srcpath))
593 : {
594 : /* Assume we can ignore it */
595 36 : pfree(srcpath);
596 36 : continue;
597 : }
598 :
599 172 : if (srctablespace == src_tsid)
600 172 : dsttablespace = dst_tsid;
601 : else
602 0 : dsttablespace = srctablespace;
603 :
604 172 : dstpath = GetDatabasePath(dst_dboid, dsttablespace);
605 :
606 : /*
607 : * Copy this subdirectory to the new location
608 : *
609 : * We don't need to copy subdirectories
610 : */
611 172 : copydir(srcpath, dstpath, false);
612 :
613 : /* Record the filesystem change in XLOG */
614 : {
615 : xl_dbase_create_file_copy_rec xlrec;
616 :
617 172 : xlrec.db_id = dst_dboid;
618 172 : xlrec.tablespace_id = dsttablespace;
619 172 : xlrec.src_db_id = src_dboid;
620 172 : xlrec.src_tablespace_id = srctablespace;
621 :
622 172 : XLogBeginInsert();
623 172 : XLogRegisterData((char *) &xlrec,
624 : sizeof(xl_dbase_create_file_copy_rec));
625 :
626 172 : (void) XLogInsert(RM_DBASE_ID,
627 : XLOG_DBASE_CREATE_FILE_COPY | XLR_SPECIAL_REL_UPDATE);
628 : }
629 172 : pfree(srcpath);
630 172 : pfree(dstpath);
631 : }
632 172 : table_endscan(scan);
633 172 : table_close(rel, AccessShareLock);
634 :
635 : /*
636 : * We force a checkpoint before committing. This effectively means that
637 : * committed XLOG_DBASE_CREATE_FILE_COPY operations will never need to be
638 : * replayed (at least not in ordinary crash recovery; we still have to
639 : * make the XLOG entry for the benefit of PITR operations). This avoids
640 : * two nasty scenarios:
641 : *
642 : * #1: When PITR is off, we don't XLOG the contents of newly created
643 : * indexes; therefore the drop-and-recreate-whole-directory behavior of
644 : * DBASE_CREATE replay would lose such indexes.
645 : *
646 : * #2: Since we have to recopy the source database during DBASE_CREATE
647 : * replay, we run the risk of copying changes in it that were committed
648 : * after the original CREATE DATABASE command but before the system crash
649 : * that led to the replay. This is at least unexpected and at worst could
650 : * lead to inconsistencies, eg duplicate table names.
651 : *
652 : * (Both of these were real bugs in releases 8.0 through 8.0.3.)
653 : *
654 : * In PITR replay, the first of these isn't an issue, and the second is
655 : * only a risk if the CREATE DATABASE and subsequent template database
656 : * change both occur while a base backup is being taken. There doesn't
657 : * seem to be much we can do about that except document it as a
658 : * limitation.
659 : *
660 : * See CreateDatabaseUsingWalLog() for a less cheesy CREATE DATABASE
661 : * strategy that avoids these problems.
662 : */
663 172 : RequestCheckpoint(CHECKPOINT_IMMEDIATE | CHECKPOINT_FORCE | CHECKPOINT_WAIT);
664 172 : }
665 :
666 : /*
667 : * CREATE DATABASE
668 : */
669 : Oid
670 640 : createdb(ParseState *pstate, const CreatedbStmt *stmt)
671 : {
672 : Oid src_dboid;
673 : Oid src_owner;
674 640 : int src_encoding = -1;
675 640 : char *src_collate = NULL;
676 640 : char *src_ctype = NULL;
677 640 : char *src_locale = NULL;
678 640 : char *src_icurules = NULL;
679 640 : char src_locprovider = '\0';
680 640 : char *src_collversion = NULL;
681 : bool src_istemplate;
682 640 : bool src_hasloginevt = false;
683 : bool src_allowconn;
684 640 : TransactionId src_frozenxid = InvalidTransactionId;
685 640 : MultiXactId src_minmxid = InvalidMultiXactId;
686 : Oid src_deftablespace;
687 : volatile Oid dst_deftablespace;
688 : Relation pg_database_rel;
689 : HeapTuple tuple;
690 640 : Datum new_record[Natts_pg_database] = {0};
691 640 : bool new_record_nulls[Natts_pg_database] = {0};
692 640 : Oid dboid = InvalidOid;
693 : Oid datdba;
694 : ListCell *option;
695 640 : DefElem *dtablespacename = NULL;
696 640 : DefElem *downer = NULL;
697 640 : DefElem *dtemplate = NULL;
698 640 : DefElem *dencoding = NULL;
699 640 : DefElem *dlocale = NULL;
700 640 : DefElem *dbuiltinlocale = NULL;
701 640 : DefElem *dcollate = NULL;
702 640 : DefElem *dctype = NULL;
703 640 : DefElem *diculocale = NULL;
704 640 : DefElem *dicurules = NULL;
705 640 : DefElem *dlocprovider = NULL;
706 640 : DefElem *distemplate = NULL;
707 640 : DefElem *dallowconnections = NULL;
708 640 : DefElem *dconnlimit = NULL;
709 640 : DefElem *dcollversion = NULL;
710 640 : DefElem *dstrategy = NULL;
711 640 : char *dbname = stmt->dbname;
712 640 : char *dbowner = NULL;
713 640 : const char *dbtemplate = NULL;
714 640 : char *dbcollate = NULL;
715 640 : char *dbctype = NULL;
716 640 : const char *dblocale = NULL;
717 640 : char *dbicurules = NULL;
718 640 : char dblocprovider = '\0';
719 : char *canonname;
720 640 : int encoding = -1;
721 640 : bool dbistemplate = false;
722 640 : bool dballowconnections = true;
723 640 : int dbconnlimit = DATCONNLIMIT_UNLIMITED;
724 640 : char *dbcollversion = NULL;
725 : int notherbackends;
726 : int npreparedxacts;
727 640 : CreateDBStrategy dbstrategy = CREATEDB_WAL_LOG;
728 : createdb_failure_params fparms;
729 :
730 : /* Extract options from the statement node tree */
731 1744 : foreach(option, stmt->options)
732 : {
733 1104 : DefElem *defel = (DefElem *) lfirst(option);
734 :
735 1104 : if (strcmp(defel->defname, "tablespace") == 0)
736 : {
737 16 : if (dtablespacename)
738 0 : errorConflictingDefElem(defel, pstate);
739 16 : dtablespacename = defel;
740 : }
741 1088 : else if (strcmp(defel->defname, "owner") == 0)
742 : {
743 2 : if (downer)
744 0 : errorConflictingDefElem(defel, pstate);
745 2 : downer = defel;
746 : }
747 1086 : else if (strcmp(defel->defname, "template") == 0)
748 : {
749 280 : if (dtemplate)
750 0 : errorConflictingDefElem(defel, pstate);
751 280 : dtemplate = defel;
752 : }
753 806 : else if (strcmp(defel->defname, "encoding") == 0)
754 : {
755 60 : if (dencoding)
756 0 : errorConflictingDefElem(defel, pstate);
757 60 : dencoding = defel;
758 : }
759 746 : else if (strcmp(defel->defname, "locale") == 0)
760 : {
761 70 : if (dlocale)
762 0 : errorConflictingDefElem(defel, pstate);
763 70 : dlocale = defel;
764 : }
765 676 : else if (strcmp(defel->defname, "builtin_locale") == 0)
766 : {
767 16 : if (dbuiltinlocale)
768 0 : errorConflictingDefElem(defel, pstate);
769 16 : dbuiltinlocale = defel;
770 : }
771 660 : else if (strcmp(defel->defname, "lc_collate") == 0)
772 : {
773 12 : if (dcollate)
774 0 : errorConflictingDefElem(defel, pstate);
775 12 : dcollate = defel;
776 : }
777 648 : else if (strcmp(defel->defname, "lc_ctype") == 0)
778 : {
779 12 : if (dctype)
780 0 : errorConflictingDefElem(defel, pstate);
781 12 : dctype = defel;
782 : }
783 636 : else if (strcmp(defel->defname, "icu_locale") == 0)
784 : {
785 10 : if (diculocale)
786 0 : errorConflictingDefElem(defel, pstate);
787 10 : diculocale = defel;
788 : }
789 626 : else if (strcmp(defel->defname, "icu_rules") == 0)
790 : {
791 2 : if (dicurules)
792 0 : errorConflictingDefElem(defel, pstate);
793 2 : dicurules = defel;
794 : }
795 624 : else if (strcmp(defel->defname, "locale_provider") == 0)
796 : {
797 76 : if (dlocprovider)
798 0 : errorConflictingDefElem(defel, pstate);
799 76 : dlocprovider = defel;
800 : }
801 548 : else if (strcmp(defel->defname, "is_template") == 0)
802 : {
803 80 : if (distemplate)
804 0 : errorConflictingDefElem(defel, pstate);
805 80 : distemplate = defel;
806 : }
807 468 : else if (strcmp(defel->defname, "allow_connections") == 0)
808 : {
809 78 : if (dallowconnections)
810 0 : errorConflictingDefElem(defel, pstate);
811 78 : dallowconnections = defel;
812 : }
813 390 : else if (strcmp(defel->defname, "connection_limit") == 0)
814 : {
815 0 : if (dconnlimit)
816 0 : errorConflictingDefElem(defel, pstate);
817 0 : dconnlimit = defel;
818 : }
819 390 : else if (strcmp(defel->defname, "collation_version") == 0)
820 : {
821 20 : if (dcollversion)
822 0 : errorConflictingDefElem(defel, pstate);
823 20 : dcollversion = defel;
824 : }
825 370 : else if (strcmp(defel->defname, "location") == 0)
826 : {
827 0 : ereport(WARNING,
828 : (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
829 : errmsg("LOCATION is not supported anymore"),
830 : errhint("Consider using tablespaces instead."),
831 : parser_errposition(pstate, defel->location)));
832 : }
833 370 : else if (strcmp(defel->defname, "oid") == 0)
834 : {
835 182 : dboid = defGetObjectId(defel);
836 :
837 : /*
838 : * We don't normally permit new databases to be created with
839 : * system-assigned OIDs. pg_upgrade tries to preserve database
840 : * OIDs, so we can't allow any database to be created with an OID
841 : * that might be in use in a freshly-initialized cluster created
842 : * by some future version. We assume all such OIDs will be from
843 : * the system-managed OID range.
844 : *
845 : * As an exception, however, we permit any OID to be assigned when
846 : * allow_system_table_mods=on (so that initdb can assign system
847 : * OIDs to template0 and postgres) or when performing a binary
848 : * upgrade (so that pg_upgrade can preserve whatever OIDs it finds
849 : * in the source cluster).
850 : */
851 182 : if (dboid < FirstNormalObjectId &&
852 164 : !allowSystemTableMods && !IsBinaryUpgrade)
853 0 : ereport(ERROR,
854 : (errcode(ERRCODE_INVALID_PARAMETER_VALUE)),
855 : errmsg("OIDs less than %u are reserved for system objects", FirstNormalObjectId));
856 : }
857 188 : else if (strcmp(defel->defname, "strategy") == 0)
858 : {
859 188 : if (dstrategy)
860 0 : errorConflictingDefElem(defel, pstate);
861 188 : dstrategy = defel;
862 : }
863 : else
864 0 : ereport(ERROR,
865 : (errcode(ERRCODE_SYNTAX_ERROR),
866 : errmsg("option \"%s\" not recognized", defel->defname),
867 : parser_errposition(pstate, defel->location)));
868 : }
869 :
870 640 : if (downer && downer->arg)
871 2 : dbowner = defGetString(downer);
872 640 : if (dtemplate && dtemplate->arg)
873 280 : dbtemplate = defGetString(dtemplate);
874 640 : if (dencoding && dencoding->arg)
875 : {
876 : const char *encoding_name;
877 :
878 60 : if (IsA(dencoding->arg, Integer))
879 : {
880 0 : encoding = defGetInt32(dencoding);
881 0 : encoding_name = pg_encoding_to_char(encoding);
882 0 : if (strcmp(encoding_name, "") == 0 ||
883 0 : pg_valid_server_encoding(encoding_name) < 0)
884 0 : ereport(ERROR,
885 : (errcode(ERRCODE_UNDEFINED_OBJECT),
886 : errmsg("%d is not a valid encoding code",
887 : encoding),
888 : parser_errposition(pstate, dencoding->location)));
889 : }
890 : else
891 : {
892 60 : encoding_name = defGetString(dencoding);
893 60 : encoding = pg_valid_server_encoding(encoding_name);
894 60 : if (encoding < 0)
895 0 : ereport(ERROR,
896 : (errcode(ERRCODE_UNDEFINED_OBJECT),
897 : errmsg("%s is not a valid encoding name",
898 : encoding_name),
899 : parser_errposition(pstate, dencoding->location)));
900 : }
901 : }
902 640 : if (dlocale && dlocale->arg)
903 : {
904 70 : dbcollate = defGetString(dlocale);
905 70 : dbctype = defGetString(dlocale);
906 70 : dblocale = defGetString(dlocale);
907 : }
908 640 : if (dbuiltinlocale && dbuiltinlocale->arg)
909 16 : dblocale = defGetString(dbuiltinlocale);
910 640 : if (dcollate && dcollate->arg)
911 12 : dbcollate = defGetString(dcollate);
912 640 : if (dctype && dctype->arg)
913 12 : dbctype = defGetString(dctype);
914 640 : if (diculocale && diculocale->arg)
915 10 : dblocale = defGetString(diculocale);
916 640 : if (dicurules && dicurules->arg)
917 2 : dbicurules = defGetString(dicurules);
918 640 : if (dlocprovider && dlocprovider->arg)
919 : {
920 76 : char *locproviderstr = defGetString(dlocprovider);
921 :
922 76 : if (pg_strcasecmp(locproviderstr, "builtin") == 0)
923 30 : dblocprovider = COLLPROVIDER_BUILTIN;
924 46 : else if (pg_strcasecmp(locproviderstr, "icu") == 0)
925 16 : dblocprovider = COLLPROVIDER_ICU;
926 30 : else if (pg_strcasecmp(locproviderstr, "libc") == 0)
927 28 : dblocprovider = COLLPROVIDER_LIBC;
928 : else
929 2 : ereport(ERROR,
930 : (errcode(ERRCODE_INVALID_OBJECT_DEFINITION),
931 : errmsg("unrecognized locale provider: %s",
932 : locproviderstr)));
933 : }
934 638 : if (distemplate && distemplate->arg)
935 80 : dbistemplate = defGetBoolean(distemplate);
936 638 : if (dallowconnections && dallowconnections->arg)
937 78 : dballowconnections = defGetBoolean(dallowconnections);
938 638 : if (dconnlimit && dconnlimit->arg)
939 : {
940 0 : dbconnlimit = defGetInt32(dconnlimit);
941 0 : if (dbconnlimit < DATCONNLIMIT_UNLIMITED)
942 0 : ereport(ERROR,
943 : (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
944 : errmsg("invalid connection limit: %d", dbconnlimit)));
945 : }
946 638 : if (dcollversion)
947 20 : dbcollversion = defGetString(dcollversion);
948 :
949 : /* obtain OID of proposed owner */
950 638 : if (dbowner)
951 2 : datdba = get_role_oid(dbowner, false);
952 : else
953 636 : datdba = GetUserId();
954 :
955 : /*
956 : * To create a database, must have createdb privilege and must be able to
957 : * become the target role (this does not imply that the target role itself
958 : * must have createdb privilege). The latter provision guards against
959 : * "giveaway" attacks. Note that a superuser will always have both of
960 : * these privileges a fortiori.
961 : */
962 638 : if (!have_createdb_privilege())
963 6 : ereport(ERROR,
964 : (errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
965 : errmsg("permission denied to create database")));
966 :
967 632 : check_can_set_role(GetUserId(), datdba);
968 :
969 : /*
970 : * Lookup database (template) to be cloned, and obtain share lock on it.
971 : * ShareLock allows two CREATE DATABASEs to work from the same template
972 : * concurrently, while ensuring no one is busy dropping it in parallel
973 : * (which would be Very Bad since we'd likely get an incomplete copy
974 : * without knowing it). This also prevents any new connections from being
975 : * made to the source until we finish copying it, so we can be sure it
976 : * won't change underneath us.
977 : */
978 632 : if (!dbtemplate)
979 354 : dbtemplate = "template1"; /* Default template database name */
980 :
981 632 : if (!get_db_info(dbtemplate, ShareLock,
982 : &src_dboid, &src_owner, &src_encoding,
983 : &src_istemplate, &src_allowconn, &src_hasloginevt,
984 : &src_frozenxid, &src_minmxid, &src_deftablespace,
985 : &src_collate, &src_ctype, &src_locale, &src_icurules, &src_locprovider,
986 : &src_collversion))
987 0 : ereport(ERROR,
988 : (errcode(ERRCODE_UNDEFINED_DATABASE),
989 : errmsg("template database \"%s\" does not exist",
990 : dbtemplate)));
991 :
992 : /*
993 : * If the source database was in the process of being dropped, we can't
994 : * use it as a template.
995 : */
996 632 : if (database_is_invalid_oid(src_dboid))
997 2 : ereport(ERROR,
998 : errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
999 : errmsg("cannot use invalid database \"%s\" as template", dbtemplate),
1000 : errhint("Use DROP DATABASE to drop invalid databases."));
1001 :
1002 : /*
1003 : * Permission check: to copy a DB that's not marked datistemplate, you
1004 : * must be superuser or the owner thereof.
1005 : */
1006 630 : if (!src_istemplate)
1007 : {
1008 16 : if (!object_ownercheck(DatabaseRelationId, src_dboid, GetUserId()))
1009 0 : ereport(ERROR,
1010 : (errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
1011 : errmsg("permission denied to copy database \"%s\"",
1012 : dbtemplate)));
1013 : }
1014 :
1015 : /* Validate the database creation strategy. */
1016 630 : if (dstrategy && dstrategy->arg)
1017 : {
1018 : char *strategy;
1019 :
1020 188 : strategy = defGetString(dstrategy);
1021 188 : if (pg_strcasecmp(strategy, "wal_log") == 0)
1022 14 : dbstrategy = CREATEDB_WAL_LOG;
1023 174 : else if (pg_strcasecmp(strategy, "file_copy") == 0)
1024 172 : dbstrategy = CREATEDB_FILE_COPY;
1025 : else
1026 2 : ereport(ERROR,
1027 : (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
1028 : errmsg("invalid create database strategy \"%s\"", strategy),
1029 : errhint("Valid strategies are \"wal_log\" and \"file_copy\".")));
1030 : }
1031 :
1032 : /* If encoding or locales are defaulted, use source's setting */
1033 628 : if (encoding < 0)
1034 568 : encoding = src_encoding;
1035 628 : if (dbcollate == NULL)
1036 552 : dbcollate = src_collate;
1037 628 : if (dbctype == NULL)
1038 552 : dbctype = src_ctype;
1039 628 : if (dblocprovider == '\0')
1040 554 : dblocprovider = src_locprovider;
1041 628 : if (dblocale == NULL)
1042 550 : dblocale = src_locale;
1043 628 : if (dbicurules == NULL)
1044 626 : dbicurules = src_icurules;
1045 :
1046 : /* Some encodings are client only */
1047 628 : if (!PG_VALID_BE_ENCODING(encoding))
1048 0 : ereport(ERROR,
1049 : (errcode(ERRCODE_WRONG_OBJECT_TYPE),
1050 : errmsg("invalid server encoding %d", encoding)));
1051 :
1052 : /* Check that the chosen locales are valid, and get canonical spellings */
1053 628 : if (!check_locale(LC_COLLATE, dbcollate, &canonname))
1054 2 : ereport(ERROR,
1055 : (errcode(ERRCODE_WRONG_OBJECT_TYPE),
1056 : errmsg("invalid LC_COLLATE locale name: \"%s\"", dbcollate),
1057 : errhint("If the locale name is specific to ICU, use ICU_LOCALE.")));
1058 626 : dbcollate = canonname;
1059 626 : if (!check_locale(LC_CTYPE, dbctype, &canonname))
1060 2 : ereport(ERROR,
1061 : (errcode(ERRCODE_WRONG_OBJECT_TYPE),
1062 : errmsg("invalid LC_CTYPE locale name: \"%s\"", dbctype),
1063 : errhint("If the locale name is specific to ICU, use ICU_LOCALE.")));
1064 624 : dbctype = canonname;
1065 :
1066 624 : check_encoding_locale_matches(encoding, dbcollate, dbctype);
1067 :
1068 : /* validate provider-specific parameters */
1069 624 : if (dblocprovider != COLLPROVIDER_BUILTIN)
1070 : {
1071 568 : if (dbuiltinlocale)
1072 0 : ereport(ERROR,
1073 : (errcode(ERRCODE_INVALID_OBJECT_DEFINITION),
1074 : errmsg("BUILTIN_LOCALE cannot be specified unless locale provider is builtin")));
1075 : }
1076 :
1077 624 : if (dblocprovider != COLLPROVIDER_ICU)
1078 : {
1079 594 : if (diculocale)
1080 2 : ereport(ERROR,
1081 : (errcode(ERRCODE_INVALID_OBJECT_DEFINITION),
1082 : errmsg("ICU locale cannot be specified unless locale provider is ICU")));
1083 :
1084 592 : if (dbicurules)
1085 2 : ereport(ERROR,
1086 : (errcode(ERRCODE_INVALID_OBJECT_DEFINITION),
1087 : errmsg("ICU rules cannot be specified unless locale provider is ICU")));
1088 : }
1089 :
1090 : /* validate and canonicalize locale for the provider */
1091 620 : if (dblocprovider == COLLPROVIDER_BUILTIN)
1092 : {
1093 : /*
1094 : * This would happen if template0 uses the libc provider but the new
1095 : * database uses builtin.
1096 : */
1097 52 : if (!dblocale)
1098 2 : ereport(ERROR,
1099 : (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
1100 : errmsg("LOCALE or BUILTIN_LOCALE must be specified")));
1101 :
1102 50 : dblocale = builtin_validate_locale(encoding, dblocale);
1103 : }
1104 568 : else if (dblocprovider == COLLPROVIDER_ICU)
1105 : {
1106 30 : if (!(is_encoding_supported_by_icu(encoding)))
1107 2 : ereport(ERROR,
1108 : (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
1109 : errmsg("encoding \"%s\" is not supported with ICU provider",
1110 : pg_encoding_to_char(encoding))));
1111 :
1112 : /*
1113 : * This would happen if template0 uses the libc provider but the new
1114 : * database uses icu.
1115 : */
1116 28 : if (!dblocale)
1117 2 : ereport(ERROR,
1118 : (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
1119 : errmsg("LOCALE or ICU_LOCALE must be specified")));
1120 :
1121 : /*
1122 : * During binary upgrade, or when the locale came from the template
1123 : * database, preserve locale string. Otherwise, canonicalize to a
1124 : * language tag.
1125 : */
1126 26 : if (!IsBinaryUpgrade && dblocale != src_locale)
1127 : {
1128 14 : char *langtag = icu_language_tag(dblocale,
1129 : icu_validation_level);
1130 :
1131 14 : if (langtag && strcmp(dblocale, langtag) != 0)
1132 : {
1133 6 : ereport(NOTICE,
1134 : (errmsg("using standard form \"%s\" for ICU locale \"%s\"",
1135 : langtag, dblocale)));
1136 :
1137 6 : dblocale = langtag;
1138 : }
1139 : }
1140 :
1141 26 : icu_validate_locale(dblocale);
1142 : }
1143 :
1144 : /* for libc, locale comes from datcollate and datctype */
1145 610 : if (dblocprovider == COLLPROVIDER_LIBC)
1146 538 : dblocale = NULL;
1147 :
1148 : /*
1149 : * Check that the new encoding and locale settings match the source
1150 : * database. We insist on this because we simply copy the source data ---
1151 : * any non-ASCII data would be wrongly encoded, and any indexes sorted
1152 : * according to the source locale would be wrong.
1153 : *
1154 : * However, we assume that template0 doesn't contain any non-ASCII data
1155 : * nor any indexes that depend on collation or ctype, so template0 can be
1156 : * used as template for creating a database with any encoding or locale.
1157 : */
1158 610 : if (strcmp(dbtemplate, "template0") != 0)
1159 : {
1160 372 : if (encoding != src_encoding)
1161 0 : ereport(ERROR,
1162 : (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
1163 : errmsg("new encoding (%s) is incompatible with the encoding of the template database (%s)",
1164 : pg_encoding_to_char(encoding),
1165 : pg_encoding_to_char(src_encoding)),
1166 : errhint("Use the same encoding as in the template database, or use template0 as template.")));
1167 :
1168 372 : if (strcmp(dbcollate, src_collate) != 0)
1169 2 : ereport(ERROR,
1170 : (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
1171 : errmsg("new collation (%s) is incompatible with the collation of the template database (%s)",
1172 : dbcollate, src_collate),
1173 : errhint("Use the same collation as in the template database, or use template0 as template.")));
1174 :
1175 370 : if (strcmp(dbctype, src_ctype) != 0)
1176 0 : ereport(ERROR,
1177 : (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
1178 : errmsg("new LC_CTYPE (%s) is incompatible with the LC_CTYPE of the template database (%s)",
1179 : dbctype, src_ctype),
1180 : errhint("Use the same LC_CTYPE as in the template database, or use template0 as template.")));
1181 :
1182 370 : if (dblocprovider != src_locprovider)
1183 0 : ereport(ERROR,
1184 : (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
1185 : errmsg("new locale provider (%s) does not match locale provider of the template database (%s)",
1186 : collprovider_name(dblocprovider), collprovider_name(src_locprovider)),
1187 : errhint("Use the same locale provider as in the template database, or use template0 as template.")));
1188 :
1189 370 : if (dblocprovider == COLLPROVIDER_ICU)
1190 : {
1191 : char *val1;
1192 : char *val2;
1193 :
1194 : Assert(dblocale);
1195 : Assert(src_locale);
1196 12 : if (strcmp(dblocale, src_locale) != 0)
1197 0 : ereport(ERROR,
1198 : (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
1199 : errmsg("new ICU locale (%s) is incompatible with the ICU locale of the template database (%s)",
1200 : dblocale, src_locale),
1201 : errhint("Use the same ICU locale as in the template database, or use template0 as template.")));
1202 :
1203 12 : val1 = dbicurules;
1204 12 : if (!val1)
1205 12 : val1 = "";
1206 12 : val2 = src_icurules;
1207 12 : if (!val2)
1208 12 : val2 = "";
1209 12 : if (strcmp(val1, val2) != 0)
1210 0 : ereport(ERROR,
1211 : (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
1212 : errmsg("new ICU collation rules (%s) are incompatible with the ICU collation rules of the template database (%s)",
1213 : val1, val2),
1214 : errhint("Use the same ICU collation rules as in the template database, or use template0 as template.")));
1215 : }
1216 : }
1217 :
1218 : /*
1219 : * If we got a collation version for the template database, check that it
1220 : * matches the actual OS collation version. Otherwise error; the user
1221 : * needs to fix the template database first. Don't complain if a
1222 : * collation version was specified explicitly as a statement option; that
1223 : * is used by pg_upgrade to reproduce the old state exactly.
1224 : *
1225 : * (If the template database has no collation version, then either the
1226 : * platform/provider does not support collation versioning, or it's
1227 : * template0, for which we stipulate that it does not contain
1228 : * collation-using objects.)
1229 : */
1230 608 : if (src_collversion && !dcollversion)
1231 : {
1232 : char *actual_versionstr;
1233 : const char *locale;
1234 :
1235 232 : if (dblocprovider == COLLPROVIDER_LIBC)
1236 210 : locale = dbcollate;
1237 : else
1238 22 : locale = dblocale;
1239 :
1240 232 : actual_versionstr = get_collation_actual_version(dblocprovider, locale);
1241 232 : if (!actual_versionstr)
1242 0 : ereport(ERROR,
1243 : (errmsg("template database \"%s\" has a collation version, but no actual collation version could be determined",
1244 : dbtemplate)));
1245 :
1246 232 : if (strcmp(actual_versionstr, src_collversion) != 0)
1247 0 : ereport(ERROR,
1248 : (errmsg("template database \"%s\" has a collation version mismatch",
1249 : dbtemplate),
1250 : errdetail("The template database was created using collation version %s, "
1251 : "but the operating system provides version %s.",
1252 : src_collversion, actual_versionstr),
1253 : errhint("Rebuild all objects in the template database that use the default collation and run "
1254 : "ALTER DATABASE %s REFRESH COLLATION VERSION, "
1255 : "or build PostgreSQL with the right library version.",
1256 : quote_identifier(dbtemplate))));
1257 : }
1258 :
1259 608 : if (dbcollversion == NULL)
1260 588 : dbcollversion = src_collversion;
1261 :
1262 : /*
1263 : * Normally, we copy the collation version from the template database.
1264 : * This last resort only applies if the template database does not have a
1265 : * collation version, which is normally only the case for template0.
1266 : */
1267 608 : if (dbcollversion == NULL)
1268 : {
1269 : const char *locale;
1270 :
1271 356 : if (dblocprovider == COLLPROVIDER_LIBC)
1272 320 : locale = dbcollate;
1273 : else
1274 36 : locale = dblocale;
1275 :
1276 356 : dbcollversion = get_collation_actual_version(dblocprovider, locale);
1277 : }
1278 :
1279 : /* Resolve default tablespace for new database */
1280 608 : if (dtablespacename && dtablespacename->arg)
1281 16 : {
1282 : char *tablespacename;
1283 : AclResult aclresult;
1284 :
1285 16 : tablespacename = defGetString(dtablespacename);
1286 16 : dst_deftablespace = get_tablespace_oid(tablespacename, false);
1287 : /* check permissions */
1288 16 : aclresult = object_aclcheck(TableSpaceRelationId, dst_deftablespace, GetUserId(),
1289 : ACL_CREATE);
1290 16 : if (aclresult != ACLCHECK_OK)
1291 0 : aclcheck_error(aclresult, OBJECT_TABLESPACE,
1292 : tablespacename);
1293 :
1294 : /* pg_global must never be the default tablespace */
1295 16 : if (dst_deftablespace == GLOBALTABLESPACE_OID)
1296 0 : ereport(ERROR,
1297 : (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
1298 : errmsg("pg_global cannot be used as default tablespace")));
1299 :
1300 : /*
1301 : * If we are trying to change the default tablespace of the template,
1302 : * we require that the template not have any files in the new default
1303 : * tablespace. This is necessary because otherwise the copied
1304 : * database would contain pg_class rows that refer to its default
1305 : * tablespace both explicitly (by OID) and implicitly (as zero), which
1306 : * would cause problems. For example another CREATE DATABASE using
1307 : * the copied database as template, and trying to change its default
1308 : * tablespace again, would yield outright incorrect results (it would
1309 : * improperly move tables to the new default tablespace that should
1310 : * stay in the same tablespace).
1311 : */
1312 16 : if (dst_deftablespace != src_deftablespace)
1313 : {
1314 : char *srcpath;
1315 : struct stat st;
1316 :
1317 16 : srcpath = GetDatabasePath(src_dboid, dst_deftablespace);
1318 :
1319 16 : if (stat(srcpath, &st) == 0 &&
1320 0 : S_ISDIR(st.st_mode) &&
1321 0 : !directory_is_empty(srcpath))
1322 0 : ereport(ERROR,
1323 : (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
1324 : errmsg("cannot assign new default tablespace \"%s\"",
1325 : tablespacename),
1326 : errdetail("There is a conflict because database \"%s\" already has some tables in this tablespace.",
1327 : dbtemplate)));
1328 16 : pfree(srcpath);
1329 : }
1330 : }
1331 : else
1332 : {
1333 : /* Use template database's default tablespace */
1334 592 : dst_deftablespace = src_deftablespace;
1335 : /* Note there is no additional permission check in this path */
1336 : }
1337 :
1338 : /*
1339 : * If built with appropriate switch, whine when regression-testing
1340 : * conventions for database names are violated. But don't complain during
1341 : * initdb.
1342 : */
1343 : #ifdef ENFORCE_REGRESSION_TEST_NAME_RESTRICTIONS
1344 : if (IsUnderPostmaster && strstr(dbname, "regression") == NULL)
1345 : elog(WARNING, "databases created by regression test cases should have names including \"regression\"");
1346 : #endif
1347 :
1348 : /*
1349 : * Check for db name conflict. This is just to give a more friendly error
1350 : * message than "unique index violation". There's a race condition but
1351 : * we're willing to accept the less friendly message in that case.
1352 : */
1353 608 : if (OidIsValid(get_database_oid(dbname, true)))
1354 2 : ereport(ERROR,
1355 : (errcode(ERRCODE_DUPLICATE_DATABASE),
1356 : errmsg("database \"%s\" already exists", dbname)));
1357 :
1358 : /*
1359 : * The source DB can't have any active backends, except this one
1360 : * (exception is to allow CREATE DB while connected to template1).
1361 : * Otherwise we might copy inconsistent data.
1362 : *
1363 : * This should be last among the basic error checks, because it involves
1364 : * potential waiting; we may as well throw an error first if we're gonna
1365 : * throw one.
1366 : */
1367 606 : if (CountOtherDBBackends(src_dboid, &notherbackends, &npreparedxacts))
1368 0 : ereport(ERROR,
1369 : (errcode(ERRCODE_OBJECT_IN_USE),
1370 : errmsg("source database \"%s\" is being accessed by other users",
1371 : dbtemplate),
1372 : errdetail_busy_db(notherbackends, npreparedxacts)));
1373 :
1374 : /*
1375 : * Select an OID for the new database, checking that it doesn't have a
1376 : * filename conflict with anything already existing in the tablespace
1377 : * directories.
1378 : */
1379 606 : pg_database_rel = table_open(DatabaseRelationId, RowExclusiveLock);
1380 :
1381 : /*
1382 : * If database OID is configured, check if the OID is already in use or
1383 : * data directory already exists.
1384 : */
1385 606 : if (OidIsValid(dboid))
1386 : {
1387 182 : char *existing_dbname = get_database_name(dboid);
1388 :
1389 182 : if (existing_dbname != NULL)
1390 0 : ereport(ERROR,
1391 : (errcode(ERRCODE_INVALID_PARAMETER_VALUE)),
1392 : errmsg("database OID %u is already in use by database \"%s\"",
1393 : dboid, existing_dbname));
1394 :
1395 182 : if (check_db_file_conflict(dboid))
1396 0 : ereport(ERROR,
1397 : (errcode(ERRCODE_INVALID_PARAMETER_VALUE)),
1398 : errmsg("data directory with the specified OID %u already exists", dboid));
1399 : }
1400 : else
1401 : {
1402 : /* Select an OID for the new database if one is not explicitly configured. */
1403 : do
1404 : {
1405 424 : dboid = GetNewOidWithIndex(pg_database_rel, DatabaseOidIndexId,
1406 : Anum_pg_database_oid);
1407 424 : } while (check_db_file_conflict(dboid));
1408 : }
1409 :
1410 : /*
1411 : * Insert a new tuple into pg_database. This establishes our ownership of
1412 : * the new database name (anyone else trying to insert the same name will
1413 : * block on the unique index, and fail after we commit).
1414 : */
1415 :
1416 : Assert((dblocprovider != COLLPROVIDER_LIBC && dblocale) ||
1417 : (dblocprovider == COLLPROVIDER_LIBC && !dblocale));
1418 :
1419 : /* Form tuple */
1420 606 : new_record[Anum_pg_database_oid - 1] = ObjectIdGetDatum(dboid);
1421 606 : new_record[Anum_pg_database_datname - 1] =
1422 606 : DirectFunctionCall1(namein, CStringGetDatum(dbname));
1423 606 : new_record[Anum_pg_database_datdba - 1] = ObjectIdGetDatum(datdba);
1424 606 : new_record[Anum_pg_database_encoding - 1] = Int32GetDatum(encoding);
1425 606 : new_record[Anum_pg_database_datlocprovider - 1] = CharGetDatum(dblocprovider);
1426 606 : new_record[Anum_pg_database_datistemplate - 1] = BoolGetDatum(dbistemplate);
1427 606 : new_record[Anum_pg_database_datallowconn - 1] = BoolGetDatum(dballowconnections);
1428 606 : new_record[Anum_pg_database_dathasloginevt - 1] = BoolGetDatum(src_hasloginevt);
1429 606 : new_record[Anum_pg_database_datconnlimit - 1] = Int32GetDatum(dbconnlimit);
1430 606 : new_record[Anum_pg_database_datfrozenxid - 1] = TransactionIdGetDatum(src_frozenxid);
1431 606 : new_record[Anum_pg_database_datminmxid - 1] = TransactionIdGetDatum(src_minmxid);
1432 606 : new_record[Anum_pg_database_dattablespace - 1] = ObjectIdGetDatum(dst_deftablespace);
1433 606 : new_record[Anum_pg_database_datcollate - 1] = CStringGetTextDatum(dbcollate);
1434 606 : new_record[Anum_pg_database_datctype - 1] = CStringGetTextDatum(dbctype);
1435 606 : if (dblocale)
1436 70 : new_record[Anum_pg_database_datlocale - 1] = CStringGetTextDatum(dblocale);
1437 : else
1438 536 : new_record_nulls[Anum_pg_database_datlocale - 1] = true;
1439 606 : if (dbicurules)
1440 0 : new_record[Anum_pg_database_daticurules - 1] = CStringGetTextDatum(dbicurules);
1441 : else
1442 606 : new_record_nulls[Anum_pg_database_daticurules - 1] = true;
1443 606 : if (dbcollversion)
1444 490 : new_record[Anum_pg_database_datcollversion - 1] = CStringGetTextDatum(dbcollversion);
1445 : else
1446 116 : new_record_nulls[Anum_pg_database_datcollversion - 1] = true;
1447 :
1448 : /*
1449 : * We deliberately set datacl to default (NULL), rather than copying it
1450 : * from the template database. Copying it would be a bad idea when the
1451 : * owner is not the same as the template's owner.
1452 : */
1453 606 : new_record_nulls[Anum_pg_database_datacl - 1] = true;
1454 :
1455 606 : tuple = heap_form_tuple(RelationGetDescr(pg_database_rel),
1456 : new_record, new_record_nulls);
1457 :
1458 606 : CatalogTupleInsert(pg_database_rel, tuple);
1459 :
1460 : /*
1461 : * Now generate additional catalog entries associated with the new DB
1462 : */
1463 :
1464 : /* Register owner dependency */
1465 606 : recordDependencyOnOwner(DatabaseRelationId, dboid, datdba);
1466 :
1467 : /* Create pg_shdepend entries for objects within database */
1468 606 : copyTemplateDependencies(src_dboid, dboid);
1469 :
1470 : /* Post creation hook for new database */
1471 606 : InvokeObjectPostCreateHook(DatabaseRelationId, dboid, 0);
1472 :
1473 : /*
1474 : * If we're going to be reading data for the to-be-created database into
1475 : * shared_buffers, take a lock on it. Nobody should know that this
1476 : * database exists yet, but it's good to maintain the invariant that an
1477 : * AccessExclusiveLock on the database is sufficient to drop all of its
1478 : * buffers without worrying about more being read later.
1479 : *
1480 : * Note that we need to do this before entering the
1481 : * PG_ENSURE_ERROR_CLEANUP block below, because createdb_failure_callback
1482 : * expects this lock to be held already.
1483 : */
1484 606 : if (dbstrategy == CREATEDB_WAL_LOG)
1485 434 : LockSharedObject(DatabaseRelationId, dboid, 0, AccessShareLock);
1486 :
1487 : /*
1488 : * Once we start copying subdirectories, we need to be able to clean 'em
1489 : * up if we fail. Use an ENSURE block to make sure this happens. (This
1490 : * is not a 100% solution, because of the possibility of failure during
1491 : * transaction commit after we leave this routine, but it should handle
1492 : * most scenarios.)
1493 : */
1494 606 : fparms.src_dboid = src_dboid;
1495 606 : fparms.dest_dboid = dboid;
1496 606 : fparms.strategy = dbstrategy;
1497 :
1498 606 : PG_ENSURE_ERROR_CLEANUP(createdb_failure_callback,
1499 : PointerGetDatum(&fparms));
1500 : {
1501 : /*
1502 : * If the user has asked to create a database with WAL_LOG strategy
1503 : * then call CreateDatabaseUsingWalLog, which will copy the database
1504 : * at the block level and it will WAL log each copied block.
1505 : * Otherwise, call CreateDatabaseUsingFileCopy that will copy the
1506 : * database file by file.
1507 : */
1508 606 : if (dbstrategy == CREATEDB_WAL_LOG)
1509 434 : CreateDatabaseUsingWalLog(src_dboid, dboid, src_deftablespace,
1510 : dst_deftablespace);
1511 : else
1512 172 : CreateDatabaseUsingFileCopy(src_dboid, dboid, src_deftablespace,
1513 : dst_deftablespace);
1514 :
1515 : /*
1516 : * Close pg_database, but keep lock till commit.
1517 : */
1518 606 : table_close(pg_database_rel, NoLock);
1519 :
1520 : /*
1521 : * Force synchronous commit, thus minimizing the window between
1522 : * creation of the database files and committal of the transaction. If
1523 : * we crash before committing, we'll have a DB that's taking up disk
1524 : * space but is not in pg_database, which is not good.
1525 : */
1526 606 : ForceSyncCommit();
1527 : }
1528 606 : PG_END_ENSURE_ERROR_CLEANUP(createdb_failure_callback,
1529 : PointerGetDatum(&fparms));
1530 :
1531 606 : return dboid;
1532 : }
1533 :
1534 : /*
1535 : * Check whether chosen encoding matches chosen locale settings. This
1536 : * restriction is necessary because libc's locale-specific code usually
1537 : * fails when presented with data in an encoding it's not expecting. We
1538 : * allow mismatch in four cases:
1539 : *
1540 : * 1. locale encoding = SQL_ASCII, which means that the locale is C/POSIX
1541 : * which works with any encoding.
1542 : *
1543 : * 2. locale encoding = -1, which means that we couldn't determine the
1544 : * locale's encoding and have to trust the user to get it right.
1545 : *
1546 : * 3. selected encoding is UTF8 and platform is win32. This is because
1547 : * UTF8 is a pseudo codepage that is supported in all locales since it's
1548 : * converted to UTF16 before being used.
1549 : *
1550 : * 4. selected encoding is SQL_ASCII, but only if you're a superuser. This
1551 : * is risky but we have historically allowed it --- notably, the
1552 : * regression tests require it.
1553 : *
1554 : * Note: if you change this policy, fix initdb to match.
1555 : */
1556 : void
1557 652 : check_encoding_locale_matches(int encoding, const char *collate, const char *ctype)
1558 : {
1559 652 : int ctype_encoding = pg_get_encoding_from_locale(ctype, true);
1560 652 : int collate_encoding = pg_get_encoding_from_locale(collate, true);
1561 :
1562 658 : if (!(ctype_encoding == encoding ||
1563 6 : ctype_encoding == PG_SQL_ASCII ||
1564 : ctype_encoding == -1 ||
1565 : #ifdef WIN32
1566 : encoding == PG_UTF8 ||
1567 : #endif
1568 6 : (encoding == PG_SQL_ASCII && superuser())))
1569 0 : ereport(ERROR,
1570 : (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
1571 : errmsg("encoding \"%s\" does not match locale \"%s\"",
1572 : pg_encoding_to_char(encoding),
1573 : ctype),
1574 : errdetail("The chosen LC_CTYPE setting requires encoding \"%s\".",
1575 : pg_encoding_to_char(ctype_encoding))));
1576 :
1577 658 : if (!(collate_encoding == encoding ||
1578 6 : collate_encoding == PG_SQL_ASCII ||
1579 : collate_encoding == -1 ||
1580 : #ifdef WIN32
1581 : encoding == PG_UTF8 ||
1582 : #endif
1583 6 : (encoding == PG_SQL_ASCII && superuser())))
1584 0 : ereport(ERROR,
1585 : (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
1586 : errmsg("encoding \"%s\" does not match locale \"%s\"",
1587 : pg_encoding_to_char(encoding),
1588 : collate),
1589 : errdetail("The chosen LC_COLLATE setting requires encoding \"%s\".",
1590 : pg_encoding_to_char(collate_encoding))));
1591 652 : }
1592 :
1593 : /* Error cleanup callback for createdb */
1594 : static void
1595 0 : createdb_failure_callback(int code, Datum arg)
1596 : {
1597 0 : createdb_failure_params *fparms = (createdb_failure_params *) DatumGetPointer(arg);
1598 :
1599 : /*
1600 : * If we were copying database at block levels then drop pages for the
1601 : * destination database that are in the shared buffer cache. And tell
1602 : * checkpointer to forget any pending fsync and unlink requests for files
1603 : * in the database. The reasoning behind doing this is same as explained
1604 : * in dropdb function. But unlike dropdb we don't need to call
1605 : * pgstat_drop_database because this database is still not created so
1606 : * there should not be any stat for this.
1607 : */
1608 0 : if (fparms->strategy == CREATEDB_WAL_LOG)
1609 : {
1610 0 : DropDatabaseBuffers(fparms->dest_dboid);
1611 0 : ForgetDatabaseSyncRequests(fparms->dest_dboid);
1612 :
1613 : /* Release lock on the target database. */
1614 0 : UnlockSharedObject(DatabaseRelationId, fparms->dest_dboid, 0,
1615 : AccessShareLock);
1616 : }
1617 :
1618 : /*
1619 : * Release lock on source database before doing recursive remove. This is
1620 : * not essential but it seems desirable to release the lock as soon as
1621 : * possible.
1622 : */
1623 0 : UnlockSharedObject(DatabaseRelationId, fparms->src_dboid, 0, ShareLock);
1624 :
1625 : /* Throw away any successfully copied subdirectories */
1626 0 : remove_dbtablespaces(fparms->dest_dboid);
1627 0 : }
1628 :
1629 :
1630 : /*
1631 : * DROP DATABASE
1632 : */
1633 : void
1634 94 : dropdb(const char *dbname, bool missing_ok, bool force)
1635 : {
1636 : Oid db_id;
1637 : bool db_istemplate;
1638 : Relation pgdbrel;
1639 : HeapTuple tup;
1640 : Form_pg_database datform;
1641 : int notherbackends;
1642 : int npreparedxacts;
1643 : int nslots,
1644 : nslots_active;
1645 : int nsubscriptions;
1646 :
1647 : /*
1648 : * Look up the target database's OID, and get exclusive lock on it. We
1649 : * need this to ensure that no new backend starts up in the target
1650 : * database while we are deleting it (see postinit.c), and that no one is
1651 : * using it as a CREATE DATABASE template or trying to delete it for
1652 : * themselves.
1653 : */
1654 94 : pgdbrel = table_open(DatabaseRelationId, RowExclusiveLock);
1655 :
1656 94 : if (!get_db_info(dbname, AccessExclusiveLock, &db_id, NULL, NULL,
1657 : &db_istemplate, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL))
1658 : {
1659 32 : if (!missing_ok)
1660 : {
1661 16 : ereport(ERROR,
1662 : (errcode(ERRCODE_UNDEFINED_DATABASE),
1663 : errmsg("database \"%s\" does not exist", dbname)));
1664 : }
1665 : else
1666 : {
1667 : /* Close pg_database, release the lock, since we changed nothing */
1668 16 : table_close(pgdbrel, RowExclusiveLock);
1669 16 : ereport(NOTICE,
1670 : (errmsg("database \"%s\" does not exist, skipping",
1671 : dbname)));
1672 16 : return;
1673 : }
1674 : }
1675 :
1676 : /*
1677 : * Permission checks
1678 : */
1679 62 : if (!object_ownercheck(DatabaseRelationId, db_id, GetUserId()))
1680 0 : aclcheck_error(ACLCHECK_NOT_OWNER, OBJECT_DATABASE,
1681 : dbname);
1682 :
1683 : /* DROP hook for the database being removed */
1684 62 : InvokeObjectDropHook(DatabaseRelationId, db_id, 0);
1685 :
1686 : /*
1687 : * Disallow dropping a DB that is marked istemplate. This is just to
1688 : * prevent people from accidentally dropping template0 or template1; they
1689 : * can do so if they're really determined ...
1690 : */
1691 62 : if (db_istemplate)
1692 0 : ereport(ERROR,
1693 : (errcode(ERRCODE_WRONG_OBJECT_TYPE),
1694 : errmsg("cannot drop a template database")));
1695 :
1696 : /* Obviously can't drop my own database */
1697 62 : if (db_id == MyDatabaseId)
1698 0 : ereport(ERROR,
1699 : (errcode(ERRCODE_OBJECT_IN_USE),
1700 : errmsg("cannot drop the currently open database")));
1701 :
1702 : /*
1703 : * Check whether there are active logical slots that refer to the
1704 : * to-be-dropped database. The database lock we are holding prevents the
1705 : * creation of new slots using the database or existing slots becoming
1706 : * active.
1707 : */
1708 62 : (void) ReplicationSlotsCountDBSlots(db_id, &nslots, &nslots_active);
1709 62 : if (nslots_active)
1710 : {
1711 2 : ereport(ERROR,
1712 : (errcode(ERRCODE_OBJECT_IN_USE),
1713 : errmsg("database \"%s\" is used by an active logical replication slot",
1714 : dbname),
1715 : errdetail_plural("There is %d active slot.",
1716 : "There are %d active slots.",
1717 : nslots_active, nslots_active)));
1718 : }
1719 :
1720 : /*
1721 : * Check if there are subscriptions defined in the target database.
1722 : *
1723 : * We can't drop them automatically because they might be holding
1724 : * resources in other databases/instances.
1725 : */
1726 60 : if ((nsubscriptions = CountDBSubscriptions(db_id)) > 0)
1727 0 : ereport(ERROR,
1728 : (errcode(ERRCODE_OBJECT_IN_USE),
1729 : errmsg("database \"%s\" is being used by logical replication subscription",
1730 : dbname),
1731 : errdetail_plural("There is %d subscription.",
1732 : "There are %d subscriptions.",
1733 : nsubscriptions, nsubscriptions)));
1734 :
1735 :
1736 : /*
1737 : * Attempt to terminate all existing connections to the target database if
1738 : * the user has requested to do so.
1739 : */
1740 60 : if (force)
1741 2 : TerminateOtherDBBackends(db_id);
1742 :
1743 : /*
1744 : * Check for other backends in the target database. (Because we hold the
1745 : * database lock, no new ones can start after this.)
1746 : *
1747 : * As in CREATE DATABASE, check this after other error conditions.
1748 : */
1749 60 : if (CountOtherDBBackends(db_id, ¬herbackends, &npreparedxacts))
1750 0 : ereport(ERROR,
1751 : (errcode(ERRCODE_OBJECT_IN_USE),
1752 : errmsg("database \"%s\" is being accessed by other users",
1753 : dbname),
1754 : errdetail_busy_db(notherbackends, npreparedxacts)));
1755 :
1756 : /*
1757 : * Delete any comments or security labels associated with the database.
1758 : */
1759 60 : DeleteSharedComments(db_id, DatabaseRelationId);
1760 60 : DeleteSharedSecurityLabel(db_id, DatabaseRelationId);
1761 :
1762 : /*
1763 : * Remove settings associated with this database
1764 : */
1765 60 : DropSetting(db_id, InvalidOid);
1766 :
1767 : /*
1768 : * Remove shared dependency references for the database.
1769 : */
1770 60 : dropDatabaseDependencies(db_id);
1771 :
1772 : /*
1773 : * Tell the cumulative stats system to forget it immediately, too.
1774 : */
1775 60 : pgstat_drop_database(db_id);
1776 :
1777 60 : tup = SearchSysCacheCopy1(DATABASEOID, ObjectIdGetDatum(db_id));
1778 60 : if (!HeapTupleIsValid(tup))
1779 0 : elog(ERROR, "cache lookup failed for database %u", db_id);
1780 60 : datform = (Form_pg_database) GETSTRUCT(tup);
1781 :
1782 : /*
1783 : * Except for the deletion of the catalog row, subsequent actions are not
1784 : * transactional (consider DropDatabaseBuffers() discarding modified
1785 : * buffers). But we might crash or get interrupted below. To prevent
1786 : * accesses to a database with invalid contents, mark the database as
1787 : * invalid using an in-place update.
1788 : *
1789 : * We need to flush the WAL before continuing, to guarantee the
1790 : * modification is durable before performing irreversible filesystem
1791 : * operations.
1792 : */
1793 60 : datform->datconnlimit = DATCONNLIMIT_INVALID_DB;
1794 60 : heap_inplace_update(pgdbrel, tup);
1795 60 : XLogFlush(XactLastRecEnd);
1796 :
1797 : /*
1798 : * Also delete the tuple - transactionally. If this transaction commits,
1799 : * the row will be gone, but if we fail, dropdb() can be invoked again.
1800 : */
1801 60 : CatalogTupleDelete(pgdbrel, &tup->t_self);
1802 :
1803 : /*
1804 : * Drop db-specific replication slots.
1805 : */
1806 60 : ReplicationSlotsDropDBSlots(db_id);
1807 :
1808 : /*
1809 : * Drop pages for this database that are in the shared buffer cache. This
1810 : * is important to ensure that no remaining backend tries to write out a
1811 : * dirty buffer to the dead database later...
1812 : */
1813 60 : DropDatabaseBuffers(db_id);
1814 :
1815 : /*
1816 : * Tell checkpointer to forget any pending fsync and unlink requests for
1817 : * files in the database; else the fsyncs will fail at next checkpoint, or
1818 : * worse, it will delete files that belong to a newly created database
1819 : * with the same OID.
1820 : */
1821 60 : ForgetDatabaseSyncRequests(db_id);
1822 :
1823 : /*
1824 : * Force a checkpoint to make sure the checkpointer has received the
1825 : * message sent by ForgetDatabaseSyncRequests.
1826 : */
1827 60 : RequestCheckpoint(CHECKPOINT_IMMEDIATE | CHECKPOINT_FORCE | CHECKPOINT_WAIT);
1828 :
1829 : /* Close all smgr fds in all backends. */
1830 60 : WaitForProcSignalBarrier(EmitProcSignalBarrier(PROCSIGNAL_BARRIER_SMGRRELEASE));
1831 :
1832 : /*
1833 : * Remove all tablespace subdirs belonging to the database.
1834 : */
1835 60 : remove_dbtablespaces(db_id);
1836 :
1837 : /*
1838 : * Close pg_database, but keep lock till commit.
1839 : */
1840 60 : table_close(pgdbrel, NoLock);
1841 :
1842 : /*
1843 : * Force synchronous commit, thus minimizing the window between removal of
1844 : * the database files and committal of the transaction. If we crash before
1845 : * committing, we'll have a DB that's gone on disk but still there
1846 : * according to pg_database, which is not good.
1847 : */
1848 60 : ForceSyncCommit();
1849 : }
1850 :
1851 :
1852 : /*
1853 : * Rename database
1854 : */
1855 : ObjectAddress
1856 0 : RenameDatabase(const char *oldname, const char *newname)
1857 : {
1858 : Oid db_id;
1859 : HeapTuple newtup;
1860 : Relation rel;
1861 : int notherbackends;
1862 : int npreparedxacts;
1863 : ObjectAddress address;
1864 :
1865 : /*
1866 : * Look up the target database's OID, and get exclusive lock on it. We
1867 : * need this for the same reasons as DROP DATABASE.
1868 : */
1869 0 : rel = table_open(DatabaseRelationId, RowExclusiveLock);
1870 :
1871 0 : if (!get_db_info(oldname, AccessExclusiveLock, &db_id, NULL, NULL, NULL,
1872 : NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL))
1873 0 : ereport(ERROR,
1874 : (errcode(ERRCODE_UNDEFINED_DATABASE),
1875 : errmsg("database \"%s\" does not exist", oldname)));
1876 :
1877 : /* must be owner */
1878 0 : if (!object_ownercheck(DatabaseRelationId, db_id, GetUserId()))
1879 0 : aclcheck_error(ACLCHECK_NOT_OWNER, OBJECT_DATABASE,
1880 : oldname);
1881 :
1882 : /* must have createdb rights */
1883 0 : if (!have_createdb_privilege())
1884 0 : ereport(ERROR,
1885 : (errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
1886 : errmsg("permission denied to rename database")));
1887 :
1888 : /*
1889 : * If built with appropriate switch, whine when regression-testing
1890 : * conventions for database names are violated.
1891 : */
1892 : #ifdef ENFORCE_REGRESSION_TEST_NAME_RESTRICTIONS
1893 : if (strstr(newname, "regression") == NULL)
1894 : elog(WARNING, "databases created by regression test cases should have names including \"regression\"");
1895 : #endif
1896 :
1897 : /*
1898 : * Make sure the new name doesn't exist. See notes for same error in
1899 : * CREATE DATABASE.
1900 : */
1901 0 : if (OidIsValid(get_database_oid(newname, true)))
1902 0 : ereport(ERROR,
1903 : (errcode(ERRCODE_DUPLICATE_DATABASE),
1904 : errmsg("database \"%s\" already exists", newname)));
1905 :
1906 : /*
1907 : * XXX Client applications probably store the current database somewhere,
1908 : * so renaming it could cause confusion. On the other hand, there may not
1909 : * be an actual problem besides a little confusion, so think about this
1910 : * and decide.
1911 : */
1912 0 : if (db_id == MyDatabaseId)
1913 0 : ereport(ERROR,
1914 : (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
1915 : errmsg("current database cannot be renamed")));
1916 :
1917 : /*
1918 : * Make sure the database does not have active sessions. This is the same
1919 : * concern as above, but applied to other sessions.
1920 : *
1921 : * As in CREATE DATABASE, check this after other error conditions.
1922 : */
1923 0 : if (CountOtherDBBackends(db_id, ¬herbackends, &npreparedxacts))
1924 0 : ereport(ERROR,
1925 : (errcode(ERRCODE_OBJECT_IN_USE),
1926 : errmsg("database \"%s\" is being accessed by other users",
1927 : oldname),
1928 : errdetail_busy_db(notherbackends, npreparedxacts)));
1929 :
1930 : /* rename */
1931 0 : newtup = SearchSysCacheCopy1(DATABASEOID, ObjectIdGetDatum(db_id));
1932 0 : if (!HeapTupleIsValid(newtup))
1933 0 : elog(ERROR, "cache lookup failed for database %u", db_id);
1934 0 : namestrcpy(&(((Form_pg_database) GETSTRUCT(newtup))->datname), newname);
1935 0 : CatalogTupleUpdate(rel, &newtup->t_self, newtup);
1936 :
1937 0 : InvokeObjectPostAlterHook(DatabaseRelationId, db_id, 0);
1938 :
1939 0 : ObjectAddressSet(address, DatabaseRelationId, db_id);
1940 :
1941 : /*
1942 : * Close pg_database, but keep lock till commit.
1943 : */
1944 0 : table_close(rel, NoLock);
1945 :
1946 0 : return address;
1947 : }
1948 :
1949 :
1950 : /*
1951 : * ALTER DATABASE SET TABLESPACE
1952 : */
1953 : static void
1954 10 : movedb(const char *dbname, const char *tblspcname)
1955 : {
1956 : Oid db_id;
1957 : Relation pgdbrel;
1958 : int notherbackends;
1959 : int npreparedxacts;
1960 : HeapTuple oldtuple,
1961 : newtuple;
1962 : Oid src_tblspcoid,
1963 : dst_tblspcoid;
1964 : ScanKeyData scankey;
1965 : SysScanDesc sysscan;
1966 : AclResult aclresult;
1967 : char *src_dbpath;
1968 : char *dst_dbpath;
1969 : DIR *dstdir;
1970 : struct dirent *xlde;
1971 : movedb_failure_params fparms;
1972 :
1973 : /*
1974 : * Look up the target database's OID, and get exclusive lock on it. We
1975 : * need this to ensure that no new backend starts up in the database while
1976 : * we are moving it, and that no one is using it as a CREATE DATABASE
1977 : * template or trying to delete it.
1978 : */
1979 10 : pgdbrel = table_open(DatabaseRelationId, RowExclusiveLock);
1980 :
1981 10 : if (!get_db_info(dbname, AccessExclusiveLock, &db_id, NULL, NULL, NULL,
1982 : NULL, NULL, NULL, NULL, &src_tblspcoid, NULL, NULL, NULL, NULL, NULL, NULL))
1983 0 : ereport(ERROR,
1984 : (errcode(ERRCODE_UNDEFINED_DATABASE),
1985 : errmsg("database \"%s\" does not exist", dbname)));
1986 :
1987 : /*
1988 : * We actually need a session lock, so that the lock will persist across
1989 : * the commit/restart below. (We could almost get away with letting the
1990 : * lock be released at commit, except that someone could try to move
1991 : * relations of the DB back into the old directory while we rmtree() it.)
1992 : */
1993 10 : LockSharedObjectForSession(DatabaseRelationId, db_id, 0,
1994 : AccessExclusiveLock);
1995 :
1996 : /*
1997 : * Permission checks
1998 : */
1999 10 : if (!object_ownercheck(DatabaseRelationId, db_id, GetUserId()))
2000 0 : aclcheck_error(ACLCHECK_NOT_OWNER, OBJECT_DATABASE,
2001 : dbname);
2002 :
2003 : /*
2004 : * Obviously can't move the tables of my own database
2005 : */
2006 10 : if (db_id == MyDatabaseId)
2007 0 : ereport(ERROR,
2008 : (errcode(ERRCODE_OBJECT_IN_USE),
2009 : errmsg("cannot change the tablespace of the currently open database")));
2010 :
2011 : /*
2012 : * Get tablespace's oid
2013 : */
2014 10 : dst_tblspcoid = get_tablespace_oid(tblspcname, false);
2015 :
2016 : /*
2017 : * Permission checks
2018 : */
2019 10 : aclresult = object_aclcheck(TableSpaceRelationId, dst_tblspcoid, GetUserId(),
2020 : ACL_CREATE);
2021 10 : if (aclresult != ACLCHECK_OK)
2022 0 : aclcheck_error(aclresult, OBJECT_TABLESPACE,
2023 : tblspcname);
2024 :
2025 : /*
2026 : * pg_global must never be the default tablespace
2027 : */
2028 10 : if (dst_tblspcoid == GLOBALTABLESPACE_OID)
2029 0 : ereport(ERROR,
2030 : (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
2031 : errmsg("pg_global cannot be used as default tablespace")));
2032 :
2033 : /*
2034 : * No-op if same tablespace
2035 : */
2036 10 : if (src_tblspcoid == dst_tblspcoid)
2037 : {
2038 0 : table_close(pgdbrel, NoLock);
2039 0 : UnlockSharedObjectForSession(DatabaseRelationId, db_id, 0,
2040 : AccessExclusiveLock);
2041 0 : return;
2042 : }
2043 :
2044 : /*
2045 : * Check for other backends in the target database. (Because we hold the
2046 : * database lock, no new ones can start after this.)
2047 : *
2048 : * As in CREATE DATABASE, check this after other error conditions.
2049 : */
2050 10 : if (CountOtherDBBackends(db_id, ¬herbackends, &npreparedxacts))
2051 0 : ereport(ERROR,
2052 : (errcode(ERRCODE_OBJECT_IN_USE),
2053 : errmsg("database \"%s\" is being accessed by other users",
2054 : dbname),
2055 : errdetail_busy_db(notherbackends, npreparedxacts)));
2056 :
2057 : /*
2058 : * Get old and new database paths
2059 : */
2060 10 : src_dbpath = GetDatabasePath(db_id, src_tblspcoid);
2061 10 : dst_dbpath = GetDatabasePath(db_id, dst_tblspcoid);
2062 :
2063 : /*
2064 : * Force a checkpoint before proceeding. This will force all dirty
2065 : * buffers, including those of unlogged tables, out to disk, to ensure
2066 : * source database is up-to-date on disk for the copy.
2067 : * FlushDatabaseBuffers() would suffice for that, but we also want to
2068 : * process any pending unlink requests. Otherwise, the check for existing
2069 : * files in the target directory might fail unnecessarily, not to mention
2070 : * that the copy might fail due to source files getting deleted under it.
2071 : * On Windows, this also ensures that background procs don't hold any open
2072 : * files, which would cause rmdir() to fail.
2073 : */
2074 10 : RequestCheckpoint(CHECKPOINT_IMMEDIATE | CHECKPOINT_FORCE | CHECKPOINT_WAIT
2075 : | CHECKPOINT_FLUSH_ALL);
2076 :
2077 : /* Close all smgr fds in all backends. */
2078 10 : WaitForProcSignalBarrier(EmitProcSignalBarrier(PROCSIGNAL_BARRIER_SMGRRELEASE));
2079 :
2080 : /*
2081 : * Now drop all buffers holding data of the target database; they should
2082 : * no longer be dirty so DropDatabaseBuffers is safe.
2083 : *
2084 : * It might seem that we could just let these buffers age out of shared
2085 : * buffers naturally, since they should not get referenced anymore. The
2086 : * problem with that is that if the user later moves the database back to
2087 : * its original tablespace, any still-surviving buffers would appear to
2088 : * contain valid data again --- but they'd be missing any changes made in
2089 : * the database while it was in the new tablespace. In any case, freeing
2090 : * buffers that should never be used again seems worth the cycles.
2091 : *
2092 : * Note: it'd be sufficient to get rid of buffers matching db_id and
2093 : * src_tblspcoid, but bufmgr.c presently provides no API for that.
2094 : */
2095 10 : DropDatabaseBuffers(db_id);
2096 :
2097 : /*
2098 : * Check for existence of files in the target directory, i.e., objects of
2099 : * this database that are already in the target tablespace. We can't
2100 : * allow the move in such a case, because we would need to change those
2101 : * relations' pg_class.reltablespace entries to zero, and we don't have
2102 : * access to the DB's pg_class to do so.
2103 : */
2104 10 : dstdir = AllocateDir(dst_dbpath);
2105 10 : if (dstdir != NULL)
2106 : {
2107 0 : while ((xlde = ReadDir(dstdir, dst_dbpath)) != NULL)
2108 : {
2109 0 : if (strcmp(xlde->d_name, ".") == 0 ||
2110 0 : strcmp(xlde->d_name, "..") == 0)
2111 0 : continue;
2112 :
2113 0 : ereport(ERROR,
2114 : (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
2115 : errmsg("some relations of database \"%s\" are already in tablespace \"%s\"",
2116 : dbname, tblspcname),
2117 : errhint("You must move them back to the database's default tablespace before using this command.")));
2118 : }
2119 :
2120 0 : FreeDir(dstdir);
2121 :
2122 : /*
2123 : * The directory exists but is empty. We must remove it before using
2124 : * the copydir function.
2125 : */
2126 0 : if (rmdir(dst_dbpath) != 0)
2127 0 : elog(ERROR, "could not remove directory \"%s\": %m",
2128 : dst_dbpath);
2129 : }
2130 :
2131 : /*
2132 : * Use an ENSURE block to make sure we remove the debris if the copy fails
2133 : * (eg, due to out-of-disk-space). This is not a 100% solution, because
2134 : * of the possibility of failure during transaction commit, but it should
2135 : * handle most scenarios.
2136 : */
2137 10 : fparms.dest_dboid = db_id;
2138 10 : fparms.dest_tsoid = dst_tblspcoid;
2139 10 : PG_ENSURE_ERROR_CLEANUP(movedb_failure_callback,
2140 : PointerGetDatum(&fparms));
2141 : {
2142 10 : Datum new_record[Natts_pg_database] = {0};
2143 10 : bool new_record_nulls[Natts_pg_database] = {0};
2144 10 : bool new_record_repl[Natts_pg_database] = {0};
2145 :
2146 : /*
2147 : * Copy files from the old tablespace to the new one
2148 : */
2149 10 : copydir(src_dbpath, dst_dbpath, false);
2150 :
2151 : /*
2152 : * Record the filesystem change in XLOG
2153 : */
2154 : {
2155 : xl_dbase_create_file_copy_rec xlrec;
2156 :
2157 10 : xlrec.db_id = db_id;
2158 10 : xlrec.tablespace_id = dst_tblspcoid;
2159 10 : xlrec.src_db_id = db_id;
2160 10 : xlrec.src_tablespace_id = src_tblspcoid;
2161 :
2162 10 : XLogBeginInsert();
2163 10 : XLogRegisterData((char *) &xlrec,
2164 : sizeof(xl_dbase_create_file_copy_rec));
2165 :
2166 10 : (void) XLogInsert(RM_DBASE_ID,
2167 : XLOG_DBASE_CREATE_FILE_COPY | XLR_SPECIAL_REL_UPDATE);
2168 : }
2169 :
2170 : /*
2171 : * Update the database's pg_database tuple
2172 : */
2173 10 : ScanKeyInit(&scankey,
2174 : Anum_pg_database_datname,
2175 : BTEqualStrategyNumber, F_NAMEEQ,
2176 : CStringGetDatum(dbname));
2177 10 : sysscan = systable_beginscan(pgdbrel, DatabaseNameIndexId, true,
2178 : NULL, 1, &scankey);
2179 10 : oldtuple = systable_getnext(sysscan);
2180 10 : if (!HeapTupleIsValid(oldtuple)) /* shouldn't happen... */
2181 0 : ereport(ERROR,
2182 : (errcode(ERRCODE_UNDEFINED_DATABASE),
2183 : errmsg("database \"%s\" does not exist", dbname)));
2184 :
2185 10 : new_record[Anum_pg_database_dattablespace - 1] = ObjectIdGetDatum(dst_tblspcoid);
2186 10 : new_record_repl[Anum_pg_database_dattablespace - 1] = true;
2187 :
2188 10 : newtuple = heap_modify_tuple(oldtuple, RelationGetDescr(pgdbrel),
2189 : new_record,
2190 : new_record_nulls, new_record_repl);
2191 10 : CatalogTupleUpdate(pgdbrel, &oldtuple->t_self, newtuple);
2192 :
2193 10 : InvokeObjectPostAlterHook(DatabaseRelationId, db_id, 0);
2194 :
2195 10 : systable_endscan(sysscan);
2196 :
2197 : /*
2198 : * Force another checkpoint here. As in CREATE DATABASE, this is to
2199 : * ensure that we don't have to replay a committed
2200 : * XLOG_DBASE_CREATE_FILE_COPY operation, which would cause us to lose
2201 : * any unlogged operations done in the new DB tablespace before the
2202 : * next checkpoint.
2203 : */
2204 10 : RequestCheckpoint(CHECKPOINT_IMMEDIATE | CHECKPOINT_FORCE | CHECKPOINT_WAIT);
2205 :
2206 : /*
2207 : * Force synchronous commit, thus minimizing the window between
2208 : * copying the database files and committal of the transaction. If we
2209 : * crash before committing, we'll leave an orphaned set of files on
2210 : * disk, which is not fatal but not good either.
2211 : */
2212 10 : ForceSyncCommit();
2213 :
2214 : /*
2215 : * Close pg_database, but keep lock till commit.
2216 : */
2217 10 : table_close(pgdbrel, NoLock);
2218 : }
2219 10 : PG_END_ENSURE_ERROR_CLEANUP(movedb_failure_callback,
2220 : PointerGetDatum(&fparms));
2221 :
2222 : /*
2223 : * Commit the transaction so that the pg_database update is committed. If
2224 : * we crash while removing files, the database won't be corrupt, we'll
2225 : * just leave some orphaned files in the old directory.
2226 : *
2227 : * (This is OK because we know we aren't inside a transaction block.)
2228 : *
2229 : * XXX would it be safe/better to do this inside the ensure block? Not
2230 : * convinced it's a good idea; consider elog just after the transaction
2231 : * really commits.
2232 : */
2233 10 : PopActiveSnapshot();
2234 10 : CommitTransactionCommand();
2235 :
2236 : /* Start new transaction for the remaining work; don't need a snapshot */
2237 10 : StartTransactionCommand();
2238 :
2239 : /*
2240 : * Remove files from the old tablespace
2241 : */
2242 10 : if (!rmtree(src_dbpath, true))
2243 0 : ereport(WARNING,
2244 : (errmsg("some useless files may be left behind in old database directory \"%s\"",
2245 : src_dbpath)));
2246 :
2247 : /*
2248 : * Record the filesystem change in XLOG
2249 : */
2250 : {
2251 : xl_dbase_drop_rec xlrec;
2252 :
2253 10 : xlrec.db_id = db_id;
2254 10 : xlrec.ntablespaces = 1;
2255 :
2256 10 : XLogBeginInsert();
2257 10 : XLogRegisterData((char *) &xlrec, sizeof(xl_dbase_drop_rec));
2258 10 : XLogRegisterData((char *) &src_tblspcoid, sizeof(Oid));
2259 :
2260 10 : (void) XLogInsert(RM_DBASE_ID,
2261 : XLOG_DBASE_DROP | XLR_SPECIAL_REL_UPDATE);
2262 : }
2263 :
2264 : /* Now it's safe to release the database lock */
2265 10 : UnlockSharedObjectForSession(DatabaseRelationId, db_id, 0,
2266 : AccessExclusiveLock);
2267 :
2268 10 : pfree(src_dbpath);
2269 10 : pfree(dst_dbpath);
2270 : }
2271 :
2272 : /* Error cleanup callback for movedb */
2273 : static void
2274 0 : movedb_failure_callback(int code, Datum arg)
2275 : {
2276 0 : movedb_failure_params *fparms = (movedb_failure_params *) DatumGetPointer(arg);
2277 : char *dstpath;
2278 :
2279 : /* Get rid of anything we managed to copy to the target directory */
2280 0 : dstpath = GetDatabasePath(fparms->dest_dboid, fparms->dest_tsoid);
2281 :
2282 0 : (void) rmtree(dstpath, true);
2283 :
2284 0 : pfree(dstpath);
2285 0 : }
2286 :
2287 : /*
2288 : * Process options and call dropdb function.
2289 : */
2290 : void
2291 94 : DropDatabase(ParseState *pstate, DropdbStmt *stmt)
2292 : {
2293 94 : bool force = false;
2294 : ListCell *lc;
2295 :
2296 120 : foreach(lc, stmt->options)
2297 : {
2298 26 : DefElem *opt = (DefElem *) lfirst(lc);
2299 :
2300 26 : if (strcmp(opt->defname, "force") == 0)
2301 26 : force = true;
2302 : else
2303 0 : ereport(ERROR,
2304 : (errcode(ERRCODE_SYNTAX_ERROR),
2305 : errmsg("unrecognized DROP DATABASE option \"%s\"", opt->defname),
2306 : parser_errposition(pstate, opt->location)));
2307 : }
2308 :
2309 94 : dropdb(stmt->dbname, stmt->missing_ok, force);
2310 76 : }
2311 :
2312 : /*
2313 : * ALTER DATABASE name ...
2314 : */
2315 : Oid
2316 34 : AlterDatabase(ParseState *pstate, AlterDatabaseStmt *stmt, bool isTopLevel)
2317 : {
2318 : Relation rel;
2319 : Oid dboid;
2320 : HeapTuple tuple,
2321 : newtuple;
2322 : Form_pg_database datform;
2323 : ScanKeyData scankey;
2324 : SysScanDesc scan;
2325 : ListCell *option;
2326 34 : bool dbistemplate = false;
2327 34 : bool dballowconnections = true;
2328 34 : int dbconnlimit = DATCONNLIMIT_UNLIMITED;
2329 34 : DefElem *distemplate = NULL;
2330 34 : DefElem *dallowconnections = NULL;
2331 34 : DefElem *dconnlimit = NULL;
2332 34 : DefElem *dtablespace = NULL;
2333 34 : Datum new_record[Natts_pg_database] = {0};
2334 34 : bool new_record_nulls[Natts_pg_database] = {0};
2335 34 : bool new_record_repl[Natts_pg_database] = {0};
2336 :
2337 : /* Extract options from the statement node tree */
2338 68 : foreach(option, stmt->options)
2339 : {
2340 34 : DefElem *defel = (DefElem *) lfirst(option);
2341 :
2342 34 : if (strcmp(defel->defname, "is_template") == 0)
2343 : {
2344 10 : if (distemplate)
2345 0 : errorConflictingDefElem(defel, pstate);
2346 10 : distemplate = defel;
2347 : }
2348 24 : else if (strcmp(defel->defname, "allow_connections") == 0)
2349 : {
2350 12 : if (dallowconnections)
2351 0 : errorConflictingDefElem(defel, pstate);
2352 12 : dallowconnections = defel;
2353 : }
2354 12 : else if (strcmp(defel->defname, "connection_limit") == 0)
2355 : {
2356 2 : if (dconnlimit)
2357 0 : errorConflictingDefElem(defel, pstate);
2358 2 : dconnlimit = defel;
2359 : }
2360 10 : else if (strcmp(defel->defname, "tablespace") == 0)
2361 : {
2362 10 : if (dtablespace)
2363 0 : errorConflictingDefElem(defel, pstate);
2364 10 : dtablespace = defel;
2365 : }
2366 : else
2367 0 : ereport(ERROR,
2368 : (errcode(ERRCODE_SYNTAX_ERROR),
2369 : errmsg("option \"%s\" not recognized", defel->defname),
2370 : parser_errposition(pstate, defel->location)));
2371 : }
2372 :
2373 34 : if (dtablespace)
2374 : {
2375 : /*
2376 : * While the SET TABLESPACE syntax doesn't allow any other options,
2377 : * somebody could write "WITH TABLESPACE ...". Forbid any other
2378 : * options from being specified in that case.
2379 : */
2380 10 : if (list_length(stmt->options) != 1)
2381 0 : ereport(ERROR,
2382 : (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
2383 : errmsg("option \"%s\" cannot be specified with other options",
2384 : dtablespace->defname),
2385 : parser_errposition(pstate, dtablespace->location)));
2386 : /* this case isn't allowed within a transaction block */
2387 10 : PreventInTransactionBlock(isTopLevel, "ALTER DATABASE SET TABLESPACE");
2388 10 : movedb(stmt->dbname, defGetString(dtablespace));
2389 10 : return InvalidOid;
2390 : }
2391 :
2392 24 : if (distemplate && distemplate->arg)
2393 10 : dbistemplate = defGetBoolean(distemplate);
2394 24 : if (dallowconnections && dallowconnections->arg)
2395 12 : dballowconnections = defGetBoolean(dallowconnections);
2396 24 : if (dconnlimit && dconnlimit->arg)
2397 : {
2398 2 : dbconnlimit = defGetInt32(dconnlimit);
2399 2 : if (dbconnlimit < DATCONNLIMIT_UNLIMITED)
2400 0 : ereport(ERROR,
2401 : (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
2402 : errmsg("invalid connection limit: %d", dbconnlimit)));
2403 : }
2404 :
2405 : /*
2406 : * Get the old tuple. We don't need a lock on the database per se,
2407 : * because we're not going to do anything that would mess up incoming
2408 : * connections.
2409 : */
2410 24 : rel = table_open(DatabaseRelationId, RowExclusiveLock);
2411 24 : ScanKeyInit(&scankey,
2412 : Anum_pg_database_datname,
2413 : BTEqualStrategyNumber, F_NAMEEQ,
2414 24 : CStringGetDatum(stmt->dbname));
2415 24 : scan = systable_beginscan(rel, DatabaseNameIndexId, true,
2416 : NULL, 1, &scankey);
2417 24 : tuple = systable_getnext(scan);
2418 24 : if (!HeapTupleIsValid(tuple))
2419 0 : ereport(ERROR,
2420 : (errcode(ERRCODE_UNDEFINED_DATABASE),
2421 : errmsg("database \"%s\" does not exist", stmt->dbname)));
2422 :
2423 24 : datform = (Form_pg_database) GETSTRUCT(tuple);
2424 24 : dboid = datform->oid;
2425 :
2426 24 : if (database_is_invalid_form(datform))
2427 : {
2428 2 : ereport(FATAL,
2429 : errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
2430 : errmsg("cannot alter invalid database \"%s\"", stmt->dbname),
2431 : errhint("Use DROP DATABASE to drop invalid databases."));
2432 : }
2433 :
2434 22 : if (!object_ownercheck(DatabaseRelationId, dboid, GetUserId()))
2435 0 : aclcheck_error(ACLCHECK_NOT_OWNER, OBJECT_DATABASE,
2436 0 : stmt->dbname);
2437 :
2438 : /*
2439 : * In order to avoid getting locked out and having to go through
2440 : * standalone mode, we refuse to disallow connections to the database
2441 : * we're currently connected to. Lockout can still happen with concurrent
2442 : * sessions but the likeliness of that is not high enough to worry about.
2443 : */
2444 22 : if (!dballowconnections && dboid == MyDatabaseId)
2445 0 : ereport(ERROR,
2446 : (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
2447 : errmsg("cannot disallow connections for current database")));
2448 :
2449 : /*
2450 : * Build an updated tuple, perusing the information just obtained
2451 : */
2452 22 : if (distemplate)
2453 : {
2454 10 : new_record[Anum_pg_database_datistemplate - 1] = BoolGetDatum(dbistemplate);
2455 10 : new_record_repl[Anum_pg_database_datistemplate - 1] = true;
2456 : }
2457 22 : if (dallowconnections)
2458 : {
2459 12 : new_record[Anum_pg_database_datallowconn - 1] = BoolGetDatum(dballowconnections);
2460 12 : new_record_repl[Anum_pg_database_datallowconn - 1] = true;
2461 : }
2462 22 : if (dconnlimit)
2463 : {
2464 0 : new_record[Anum_pg_database_datconnlimit - 1] = Int32GetDatum(dbconnlimit);
2465 0 : new_record_repl[Anum_pg_database_datconnlimit - 1] = true;
2466 : }
2467 :
2468 22 : newtuple = heap_modify_tuple(tuple, RelationGetDescr(rel), new_record,
2469 : new_record_nulls, new_record_repl);
2470 22 : CatalogTupleUpdate(rel, &tuple->t_self, newtuple);
2471 :
2472 22 : InvokeObjectPostAlterHook(DatabaseRelationId, dboid, 0);
2473 :
2474 22 : systable_endscan(scan);
2475 :
2476 : /* Close pg_database, but keep lock till commit */
2477 22 : table_close(rel, NoLock);
2478 :
2479 22 : return dboid;
2480 : }
2481 :
2482 :
2483 : /*
2484 : * ALTER DATABASE name REFRESH COLLATION VERSION
2485 : */
2486 : ObjectAddress
2487 6 : AlterDatabaseRefreshColl(AlterDatabaseRefreshCollStmt *stmt)
2488 : {
2489 : Relation rel;
2490 : ScanKeyData scankey;
2491 : SysScanDesc scan;
2492 : Oid db_id;
2493 : HeapTuple tuple;
2494 : Form_pg_database datForm;
2495 : ObjectAddress address;
2496 : Datum datum;
2497 : bool isnull;
2498 : char *oldversion;
2499 : char *newversion;
2500 :
2501 6 : rel = table_open(DatabaseRelationId, RowExclusiveLock);
2502 6 : ScanKeyInit(&scankey,
2503 : Anum_pg_database_datname,
2504 : BTEqualStrategyNumber, F_NAMEEQ,
2505 6 : CStringGetDatum(stmt->dbname));
2506 6 : scan = systable_beginscan(rel, DatabaseNameIndexId, true,
2507 : NULL, 1, &scankey);
2508 6 : tuple = systable_getnext(scan);
2509 6 : if (!HeapTupleIsValid(tuple))
2510 0 : ereport(ERROR,
2511 : (errcode(ERRCODE_UNDEFINED_DATABASE),
2512 : errmsg("database \"%s\" does not exist", stmt->dbname)));
2513 :
2514 6 : datForm = (Form_pg_database) GETSTRUCT(tuple);
2515 6 : db_id = datForm->oid;
2516 :
2517 6 : if (!object_ownercheck(DatabaseRelationId, db_id, GetUserId()))
2518 0 : aclcheck_error(ACLCHECK_NOT_OWNER, OBJECT_DATABASE,
2519 0 : stmt->dbname);
2520 :
2521 6 : datum = heap_getattr(tuple, Anum_pg_database_datcollversion, RelationGetDescr(rel), &isnull);
2522 6 : oldversion = isnull ? NULL : TextDatumGetCString(datum);
2523 :
2524 6 : if (datForm->datlocprovider == COLLPROVIDER_LIBC)
2525 : {
2526 4 : datum = heap_getattr(tuple, Anum_pg_database_datcollate, RelationGetDescr(rel), &isnull);
2527 4 : if (isnull)
2528 0 : elog(ERROR, "unexpected null in pg_database");
2529 : }
2530 : else
2531 : {
2532 2 : datum = heap_getattr(tuple, Anum_pg_database_datlocale, RelationGetDescr(rel), &isnull);
2533 2 : if (isnull)
2534 0 : elog(ERROR, "unexpected null in pg_database");
2535 : }
2536 :
2537 6 : newversion = get_collation_actual_version(datForm->datlocprovider,
2538 6 : TextDatumGetCString(datum));
2539 :
2540 : /* cannot change from NULL to non-NULL or vice versa */
2541 6 : if ((!oldversion && newversion) || (oldversion && !newversion))
2542 0 : elog(ERROR, "invalid collation version change");
2543 6 : else if (oldversion && newversion && strcmp(newversion, oldversion) != 0)
2544 0 : {
2545 0 : bool nulls[Natts_pg_database] = {0};
2546 0 : bool replaces[Natts_pg_database] = {0};
2547 0 : Datum values[Natts_pg_database] = {0};
2548 :
2549 0 : ereport(NOTICE,
2550 : (errmsg("changing version from %s to %s",
2551 : oldversion, newversion)));
2552 :
2553 0 : values[Anum_pg_database_datcollversion - 1] = CStringGetTextDatum(newversion);
2554 0 : replaces[Anum_pg_database_datcollversion - 1] = true;
2555 :
2556 0 : tuple = heap_modify_tuple(tuple, RelationGetDescr(rel),
2557 : values, nulls, replaces);
2558 0 : CatalogTupleUpdate(rel, &tuple->t_self, tuple);
2559 0 : heap_freetuple(tuple);
2560 : }
2561 : else
2562 6 : ereport(NOTICE,
2563 : (errmsg("version has not changed")));
2564 :
2565 6 : InvokeObjectPostAlterHook(DatabaseRelationId, db_id, 0);
2566 :
2567 6 : ObjectAddressSet(address, DatabaseRelationId, db_id);
2568 :
2569 6 : systable_endscan(scan);
2570 :
2571 6 : table_close(rel, NoLock);
2572 :
2573 6 : return address;
2574 : }
2575 :
2576 :
2577 : /*
2578 : * ALTER DATABASE name SET ...
2579 : */
2580 : Oid
2581 1058 : AlterDatabaseSet(AlterDatabaseSetStmt *stmt)
2582 : {
2583 1058 : Oid datid = get_database_oid(stmt->dbname, false);
2584 :
2585 : /*
2586 : * Obtain a lock on the database and make sure it didn't go away in the
2587 : * meantime.
2588 : */
2589 1058 : shdepLockAndCheckObject(DatabaseRelationId, datid);
2590 :
2591 1058 : if (!object_ownercheck(DatabaseRelationId, datid, GetUserId()))
2592 0 : aclcheck_error(ACLCHECK_NOT_OWNER, OBJECT_DATABASE,
2593 0 : stmt->dbname);
2594 :
2595 1058 : AlterSetting(datid, InvalidOid, stmt->setstmt);
2596 :
2597 1058 : UnlockSharedObject(DatabaseRelationId, datid, 0, AccessShareLock);
2598 :
2599 1058 : return datid;
2600 : }
2601 :
2602 :
2603 : /*
2604 : * ALTER DATABASE name OWNER TO newowner
2605 : */
2606 : ObjectAddress
2607 44 : AlterDatabaseOwner(const char *dbname, Oid newOwnerId)
2608 : {
2609 : Oid db_id;
2610 : HeapTuple tuple;
2611 : Relation rel;
2612 : ScanKeyData scankey;
2613 : SysScanDesc scan;
2614 : Form_pg_database datForm;
2615 : ObjectAddress address;
2616 :
2617 : /*
2618 : * Get the old tuple. We don't need a lock on the database per se,
2619 : * because we're not going to do anything that would mess up incoming
2620 : * connections.
2621 : */
2622 44 : rel = table_open(DatabaseRelationId, RowExclusiveLock);
2623 44 : ScanKeyInit(&scankey,
2624 : Anum_pg_database_datname,
2625 : BTEqualStrategyNumber, F_NAMEEQ,
2626 : CStringGetDatum(dbname));
2627 44 : scan = systable_beginscan(rel, DatabaseNameIndexId, true,
2628 : NULL, 1, &scankey);
2629 44 : tuple = systable_getnext(scan);
2630 44 : if (!HeapTupleIsValid(tuple))
2631 0 : ereport(ERROR,
2632 : (errcode(ERRCODE_UNDEFINED_DATABASE),
2633 : errmsg("database \"%s\" does not exist", dbname)));
2634 :
2635 44 : datForm = (Form_pg_database) GETSTRUCT(tuple);
2636 44 : db_id = datForm->oid;
2637 :
2638 : /*
2639 : * If the new owner is the same as the existing owner, consider the
2640 : * command to have succeeded. This is to be consistent with other
2641 : * objects.
2642 : */
2643 44 : if (datForm->datdba != newOwnerId)
2644 : {
2645 : Datum repl_val[Natts_pg_database];
2646 24 : bool repl_null[Natts_pg_database] = {0};
2647 24 : bool repl_repl[Natts_pg_database] = {0};
2648 : Acl *newAcl;
2649 : Datum aclDatum;
2650 : bool isNull;
2651 : HeapTuple newtuple;
2652 :
2653 : /* Otherwise, must be owner of the existing object */
2654 24 : if (!object_ownercheck(DatabaseRelationId, db_id, GetUserId()))
2655 0 : aclcheck_error(ACLCHECK_NOT_OWNER, OBJECT_DATABASE,
2656 : dbname);
2657 :
2658 : /* Must be able to become new owner */
2659 24 : check_can_set_role(GetUserId(), newOwnerId);
2660 :
2661 : /*
2662 : * must have createdb rights
2663 : *
2664 : * NOTE: This is different from other alter-owner checks in that the
2665 : * current user is checked for createdb privileges instead of the
2666 : * destination owner. This is consistent with the CREATE case for
2667 : * databases. Because superusers will always have this right, we need
2668 : * no special case for them.
2669 : */
2670 24 : if (!have_createdb_privilege())
2671 0 : ereport(ERROR,
2672 : (errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
2673 : errmsg("permission denied to change owner of database")));
2674 :
2675 24 : repl_repl[Anum_pg_database_datdba - 1] = true;
2676 24 : repl_val[Anum_pg_database_datdba - 1] = ObjectIdGetDatum(newOwnerId);
2677 :
2678 : /*
2679 : * Determine the modified ACL for the new owner. This is only
2680 : * necessary when the ACL is non-null.
2681 : */
2682 24 : aclDatum = heap_getattr(tuple,
2683 : Anum_pg_database_datacl,
2684 : RelationGetDescr(rel),
2685 : &isNull);
2686 24 : if (!isNull)
2687 : {
2688 0 : newAcl = aclnewowner(DatumGetAclP(aclDatum),
2689 : datForm->datdba, newOwnerId);
2690 0 : repl_repl[Anum_pg_database_datacl - 1] = true;
2691 0 : repl_val[Anum_pg_database_datacl - 1] = PointerGetDatum(newAcl);
2692 : }
2693 :
2694 24 : newtuple = heap_modify_tuple(tuple, RelationGetDescr(rel), repl_val, repl_null, repl_repl);
2695 24 : CatalogTupleUpdate(rel, &newtuple->t_self, newtuple);
2696 :
2697 24 : heap_freetuple(newtuple);
2698 :
2699 : /* Update owner dependency reference */
2700 24 : changeDependencyOnOwner(DatabaseRelationId, db_id, newOwnerId);
2701 : }
2702 :
2703 44 : InvokeObjectPostAlterHook(DatabaseRelationId, db_id, 0);
2704 :
2705 44 : ObjectAddressSet(address, DatabaseRelationId, db_id);
2706 :
2707 44 : systable_endscan(scan);
2708 :
2709 : /* Close pg_database, but keep lock till commit */
2710 44 : table_close(rel, NoLock);
2711 :
2712 44 : return address;
2713 : }
2714 :
2715 :
2716 : Datum
2717 76 : pg_database_collation_actual_version(PG_FUNCTION_ARGS)
2718 : {
2719 76 : Oid dbid = PG_GETARG_OID(0);
2720 : HeapTuple tp;
2721 : char datlocprovider;
2722 : Datum datum;
2723 : char *version;
2724 :
2725 76 : tp = SearchSysCache1(DATABASEOID, ObjectIdGetDatum(dbid));
2726 76 : if (!HeapTupleIsValid(tp))
2727 0 : ereport(ERROR,
2728 : (errcode(ERRCODE_UNDEFINED_OBJECT),
2729 : errmsg("database with OID %u does not exist", dbid)));
2730 :
2731 76 : datlocprovider = ((Form_pg_database) GETSTRUCT(tp))->datlocprovider;
2732 :
2733 76 : if (datlocprovider == COLLPROVIDER_LIBC)
2734 62 : datum = SysCacheGetAttrNotNull(DATABASEOID, tp, Anum_pg_database_datcollate);
2735 : else
2736 14 : datum = SysCacheGetAttrNotNull(DATABASEOID, tp, Anum_pg_database_datlocale);
2737 :
2738 76 : version = get_collation_actual_version(datlocprovider,
2739 76 : TextDatumGetCString(datum));
2740 :
2741 76 : ReleaseSysCache(tp);
2742 :
2743 76 : if (version)
2744 48 : PG_RETURN_TEXT_P(cstring_to_text(version));
2745 : else
2746 28 : PG_RETURN_NULL();
2747 : }
2748 :
2749 :
2750 : /*
2751 : * Helper functions
2752 : */
2753 :
2754 : /*
2755 : * Look up info about the database named "name". If the database exists,
2756 : * obtain the specified lock type on it, fill in any of the remaining
2757 : * parameters that aren't NULL, and return true. If no such database,
2758 : * return false.
2759 : */
2760 : static bool
2761 736 : get_db_info(const char *name, LOCKMODE lockmode,
2762 : Oid *dbIdP, Oid *ownerIdP,
2763 : int *encodingP, bool *dbIsTemplateP, bool *dbAllowConnP, bool *dbHasLoginEvtP,
2764 : TransactionId *dbFrozenXidP, MultiXactId *dbMinMultiP,
2765 : Oid *dbTablespace, char **dbCollate, char **dbCtype, char **dbLocale,
2766 : char **dbIcurules,
2767 : char *dbLocProvider,
2768 : char **dbCollversion)
2769 : {
2770 736 : bool result = false;
2771 : Relation relation;
2772 :
2773 : Assert(name);
2774 :
2775 : /* Caller may wish to grab a better lock on pg_database beforehand... */
2776 736 : relation = table_open(DatabaseRelationId, AccessShareLock);
2777 :
2778 : /*
2779 : * Loop covers the rare case where the database is renamed before we can
2780 : * lock it. We try again just in case we can find a new one of the same
2781 : * name.
2782 : */
2783 : for (;;)
2784 0 : {
2785 : ScanKeyData scanKey;
2786 : SysScanDesc scan;
2787 : HeapTuple tuple;
2788 : Oid dbOid;
2789 :
2790 : /*
2791 : * there's no syscache for database-indexed-by-name, so must do it the
2792 : * hard way
2793 : */
2794 736 : ScanKeyInit(&scanKey,
2795 : Anum_pg_database_datname,
2796 : BTEqualStrategyNumber, F_NAMEEQ,
2797 : CStringGetDatum(name));
2798 :
2799 736 : scan = systable_beginscan(relation, DatabaseNameIndexId, true,
2800 : NULL, 1, &scanKey);
2801 :
2802 736 : tuple = systable_getnext(scan);
2803 :
2804 736 : if (!HeapTupleIsValid(tuple))
2805 : {
2806 : /* definitely no database of that name */
2807 32 : systable_endscan(scan);
2808 32 : break;
2809 : }
2810 :
2811 704 : dbOid = ((Form_pg_database) GETSTRUCT(tuple))->oid;
2812 :
2813 704 : systable_endscan(scan);
2814 :
2815 : /*
2816 : * Now that we have a database OID, we can try to lock the DB.
2817 : */
2818 704 : if (lockmode != NoLock)
2819 704 : LockSharedObject(DatabaseRelationId, dbOid, 0, lockmode);
2820 :
2821 : /*
2822 : * And now, re-fetch the tuple by OID. If it's still there and still
2823 : * the same name, we win; else, drop the lock and loop back to try
2824 : * again.
2825 : */
2826 704 : tuple = SearchSysCache1(DATABASEOID, ObjectIdGetDatum(dbOid));
2827 704 : if (HeapTupleIsValid(tuple))
2828 : {
2829 704 : Form_pg_database dbform = (Form_pg_database) GETSTRUCT(tuple);
2830 :
2831 704 : if (strcmp(name, NameStr(dbform->datname)) == 0)
2832 : {
2833 : Datum datum;
2834 : bool isnull;
2835 :
2836 : /* oid of the database */
2837 704 : if (dbIdP)
2838 704 : *dbIdP = dbOid;
2839 : /* oid of the owner */
2840 704 : if (ownerIdP)
2841 632 : *ownerIdP = dbform->datdba;
2842 : /* character encoding */
2843 704 : if (encodingP)
2844 632 : *encodingP = dbform->encoding;
2845 : /* allowed as template? */
2846 704 : if (dbIsTemplateP)
2847 694 : *dbIsTemplateP = dbform->datistemplate;
2848 : /* Has on login event trigger? */
2849 704 : if (dbHasLoginEvtP)
2850 632 : *dbHasLoginEvtP = dbform->dathasloginevt;
2851 : /* allowing connections? */
2852 704 : if (dbAllowConnP)
2853 632 : *dbAllowConnP = dbform->datallowconn;
2854 : /* limit of frozen XIDs */
2855 704 : if (dbFrozenXidP)
2856 632 : *dbFrozenXidP = dbform->datfrozenxid;
2857 : /* minimum MultiXactId */
2858 704 : if (dbMinMultiP)
2859 632 : *dbMinMultiP = dbform->datminmxid;
2860 : /* default tablespace for this database */
2861 704 : if (dbTablespace)
2862 642 : *dbTablespace = dbform->dattablespace;
2863 : /* default locale settings for this database */
2864 704 : if (dbLocProvider)
2865 632 : *dbLocProvider = dbform->datlocprovider;
2866 704 : if (dbCollate)
2867 : {
2868 632 : datum = SysCacheGetAttrNotNull(DATABASEOID, tuple, Anum_pg_database_datcollate);
2869 632 : *dbCollate = TextDatumGetCString(datum);
2870 : }
2871 704 : if (dbCtype)
2872 : {
2873 632 : datum = SysCacheGetAttrNotNull(DATABASEOID, tuple, Anum_pg_database_datctype);
2874 632 : *dbCtype = TextDatumGetCString(datum);
2875 : }
2876 704 : if (dbLocale)
2877 : {
2878 632 : datum = SysCacheGetAttr(DATABASEOID, tuple, Anum_pg_database_datlocale, &isnull);
2879 632 : if (isnull)
2880 576 : *dbLocale = NULL;
2881 : else
2882 56 : *dbLocale = TextDatumGetCString(datum);
2883 : }
2884 704 : if (dbIcurules)
2885 : {
2886 632 : datum = SysCacheGetAttr(DATABASEOID, tuple, Anum_pg_database_daticurules, &isnull);
2887 632 : if (isnull)
2888 632 : *dbIcurules = NULL;
2889 : else
2890 0 : *dbIcurules = TextDatumGetCString(datum);
2891 : }
2892 704 : if (dbCollversion)
2893 : {
2894 632 : datum = SysCacheGetAttr(DATABASEOID, tuple, Anum_pg_database_datcollversion, &isnull);
2895 632 : if (isnull)
2896 390 : *dbCollversion = NULL;
2897 : else
2898 242 : *dbCollversion = TextDatumGetCString(datum);
2899 : }
2900 704 : ReleaseSysCache(tuple);
2901 704 : result = true;
2902 704 : break;
2903 : }
2904 : /* can only get here if it was just renamed */
2905 0 : ReleaseSysCache(tuple);
2906 : }
2907 :
2908 0 : if (lockmode != NoLock)
2909 0 : UnlockSharedObject(DatabaseRelationId, dbOid, 0, lockmode);
2910 : }
2911 :
2912 736 : table_close(relation, AccessShareLock);
2913 :
2914 736 : return result;
2915 : }
2916 :
2917 : /* Check if current user has createdb privileges */
2918 : bool
2919 692 : have_createdb_privilege(void)
2920 : {
2921 692 : bool result = false;
2922 : HeapTuple utup;
2923 :
2924 : /* Superusers can always do everything */
2925 692 : if (superuser())
2926 656 : return true;
2927 :
2928 36 : utup = SearchSysCache1(AUTHOID, ObjectIdGetDatum(GetUserId()));
2929 36 : if (HeapTupleIsValid(utup))
2930 : {
2931 36 : result = ((Form_pg_authid) GETSTRUCT(utup))->rolcreatedb;
2932 36 : ReleaseSysCache(utup);
2933 : }
2934 36 : return result;
2935 : }
2936 :
2937 : /*
2938 : * Remove tablespace directories
2939 : *
2940 : * We don't know what tablespaces db_id is using, so iterate through all
2941 : * tablespaces removing <tablespace>/db_id
2942 : */
2943 : static void
2944 60 : remove_dbtablespaces(Oid db_id)
2945 : {
2946 : Relation rel;
2947 : TableScanDesc scan;
2948 : HeapTuple tuple;
2949 60 : List *ltblspc = NIL;
2950 : ListCell *cell;
2951 : int ntblspc;
2952 : int i;
2953 : Oid *tablespace_ids;
2954 :
2955 60 : rel = table_open(TableSpaceRelationId, AccessShareLock);
2956 60 : scan = table_beginscan_catalog(rel, 0, NULL);
2957 224 : while ((tuple = heap_getnext(scan, ForwardScanDirection)) != NULL)
2958 : {
2959 164 : Form_pg_tablespace spcform = (Form_pg_tablespace) GETSTRUCT(tuple);
2960 164 : Oid dsttablespace = spcform->oid;
2961 : char *dstpath;
2962 : struct stat st;
2963 :
2964 : /* Don't mess with the global tablespace */
2965 164 : if (dsttablespace == GLOBALTABLESPACE_OID)
2966 104 : continue;
2967 :
2968 104 : dstpath = GetDatabasePath(db_id, dsttablespace);
2969 :
2970 104 : if (lstat(dstpath, &st) < 0 || !S_ISDIR(st.st_mode))
2971 : {
2972 : /* Assume we can ignore it */
2973 44 : pfree(dstpath);
2974 44 : continue;
2975 : }
2976 :
2977 60 : if (!rmtree(dstpath, true))
2978 0 : ereport(WARNING,
2979 : (errmsg("some useless files may be left behind in old database directory \"%s\"",
2980 : dstpath)));
2981 :
2982 60 : ltblspc = lappend_oid(ltblspc, dsttablespace);
2983 60 : pfree(dstpath);
2984 : }
2985 :
2986 60 : ntblspc = list_length(ltblspc);
2987 60 : if (ntblspc == 0)
2988 : {
2989 0 : table_endscan(scan);
2990 0 : table_close(rel, AccessShareLock);
2991 0 : return;
2992 : }
2993 :
2994 60 : tablespace_ids = (Oid *) palloc(ntblspc * sizeof(Oid));
2995 60 : i = 0;
2996 120 : foreach(cell, ltblspc)
2997 60 : tablespace_ids[i++] = lfirst_oid(cell);
2998 :
2999 : /* Record the filesystem change in XLOG */
3000 : {
3001 : xl_dbase_drop_rec xlrec;
3002 :
3003 60 : xlrec.db_id = db_id;
3004 60 : xlrec.ntablespaces = ntblspc;
3005 :
3006 60 : XLogBeginInsert();
3007 60 : XLogRegisterData((char *) &xlrec, MinSizeOfDbaseDropRec);
3008 60 : XLogRegisterData((char *) tablespace_ids, ntblspc * sizeof(Oid));
3009 :
3010 60 : (void) XLogInsert(RM_DBASE_ID,
3011 : XLOG_DBASE_DROP | XLR_SPECIAL_REL_UPDATE);
3012 : }
3013 :
3014 60 : list_free(ltblspc);
3015 60 : pfree(tablespace_ids);
3016 :
3017 60 : table_endscan(scan);
3018 60 : table_close(rel, AccessShareLock);
3019 : }
3020 :
3021 : /*
3022 : * Check for existing files that conflict with a proposed new DB OID;
3023 : * return true if there are any
3024 : *
3025 : * If there were a subdirectory in any tablespace matching the proposed new
3026 : * OID, we'd get a create failure due to the duplicate name ... and then we'd
3027 : * try to remove that already-existing subdirectory during the cleanup in
3028 : * remove_dbtablespaces. Nuking existing files seems like a bad idea, so
3029 : * instead we make this extra check before settling on the OID of the new
3030 : * database. This exactly parallels what GetNewRelFileNumber() does for table
3031 : * relfilenumber values.
3032 : */
3033 : static bool
3034 606 : check_db_file_conflict(Oid db_id)
3035 : {
3036 606 : bool result = false;
3037 : Relation rel;
3038 : TableScanDesc scan;
3039 : HeapTuple tuple;
3040 :
3041 606 : rel = table_open(TableSpaceRelationId, AccessShareLock);
3042 606 : scan = table_beginscan_catalog(rel, 0, NULL);
3043 1910 : while ((tuple = heap_getnext(scan, ForwardScanDirection)) != NULL)
3044 : {
3045 1304 : Form_pg_tablespace spcform = (Form_pg_tablespace) GETSTRUCT(tuple);
3046 1304 : Oid dsttablespace = spcform->oid;
3047 : char *dstpath;
3048 : struct stat st;
3049 :
3050 : /* Don't mess with the global tablespace */
3051 1304 : if (dsttablespace == GLOBALTABLESPACE_OID)
3052 606 : continue;
3053 :
3054 698 : dstpath = GetDatabasePath(db_id, dsttablespace);
3055 :
3056 698 : if (lstat(dstpath, &st) == 0)
3057 : {
3058 : /* Found a conflicting file (or directory, whatever) */
3059 0 : pfree(dstpath);
3060 0 : result = true;
3061 0 : break;
3062 : }
3063 :
3064 698 : pfree(dstpath);
3065 : }
3066 :
3067 606 : table_endscan(scan);
3068 606 : table_close(rel, AccessShareLock);
3069 :
3070 606 : return result;
3071 : }
3072 :
/*
 * Attach an errdetail describing who is still using a busy database.
 *
 * The wording depends on whether other sessions, prepared transactions, or
 * both are present.  The return value is always 0.
 */
static int
errdetail_busy_db(int notherbackends, int npreparedxacts)
{
	if (notherbackends > 0 && npreparedxacts > 0)
	{
		/*
		 * Singular and plural are not distinguished in this case, because
		 * gettext cannot handle more than one plural in a single string.
		 */
		errdetail("There are %d other session(s) and %d prepared transaction(s) using the database.",
				  notherbackends, npreparedxacts);
	}
	else if (notherbackends > 0)
	{
		errdetail_plural("There is %d other session using the database.",
						 "There are %d other sessions using the database.",
						 notherbackends,
						 notherbackends);
	}
	else
	{
		errdetail_plural("There is %d prepared transaction using the database.",
						 "There are %d prepared transactions using the database.",
						 npreparedxacts,
						 npreparedxacts);
	}

	return 0;					/* only there to satisfy the ereport macro */
}
3099 :
3100 : /*
3101 : * get_database_oid - given a database name, look up the OID
3102 : *
3103 : * If missing_ok is false, throw an error if database name not found. If
3104 : * true, just return InvalidOid.
3105 : */
3106 : Oid
3107 2356 : get_database_oid(const char *dbname, bool missing_ok)
3108 : {
3109 : Relation pg_database;
3110 : ScanKeyData entry[1];
3111 : SysScanDesc scan;
3112 : HeapTuple dbtuple;
3113 : Oid oid;
3114 :
3115 : /*
3116 : * There's no syscache for pg_database indexed by name, so we must look
3117 : * the hard way.
3118 : */
3119 2356 : pg_database = table_open(DatabaseRelationId, AccessShareLock);
3120 2356 : ScanKeyInit(&entry[0],
3121 : Anum_pg_database_datname,
3122 : BTEqualStrategyNumber, F_NAMEEQ,
3123 : CStringGetDatum(dbname));
3124 2356 : scan = systable_beginscan(pg_database, DatabaseNameIndexId, true,
3125 : NULL, 1, entry);
3126 :
3127 2356 : dbtuple = systable_getnext(scan);
3128 :
3129 : /* We assume that there can be at most one matching tuple */
3130 2356 : if (HeapTupleIsValid(dbtuple))
3131 1744 : oid = ((Form_pg_database) GETSTRUCT(dbtuple))->oid;
3132 : else
3133 612 : oid = InvalidOid;
3134 :
3135 2356 : systable_endscan(scan);
3136 2356 : table_close(pg_database, AccessShareLock);
3137 :
3138 2356 : if (!OidIsValid(oid) && !missing_ok)
3139 6 : ereport(ERROR,
3140 : (errcode(ERRCODE_UNDEFINED_DATABASE),
3141 : errmsg("database \"%s\" does not exist",
3142 : dbname)));
3143 :
3144 2350 : return oid;
3145 : }
3146 :
3147 :
3148 : /*
3149 : * get_database_name - given a database OID, look up the name
3150 : *
3151 : * Returns a palloc'd string, or NULL if no such database.
3152 : */
3153 : char *
3154 26798 : get_database_name(Oid dbid)
3155 : {
3156 : HeapTuple dbtuple;
3157 : char *result;
3158 :
3159 26798 : dbtuple = SearchSysCache1(DATABASEOID, ObjectIdGetDatum(dbid));
3160 26798 : if (HeapTupleIsValid(dbtuple))
3161 : {
3162 26598 : result = pstrdup(NameStr(((Form_pg_database) GETSTRUCT(dbtuple))->datname));
3163 26598 : ReleaseSysCache(dbtuple);
3164 : }
3165 : else
3166 200 : result = NULL;
3167 :
3168 26798 : return result;
3169 : }
3170 :
3171 :
3172 : /*
3173 : * While dropping a database the pg_database row is marked invalid, but the
3174 : * catalog contents still exist. Connections to such a database are not
3175 : * allowed.
3176 : */
3177 : bool
3178 26984 : database_is_invalid_form(Form_pg_database datform)
3179 : {
3180 26984 : return datform->datconnlimit == DATCONNLIMIT_INVALID_DB;
3181 : }
3182 :
3183 :
3184 : /*
3185 : * Convenience wrapper around database_is_invalid_form()
3186 : */
3187 : bool
3188 632 : database_is_invalid_oid(Oid dboid)
3189 : {
3190 : HeapTuple dbtup;
3191 : Form_pg_database dbform;
3192 : bool invalid;
3193 :
3194 632 : dbtup = SearchSysCache1(DATABASEOID, ObjectIdGetDatum(dboid));
3195 632 : if (!HeapTupleIsValid(dbtup))
3196 0 : elog(ERROR, "cache lookup failed for database %u", dboid);
3197 632 : dbform = (Form_pg_database) GETSTRUCT(dbtup);
3198 :
3199 632 : invalid = database_is_invalid_form(dbform);
3200 :
3201 632 : ReleaseSysCache(dbtup);
3202 :
3203 632 : return invalid;
3204 : }
3205 :
3206 :
3207 : /*
3208 : * recovery_create_dbdir()
3209 : *
3210 : * During recovery, there's a case where we validly need to recover a missing
3211 : * tablespace directory so that recovery can continue. This happens when
3212 : * recovery wants to create a database but the holding tablespace has been
3213 : * removed before the server stopped. Since we expect that the directory will
3214 : * be gone before reaching recovery consistency, and we have no knowledge about
3215 : * the tablespace other than its OID here, we create a real directory under
3216 : * pg_tblspc here instead of restoring the symlink.
3217 : *
3218 : * If only_tblspc is true, then the requested directory must be in pg_tblspc/
3219 : */
3220 : static void
3221 40 : recovery_create_dbdir(char *path, bool only_tblspc)
3222 : {
3223 : struct stat st;
3224 :
3225 : Assert(RecoveryInProgress());
3226 :
3227 40 : if (stat(path, &st) == 0)
3228 40 : return;
3229 :
3230 0 : if (only_tblspc && strstr(path, "pg_tblspc/") == NULL)
3231 0 : elog(PANIC, "requested to created invalid directory: %s", path);
3232 :
3233 0 : if (reachedConsistency && !allow_in_place_tablespaces)
3234 0 : ereport(PANIC,
3235 : errmsg("missing directory \"%s\"", path));
3236 :
3237 0 : elog(reachedConsistency ? WARNING : DEBUG1,
3238 : "creating missing directory: %s", path);
3239 :
3240 0 : if (pg_mkdir_p(path, pg_dir_create_mode) != 0)
3241 0 : ereport(PANIC,
3242 : errmsg("could not create missing directory \"%s\": %m", path));
3243 : }
3244 :
3245 :
3246 : /*
3247 : * DATABASE resource manager's routines
3248 : */
3249 : void
3250 68 : dbase_redo(XLogReaderState *record)
3251 : {
3252 68 : uint8 info = XLogRecGetInfo(record) & ~XLR_INFO_MASK;
3253 :
3254 : /* Backup blocks are not used in dbase records */
3255 : Assert(!XLogRecHasAnyBlockRefs(record));
3256 :
3257 68 : if (info == XLOG_DBASE_CREATE_FILE_COPY)
3258 : {
3259 6 : xl_dbase_create_file_copy_rec *xlrec =
3260 6 : (xl_dbase_create_file_copy_rec *) XLogRecGetData(record);
3261 : char *src_path;
3262 : char *dst_path;
3263 : char *parent_path;
3264 : struct stat st;
3265 :
3266 6 : src_path = GetDatabasePath(xlrec->src_db_id, xlrec->src_tablespace_id);
3267 6 : dst_path = GetDatabasePath(xlrec->db_id, xlrec->tablespace_id);
3268 :
3269 : /*
3270 : * Our theory for replaying a CREATE is to forcibly drop the target
3271 : * subdirectory if present, then re-copy the source data. This may be
3272 : * more work than needed, but it is simple to implement.
3273 : */
3274 6 : if (stat(dst_path, &st) == 0 && S_ISDIR(st.st_mode))
3275 : {
3276 0 : if (!rmtree(dst_path, true))
3277 : /* If this failed, copydir() below is going to error. */
3278 0 : ereport(WARNING,
3279 : (errmsg("some useless files may be left behind in old database directory \"%s\"",
3280 : dst_path)));
3281 : }
3282 :
3283 : /*
3284 : * If the parent of the target path doesn't exist, create it now. This
3285 : * enables us to create the target underneath later.
3286 : */
3287 6 : parent_path = pstrdup(dst_path);
3288 6 : get_parent_directory(parent_path);
3289 6 : if (stat(parent_path, &st) < 0)
3290 : {
3291 0 : if (errno != ENOENT)
3292 0 : ereport(FATAL,
3293 : errmsg("could not stat directory \"%s\": %m",
3294 : dst_path));
3295 :
3296 : /* create the parent directory if needed and valid */
3297 0 : recovery_create_dbdir(parent_path, true);
3298 : }
3299 6 : pfree(parent_path);
3300 :
3301 : /*
3302 : * There's a case where the copy source directory is missing for the
3303 : * same reason above. Create the empty source directory so that
3304 : * copydir below doesn't fail. The directory will be dropped soon by
3305 : * recovery.
3306 : */
3307 6 : if (stat(src_path, &st) < 0 && errno == ENOENT)
3308 0 : recovery_create_dbdir(src_path, false);
3309 :
3310 : /*
3311 : * Force dirty buffers out to disk, to ensure source database is
3312 : * up-to-date for the copy.
3313 : */
3314 6 : FlushDatabaseBuffers(xlrec->src_db_id);
3315 :
3316 : /* Close all smgr fds in all backends. */
3317 6 : WaitForProcSignalBarrier(EmitProcSignalBarrier(PROCSIGNAL_BARRIER_SMGRRELEASE));
3318 :
3319 : /*
3320 : * Copy this subdirectory to the new location
3321 : *
3322 : * We don't need to copy subdirectories
3323 : */
3324 6 : copydir(src_path, dst_path, false);
3325 :
3326 6 : pfree(src_path);
3327 6 : pfree(dst_path);
3328 : }
3329 62 : else if (info == XLOG_DBASE_CREATE_WAL_LOG)
3330 : {
3331 40 : xl_dbase_create_wal_log_rec *xlrec =
3332 40 : (xl_dbase_create_wal_log_rec *) XLogRecGetData(record);
3333 : char *dbpath;
3334 : char *parent_path;
3335 :
3336 40 : dbpath = GetDatabasePath(xlrec->db_id, xlrec->tablespace_id);
3337 :
3338 : /* create the parent directory if needed and valid */
3339 40 : parent_path = pstrdup(dbpath);
3340 40 : get_parent_directory(parent_path);
3341 40 : recovery_create_dbdir(parent_path, true);
3342 :
3343 : /* Create the database directory with the version file. */
3344 40 : CreateDirAndVersionFile(dbpath, xlrec->db_id, xlrec->tablespace_id,
3345 : true);
3346 40 : pfree(dbpath);
3347 : }
3348 22 : else if (info == XLOG_DBASE_DROP)
3349 : {
3350 22 : xl_dbase_drop_rec *xlrec = (xl_dbase_drop_rec *) XLogRecGetData(record);
3351 : char *dst_path;
3352 : int i;
3353 :
3354 22 : if (InHotStandby)
3355 : {
3356 : /*
3357 : * Lock database while we resolve conflicts to ensure that
3358 : * InitPostgres() cannot fully re-execute concurrently. This
3359 : * avoids backends re-connecting automatically to same database,
3360 : * which can happen in some cases.
3361 : *
3362 : * This will lock out walsenders trying to connect to db-specific
3363 : * slots for logical decoding too, so it's safe for us to drop
3364 : * slots.
3365 : */
3366 22 : LockSharedObjectForSession(DatabaseRelationId, xlrec->db_id, 0, AccessExclusiveLock);
3367 22 : ResolveRecoveryConflictWithDatabase(xlrec->db_id);
3368 : }
3369 :
3370 : /* Drop any database-specific replication slots */
3371 22 : ReplicationSlotsDropDBSlots(xlrec->db_id);
3372 :
3373 : /* Drop pages for this database that are in the shared buffer cache */
3374 22 : DropDatabaseBuffers(xlrec->db_id);
3375 :
3376 : /* Also, clean out any fsync requests that might be pending in md.c */
3377 22 : ForgetDatabaseSyncRequests(xlrec->db_id);
3378 :
3379 : /* Clean out the xlog relcache too */
3380 22 : XLogDropDatabase(xlrec->db_id);
3381 :
3382 : /* Close all smgr fds in all backends. */
3383 22 : WaitForProcSignalBarrier(EmitProcSignalBarrier(PROCSIGNAL_BARRIER_SMGRRELEASE));
3384 :
3385 44 : for (i = 0; i < xlrec->ntablespaces; i++)
3386 : {
3387 22 : dst_path = GetDatabasePath(xlrec->db_id, xlrec->tablespace_ids[i]);
3388 :
3389 : /* And remove the physical files */
3390 22 : if (!rmtree(dst_path, true))
3391 0 : ereport(WARNING,
3392 : (errmsg("some useless files may be left behind in old database directory \"%s\"",
3393 : dst_path)));
3394 22 : pfree(dst_path);
3395 : }
3396 :
3397 22 : if (InHotStandby)
3398 : {
3399 : /*
3400 : * Release locks prior to commit. XXX There is a race condition
3401 : * here that may allow backends to reconnect, but the window for
3402 : * this is small because the gap between here and commit is mostly
3403 : * fairly small and it is unlikely that people will be dropping
3404 : * databases that we are trying to connect to anyway.
3405 : */
3406 22 : UnlockSharedObjectForSession(DatabaseRelationId, xlrec->db_id, 0, AccessExclusiveLock);
3407 : }
3408 : }
3409 : else
3410 0 : elog(PANIC, "dbase_redo: unknown op code %u", info);
3411 68 : }
|