Line data Source code
1 : /*-------------------------------------------------------------------------
2 : *
3 : * dbcommands.c
4 : * Database management commands (create/drop database).
5 : *
6 : * Note: database creation/destruction commands use exclusive locks on
7 : * the database objects (as expressed by LockSharedObject()) to avoid
8 : * stepping on each others' toes. Formerly we used table-level locks
9 : * on pg_database, but that's too coarse-grained.
10 : *
11 : * Portions Copyright (c) 1996-2023, PostgreSQL Global Development Group
12 : * Portions Copyright (c) 1994, Regents of the University of California
13 : *
14 : *
15 : * IDENTIFICATION
16 : * src/backend/commands/dbcommands.c
17 : *
18 : *-------------------------------------------------------------------------
19 : */
20 : #include "postgres.h"
21 :
22 : #include <fcntl.h>
23 : #include <unistd.h>
24 : #include <sys/stat.h>
25 :
26 : #include "access/genam.h"
27 : #include "access/heapam.h"
28 : #include "access/htup_details.h"
29 : #include "access/multixact.h"
30 : #include "access/tableam.h"
31 : #include "access/xact.h"
32 : #include "access/xloginsert.h"
33 : #include "access/xlogrecovery.h"
34 : #include "access/xlogutils.h"
35 : #include "catalog/catalog.h"
36 : #include "catalog/dependency.h"
37 : #include "catalog/indexing.h"
38 : #include "catalog/objectaccess.h"
39 : #include "catalog/pg_authid.h"
40 : #include "catalog/pg_collation.h"
41 : #include "catalog/pg_database.h"
42 : #include "catalog/pg_db_role_setting.h"
43 : #include "catalog/pg_subscription.h"
44 : #include "catalog/pg_tablespace.h"
45 : #include "commands/comment.h"
46 : #include "commands/dbcommands.h"
47 : #include "commands/dbcommands_xlog.h"
48 : #include "commands/defrem.h"
49 : #include "commands/seclabel.h"
50 : #include "commands/tablespace.h"
51 : #include "common/file_perm.h"
52 : #include "mb/pg_wchar.h"
53 : #include "miscadmin.h"
54 : #include "pgstat.h"
55 : #include "postmaster/bgwriter.h"
56 : #include "replication/slot.h"
57 : #include "storage/copydir.h"
58 : #include "storage/fd.h"
59 : #include "storage/ipc.h"
60 : #include "storage/lmgr.h"
61 : #include "storage/md.h"
62 : #include "storage/procarray.h"
63 : #include "storage/smgr.h"
64 : #include "utils/acl.h"
65 : #include "utils/builtins.h"
66 : #include "utils/fmgroids.h"
67 : #include "utils/guc.h"
68 : #include "utils/pg_locale.h"
69 : #include "utils/relmapper.h"
70 : #include "utils/snapmgr.h"
71 : #include "utils/syscache.h"
72 :
73 : /*
74 : * Create database strategy.
75 : *
76 : * CREATEDB_WAL_LOG will copy the database at the block level and WAL log each
77 : * copied block.
78 : *
79 : * CREATEDB_FILE_COPY will simply perform a file system level copy of the
80 : * database and log a single record for each tablespace copied. To make this
81 : * safe, it also triggers checkpoints before and after the operation.
82 : */
83 : typedef enum CreateDBStrategy
84 : {
85 : CREATEDB_WAL_LOG,
86 : CREATEDB_FILE_COPY,
87 : } CreateDBStrategy;
88 :
89 : typedef struct
90 : {
91 : Oid src_dboid; /* source (template) DB */
92 : Oid dest_dboid; /* DB we are trying to create */
93 : CreateDBStrategy strategy; /* create db strategy */
94 : } createdb_failure_params;
95 :
96 : typedef struct
97 : {
98 : Oid dest_dboid; /* DB we are trying to move */
99 : Oid dest_tsoid; /* tablespace we are trying to move to */
100 : } movedb_failure_params;
101 :
102 : /*
103 : * Information about a relation to be copied when creating a database.
104 : */
105 : typedef struct CreateDBRelInfo
106 : {
107 : RelFileLocator rlocator; /* physical relation identifier */
108 : Oid reloid; /* relation oid */
109 : bool permanent; /* relation is permanent or unlogged */
110 : } CreateDBRelInfo;
111 :
112 :
113 : /* non-export function prototypes */
114 : static void createdb_failure_callback(int code, Datum arg);
115 : static void movedb(const char *dbname, const char *tblspcname);
116 : static void movedb_failure_callback(int code, Datum arg);
117 : static bool get_db_info(const char *name, LOCKMODE lockmode,
118 : Oid *dbIdP, Oid *ownerIdP,
119 : int *encodingP, bool *dbIsTemplateP, bool *dbAllowConnP, bool *dbHasLoginEvtP,
120 : TransactionId *dbFrozenXidP, MultiXactId *dbMinMultiP,
121 : Oid *dbTablespace, char **dbCollate, char **dbCtype, char **dbIculocale,
122 : char **dbIcurules,
123 : char *dbLocProvider,
124 : char **dbCollversion);
125 : static void remove_dbtablespaces(Oid db_id);
126 : static bool check_db_file_conflict(Oid db_id);
127 : static int errdetail_busy_db(int notherbackends, int npreparedxacts);
128 : static void CreateDatabaseUsingWalLog(Oid src_dboid, Oid dst_dboid, Oid src_tsid,
129 : Oid dst_tsid);
130 : static List *ScanSourceDatabasePgClass(Oid tbid, Oid dbid, char *srcpath);
131 : static List *ScanSourceDatabasePgClassPage(Page page, Buffer buf, Oid tbid,
132 : Oid dbid, char *srcpath,
133 : List *rlocatorlist, Snapshot snapshot);
134 : static CreateDBRelInfo *ScanSourceDatabasePgClassTuple(HeapTupleData *tuple,
135 : Oid tbid, Oid dbid,
136 : char *srcpath);
137 : static void CreateDirAndVersionFile(char *dbpath, Oid dbid, Oid tsid,
138 : bool isRedo);
139 : static void CreateDatabaseUsingFileCopy(Oid src_dboid, Oid dst_dboid,
140 : Oid src_tsid, Oid dst_tsid);
141 : static void recovery_create_dbdir(char *path, bool only_tblspc);
142 :
143 : /*
144 : * Create a new database using the WAL_LOG strategy.
145 : *
146 : * Each copied block is separately written to the write-ahead log.
147 : */
148 : static void
149 404 : CreateDatabaseUsingWalLog(Oid src_dboid, Oid dst_dboid,
150 : Oid src_tsid, Oid dst_tsid)
151 : {
152 : char *srcpath;
153 : char *dstpath;
154 404 : List *rlocatorlist = NULL;
155 : ListCell *cell;
156 : LockRelId srcrelid;
157 : LockRelId dstrelid;
158 : RelFileLocator srcrlocator;
159 : RelFileLocator dstrlocator;
160 : CreateDBRelInfo *relinfo;
161 :
162 : /* Get source and destination database paths. */
163 404 : srcpath = GetDatabasePath(src_dboid, src_tsid);
164 404 : dstpath = GetDatabasePath(dst_dboid, dst_tsid);
165 :
166 : /* Create database directory and write PG_VERSION file. */
167 404 : CreateDirAndVersionFile(dstpath, dst_dboid, dst_tsid, false);
168 :
169 : /* Copy relmap file from source database to the destination database. */
170 404 : RelationMapCopy(dst_dboid, dst_tsid, srcpath, dstpath);
171 :
172 : /* Get list of relfilelocators to copy from the source database. */
173 404 : rlocatorlist = ScanSourceDatabasePgClass(src_tsid, src_dboid, srcpath);
174 : Assert(rlocatorlist != NIL);
175 :
176 : /*
177 : * Database IDs will be the same for all relations so set them before
178 : * entering the loop.
179 : */
180 404 : srcrelid.dbId = src_dboid;
181 404 : dstrelid.dbId = dst_dboid;
182 :
183 : /* Loop over our list of relfilelocators and copy each one. */
184 90118 : foreach(cell, rlocatorlist)
185 : {
186 89714 : relinfo = lfirst(cell);
187 89714 : srcrlocator = relinfo->rlocator;
188 :
189 : /*
190 : * If the relation is from the source db's default tablespace then we
191 : * need to create it in the destination db's default tablespace.
192 : * Otherwise, we need to create in the same tablespace as it is in the
193 : * source database.
194 : */
195 89714 : if (srcrlocator.spcOid == src_tsid)
196 89714 : dstrlocator.spcOid = dst_tsid;
197 : else
198 0 : dstrlocator.spcOid = srcrlocator.spcOid;
199 :
200 89714 : dstrlocator.dbOid = dst_dboid;
201 89714 : dstrlocator.relNumber = srcrlocator.relNumber;
202 :
203 : /*
204 : * Acquire locks on source and target relations before copying.
205 : *
206 : * We typically do not read relation data into shared_buffers without
207 : * holding a relation lock. It's unclear what could go wrong if we
208 : * skipped it in this case, because nobody can be modifying either the
209 : * source or destination database at this point, and we have locks on
210 : * both databases, too, but let's take the conservative route.
211 : */
212 89714 : dstrelid.relId = srcrelid.relId = relinfo->reloid;
213 89714 : LockRelationId(&srcrelid, AccessShareLock);
214 89714 : LockRelationId(&dstrelid, AccessShareLock);
215 :
216 : /* Copy relation storage from source to the destination. */
217 89714 : CreateAndCopyRelationData(srcrlocator, dstrlocator, relinfo->permanent);
218 :
219 : /* Release the relation locks. */
220 89714 : UnlockRelationId(&srcrelid, AccessShareLock);
221 89714 : UnlockRelationId(&dstrelid, AccessShareLock);
222 : }
223 :
224 404 : pfree(srcpath);
225 404 : pfree(dstpath);
226 404 : list_free_deep(rlocatorlist);
227 404 : }
228 :
229 : /*
230 : * Scan the pg_class table in the source database to identify the relations
231 : * that need to be copied to the destination database.
232 : *
233 : * This is an exception to the usual rule that cross-database access is
234 : * not possible. We can make it work here because we know that there are no
235 : * connections to the source database and (since there can't be prepared
236 : * transactions touching that database) no in-doubt tuples either. This
237 : * means that we don't need to worry about pruning removing anything from
238 : * under us, and we don't need to be too picky about our snapshot either.
239 : * As long as it sees all previously-committed XIDs as committed and all
240 : * aborted XIDs as aborted, we should be fine: nothing else is possible
241 : * here.
242 : *
243 : * We can't rely on the relcache for anything here, because that only knows
244 : * about the database to which we are connected, and can't handle access to
245 : * other databases. That also means we can't rely on the heap scan
246 : * infrastructure, which would be a bad idea anyway since it might try
247 : * to do things like HOT pruning which we definitely can't do safely in
248 : * a database to which we're not even connected.
249 : */
250 : static List *
251 404 : ScanSourceDatabasePgClass(Oid tbid, Oid dbid, char *srcpath)
252 : {
253 : RelFileLocator rlocator;
254 : BlockNumber nblocks;
255 : BlockNumber blkno;
256 : Buffer buf;
257 : RelFileNumber relfilenumber;
258 : Page page;
259 404 : List *rlocatorlist = NIL;
260 : LockRelId relid;
261 : Snapshot snapshot;
262 : SMgrRelation smgr;
263 : BufferAccessStrategy bstrategy;
264 :
265 : /* Get pg_class relfilenumber. */
266 404 : relfilenumber = RelationMapOidToFilenumberForDatabase(srcpath,
267 : RelationRelationId);
268 :
269 : /* Don't read data into shared_buffers without holding a relation lock. */
270 404 : relid.dbId = dbid;
271 404 : relid.relId = RelationRelationId;
272 404 : LockRelationId(&relid, AccessShareLock);
273 :
274 : /* Prepare a RelFileLocator for the pg_class relation. */
275 404 : rlocator.spcOid = tbid;
276 404 : rlocator.dbOid = dbid;
277 404 : rlocator.relNumber = relfilenumber;
278 :
279 404 : smgr = smgropen(rlocator, InvalidBackendId);
280 404 : nblocks = smgrnblocks(smgr, MAIN_FORKNUM);
281 404 : smgrclose(smgr);
282 :
283 : /* Use a buffer access strategy since this is a bulk read operation. */
284 404 : bstrategy = GetAccessStrategy(BAS_BULKREAD);
285 :
286 : /*
287 : * As explained in the function header comments, we need a snapshot that
288 : * will see all committed transactions as committed, and our transaction
289 : * snapshot - or the active snapshot - might not be new enough for that,
290 : * but the return value of GetLatestSnapshot() should work fine.
291 : */
292 404 : snapshot = GetLatestSnapshot();
293 :
294 : /* Process the relation block by block. */
295 6092 : for (blkno = 0; blkno < nblocks; blkno++)
296 : {
297 5688 : CHECK_FOR_INTERRUPTS();
298 :
299 5688 : buf = ReadBufferWithoutRelcache(rlocator, MAIN_FORKNUM, blkno,
300 : RBM_NORMAL, bstrategy, true);
301 :
302 5688 : LockBuffer(buf, BUFFER_LOCK_SHARE);
303 5688 : page = BufferGetPage(buf);
304 5688 : if (PageIsNew(page) || PageIsEmpty(page))
305 : {
306 0 : UnlockReleaseBuffer(buf);
307 0 : continue;
308 : }
309 :
310 : /* Append relevant pg_class tuples for current page to rlocatorlist. */
311 5688 : rlocatorlist = ScanSourceDatabasePgClassPage(page, buf, tbid, dbid,
312 : srcpath, rlocatorlist,
313 : snapshot);
314 :
315 5688 : UnlockReleaseBuffer(buf);
316 : }
317 :
318 : /* Release relation lock. */
319 404 : UnlockRelationId(&relid, AccessShareLock);
320 :
321 404 : return rlocatorlist;
322 : }
323 :
324 : /*
325 : * Scan one page of the source database's pg_class relation and add relevant
326 : * entries to rlocatorlist. The return value is the updated list.
327 : */
328 : static List *
329 5688 : ScanSourceDatabasePgClassPage(Page page, Buffer buf, Oid tbid, Oid dbid,
330 : char *srcpath, List *rlocatorlist,
331 : Snapshot snapshot)
332 : {
333 5688 : BlockNumber blkno = BufferGetBlockNumber(buf);
334 : OffsetNumber offnum;
335 : OffsetNumber maxoff;
336 : HeapTupleData tuple;
337 :
338 5688 : maxoff = PageGetMaxOffsetNumber(page);
339 :
340 : /* Loop over offsets. */
341 290178 : for (offnum = FirstOffsetNumber;
342 : offnum <= maxoff;
343 284490 : offnum = OffsetNumberNext(offnum))
344 : {
345 : ItemId itemid;
346 :
347 284490 : itemid = PageGetItemId(page, offnum);
348 :
349 : /* Nothing to do if slot is empty or already dead. */
350 284490 : if (!ItemIdIsUsed(itemid) || ItemIdIsDead(itemid) ||
351 204506 : ItemIdIsRedirected(itemid))
352 115064 : continue;
353 :
354 : Assert(ItemIdIsNormal(itemid));
355 169426 : ItemPointerSet(&(tuple.t_self), blkno, offnum);
356 :
357 : /* Initialize a HeapTupleData structure. */
358 169426 : tuple.t_data = (HeapTupleHeader) PageGetItem(page, itemid);
359 169426 : tuple.t_len = ItemIdGetLength(itemid);
360 169426 : tuple.t_tableOid = RelationRelationId;
361 :
362 : /* Skip tuples that are not visible to this snapshot. */
363 169426 : if (HeapTupleSatisfiesVisibility(&tuple, snapshot, buf))
364 : {
365 : CreateDBRelInfo *relinfo;
366 :
367 : /*
368 : * ScanSourceDatabasePgClassTuple is in charge of constructing a
369 : * CreateDBRelInfo object for this tuple, but can also decide that
370 : * this tuple isn't something we need to copy. If we do need to
371 : * copy the relation, add it to the list.
372 : */
373 167686 : relinfo = ScanSourceDatabasePgClassTuple(&tuple, tbid, dbid,
374 : srcpath);
375 167686 : if (relinfo != NULL)
376 89714 : rlocatorlist = lappend(rlocatorlist, relinfo);
377 : }
378 : }
379 :
380 5688 : return rlocatorlist;
381 : }
382 :
383 : /*
384 : * Decide whether a certain pg_class tuple represents something that
385 : * needs to be copied from the source database to the destination database,
386 : * and if so, construct a CreateDBRelInfo for it.
387 : *
388 : * Visibility checks are handled by the caller, so our job here is just
389 : * to assess the data stored in the tuple.
390 : */
391 : CreateDBRelInfo *
392 167686 : ScanSourceDatabasePgClassTuple(HeapTupleData *tuple, Oid tbid, Oid dbid,
393 : char *srcpath)
394 : {
395 : CreateDBRelInfo *relinfo;
396 : Form_pg_class classForm;
397 167686 : RelFileNumber relfilenumber = InvalidRelFileNumber;
398 :
399 167686 : classForm = (Form_pg_class) GETSTRUCT(tuple);
400 :
401 : /*
402 : * Return NULL if this object does not need to be copied.
403 : *
404 : * Shared objects don't need to be copied, because they are shared.
405 : * Objects without storage can't be copied, because there's nothing to
406 : * copy. Temporary relations don't need to be copied either, because they
407 : * are inaccessible outside of the session that created them, which must
408 : * be gone already, and couldn't connect to a different database if it
409 : * still existed. autovacuum will eventually remove the pg_class entries
410 : * as well.
411 : */
412 167686 : if (classForm->reltablespace == GLOBALTABLESPACE_OID ||
413 147486 : !RELKIND_HAS_STORAGE(classForm->relkind) ||
414 89714 : classForm->relpersistence == RELPERSISTENCE_TEMP)
415 77972 : return NULL;
416 :
417 : /*
418 : * If relfilenumber is valid then directly use it. Otherwise, consult the
419 : * relmap.
420 : */
421 89714 : if (RelFileNumberIsValid(classForm->relfilenode))
422 82846 : relfilenumber = classForm->relfilenode;
423 : else
424 6868 : relfilenumber = RelationMapOidToFilenumberForDatabase(srcpath,
425 : classForm->oid);
426 :
427 : /* We must have a valid relfilenumber. */
428 89714 : if (!RelFileNumberIsValid(relfilenumber))
429 0 : elog(ERROR, "relation with OID %u does not have a valid relfilenumber",
430 : classForm->oid);
431 :
432 : /* Prepare a rel info element and add it to the list. */
433 89714 : relinfo = (CreateDBRelInfo *) palloc(sizeof(CreateDBRelInfo));
434 89714 : if (OidIsValid(classForm->reltablespace))
435 0 : relinfo->rlocator.spcOid = classForm->reltablespace;
436 : else
437 89714 : relinfo->rlocator.spcOid = tbid;
438 :
439 89714 : relinfo->rlocator.dbOid = dbid;
440 89714 : relinfo->rlocator.relNumber = relfilenumber;
441 89714 : relinfo->reloid = classForm->oid;
442 :
443 : /* Temporary relations were rejected above. */
444 : Assert(classForm->relpersistence != RELPERSISTENCE_TEMP);
445 89714 : relinfo->permanent =
446 89714 : (classForm->relpersistence == RELPERSISTENCE_PERMANENT) ? true : false;
447 :
448 89714 : return relinfo;
449 : }
450 :
451 : /*
452 : * Create database directory and write out the PG_VERSION file in the database
453 : * path. If isRedo is true, it's okay for the database directory to exist
454 : * already.
455 : */
456 : static void
457 438 : CreateDirAndVersionFile(char *dbpath, Oid dbid, Oid tsid, bool isRedo)
458 : {
459 : int fd;
460 : int nbytes;
461 : char versionfile[MAXPGPATH];
462 : char buf[16];
463 :
464 : /*
465 : * Prepare version data before starting a critical section.
466 : *
467 : * Note that we don't have to copy this from the source database; there's
468 : * only one legal value.
469 : */
470 438 : sprintf(buf, "%s\n", PG_MAJORVERSION);
471 438 : nbytes = strlen(PG_MAJORVERSION) + 1;
472 :
473 : /* If we are not in WAL replay then write the WAL. */
474 438 : if (!isRedo)
475 : {
476 : xl_dbase_create_wal_log_rec xlrec;
477 : XLogRecPtr lsn;
478 :
479 404 : START_CRIT_SECTION();
480 :
481 404 : xlrec.db_id = dbid;
482 404 : xlrec.tablespace_id = tsid;
483 :
484 404 : XLogBeginInsert();
485 404 : XLogRegisterData((char *) (&xlrec),
486 : sizeof(xl_dbase_create_wal_log_rec));
487 :
488 404 : lsn = XLogInsert(RM_DBASE_ID, XLOG_DBASE_CREATE_WAL_LOG);
489 :
490 : /* As always, WAL must hit the disk before the data update does. */
491 404 : XLogFlush(lsn);
492 : }
493 :
494 : /* Create database directory. */
495 438 : if (MakePGDirectory(dbpath) < 0)
496 : {
497 : /* Failure other than already exists or not in WAL replay? */
498 14 : if (errno != EEXIST || !isRedo)
499 0 : ereport(ERROR,
500 : (errcode_for_file_access(),
501 : errmsg("could not create directory \"%s\": %m", dbpath)));
502 : }
503 :
504 : /*
505 : * Create PG_VERSION file in the database path. If the file already
506 : * exists and we are in WAL replay then try again to open it in write
507 : * mode.
508 : */
509 438 : snprintf(versionfile, sizeof(versionfile), "%s/%s", dbpath, "PG_VERSION");
510 :
511 438 : fd = OpenTransientFile(versionfile, O_WRONLY | O_CREAT | O_EXCL | PG_BINARY);
512 438 : if (fd < 0 && errno == EEXIST && isRedo)
513 14 : fd = OpenTransientFile(versionfile, O_WRONLY | O_TRUNC | PG_BINARY);
514 :
515 438 : if (fd < 0)
516 0 : ereport(ERROR,
517 : (errcode_for_file_access(),
518 : errmsg("could not create file \"%s\": %m", versionfile)));
519 :
520 : /* Write PG_MAJORVERSION in the PG_VERSION file. */
521 438 : pgstat_report_wait_start(WAIT_EVENT_VERSION_FILE_WRITE);
522 438 : errno = 0;
523 438 : if ((int) write(fd, buf, nbytes) != nbytes)
524 : {
525 : /* If write didn't set errno, assume problem is no disk space. */
526 0 : if (errno == 0)
527 0 : errno = ENOSPC;
528 0 : ereport(ERROR,
529 : (errcode_for_file_access(),
530 : errmsg("could not write to file \"%s\": %m", versionfile)));
531 : }
532 438 : pgstat_report_wait_end();
533 :
534 : /* Close the version file. */
535 438 : CloseTransientFile(fd);
536 :
537 : /* Critical section done. */
538 438 : if (!isRedo)
539 404 : END_CRIT_SECTION();
540 438 : }
541 :
542 : /*
543 : * Create a new database using the FILE_COPY strategy.
544 : *
545 : * Copy each tablespace at the filesystem level, and log a single WAL record
546 : * for each tablespace copied. This requires a checkpoint before and after the
547 : * copy, which may be expensive, but it does greatly reduce WAL generation
548 : * if the copied database is large.
549 : */
550 : static void
551 134 : CreateDatabaseUsingFileCopy(Oid src_dboid, Oid dst_dboid, Oid src_tsid,
552 : Oid dst_tsid)
553 : {
554 : TableScanDesc scan;
555 : Relation rel;
556 : HeapTuple tuple;
557 :
558 : /*
559 : * Force a checkpoint before starting the copy. This will force all dirty
560 : * buffers, including those of unlogged tables, out to disk, to ensure
561 : * source database is up-to-date on disk for the copy.
562 : * FlushDatabaseBuffers() would suffice for that, but we also want to
563 : * process any pending unlink requests. Otherwise, if a checkpoint
564 : * happened while we're copying files, a file might be deleted just when
565 : * we're about to copy it, causing the lstat() call in copydir() to fail
566 : * with ENOENT.
567 : */
568 134 : RequestCheckpoint(CHECKPOINT_IMMEDIATE | CHECKPOINT_FORCE |
569 : CHECKPOINT_WAIT | CHECKPOINT_FLUSH_ALL);
570 :
571 : /*
572 : * Iterate through all tablespaces of the template database, and copy each
573 : * one to the new database.
574 : */
575 134 : rel = table_open(TableSpaceRelationId, AccessShareLock);
576 134 : scan = table_beginscan_catalog(rel, 0, NULL);
577 438 : while ((tuple = heap_getnext(scan, ForwardScanDirection)) != NULL)
578 : {
579 304 : Form_pg_tablespace spaceform = (Form_pg_tablespace) GETSTRUCT(tuple);
580 304 : Oid srctablespace = spaceform->oid;
581 : Oid dsttablespace;
582 : char *srcpath;
583 : char *dstpath;
584 : struct stat st;
585 :
586 : /* No need to copy global tablespace */
587 304 : if (srctablespace == GLOBALTABLESPACE_OID)
588 170 : continue;
589 :
590 170 : srcpath = GetDatabasePath(src_dboid, srctablespace);
591 :
592 304 : if (stat(srcpath, &st) < 0 || !S_ISDIR(st.st_mode) ||
593 134 : directory_is_empty(srcpath))
594 : {
595 : /* Assume we can ignore it */
596 36 : pfree(srcpath);
597 36 : continue;
598 : }
599 :
600 134 : if (srctablespace == src_tsid)
601 134 : dsttablespace = dst_tsid;
602 : else
603 0 : dsttablespace = srctablespace;
604 :
605 134 : dstpath = GetDatabasePath(dst_dboid, dsttablespace);
606 :
607 : /*
608 : * Copy this subdirectory to the new location
609 : *
610 : * We don't need to copy subdirectories
611 : */
612 134 : copydir(srcpath, dstpath, false);
613 :
614 : /* Record the filesystem change in XLOG */
615 : {
616 : xl_dbase_create_file_copy_rec xlrec;
617 :
618 134 : xlrec.db_id = dst_dboid;
619 134 : xlrec.tablespace_id = dsttablespace;
620 134 : xlrec.src_db_id = src_dboid;
621 134 : xlrec.src_tablespace_id = srctablespace;
622 :
623 134 : XLogBeginInsert();
624 134 : XLogRegisterData((char *) &xlrec,
625 : sizeof(xl_dbase_create_file_copy_rec));
626 :
627 134 : (void) XLogInsert(RM_DBASE_ID,
628 : XLOG_DBASE_CREATE_FILE_COPY | XLR_SPECIAL_REL_UPDATE);
629 : }
630 134 : pfree(srcpath);
631 134 : pfree(dstpath);
632 : }
633 134 : table_endscan(scan);
634 134 : table_close(rel, AccessShareLock);
635 :
636 : /*
637 : * We force a checkpoint before committing. This effectively means that
638 : * committed XLOG_DBASE_CREATE_FILE_COPY operations will never need to be
639 : * replayed (at least not in ordinary crash recovery; we still have to
640 : * make the XLOG entry for the benefit of PITR operations). This avoids
641 : * two nasty scenarios:
642 : *
643 : * #1: When PITR is off, we don't XLOG the contents of newly created
644 : * indexes; therefore the drop-and-recreate-whole-directory behavior of
645 : * DBASE_CREATE replay would lose such indexes.
646 : *
647 : * #2: Since we have to recopy the source database during DBASE_CREATE
648 : * replay, we run the risk of copying changes in it that were committed
649 : * after the original CREATE DATABASE command but before the system crash
650 : * that led to the replay. This is at least unexpected and at worst could
651 : * lead to inconsistencies, eg duplicate table names.
652 : *
653 : * (Both of these were real bugs in releases 8.0 through 8.0.3.)
654 : *
655 : * In PITR replay, the first of these isn't an issue, and the second is
656 : * only a risk if the CREATE DATABASE and subsequent template database
657 : * change both occur while a base backup is being taken. There doesn't
658 : * seem to be much we can do about that except document it as a
659 : * limitation.
660 : *
661 : * See CreateDatabaseUsingWalLog() for a less cheesy CREATE DATABASE
662 : * strategy that avoids these problems.
663 : */
664 134 : RequestCheckpoint(CHECKPOINT_IMMEDIATE | CHECKPOINT_FORCE | CHECKPOINT_WAIT);
665 134 : }
666 :
667 : /*
668 : * CREATE DATABASE
669 : */
670 : Oid
671 562 : createdb(ParseState *pstate, const CreatedbStmt *stmt)
672 : {
673 : Oid src_dboid;
674 : Oid src_owner;
675 562 : int src_encoding = -1;
676 562 : char *src_collate = NULL;
677 562 : char *src_ctype = NULL;
678 562 : char *src_iculocale = NULL;
679 562 : char *src_icurules = NULL;
680 562 : char src_locprovider = '\0';
681 562 : char *src_collversion = NULL;
682 : bool src_istemplate;
683 : bool src_hasloginevt;
684 : bool src_allowconn;
685 562 : TransactionId src_frozenxid = InvalidTransactionId;
686 562 : MultiXactId src_minmxid = InvalidMultiXactId;
687 : Oid src_deftablespace;
688 : volatile Oid dst_deftablespace;
689 : Relation pg_database_rel;
690 : HeapTuple tuple;
691 562 : Datum new_record[Natts_pg_database] = {0};
692 562 : bool new_record_nulls[Natts_pg_database] = {0};
693 562 : Oid dboid = InvalidOid;
694 : Oid datdba;
695 : ListCell *option;
696 562 : DefElem *dtablespacename = NULL;
697 562 : DefElem *downer = NULL;
698 562 : DefElem *dtemplate = NULL;
699 562 : DefElem *dencoding = NULL;
700 562 : DefElem *dlocale = NULL;
701 562 : DefElem *dcollate = NULL;
702 562 : DefElem *dctype = NULL;
703 562 : DefElem *diculocale = NULL;
704 562 : DefElem *dicurules = NULL;
705 562 : DefElem *dlocprovider = NULL;
706 562 : DefElem *distemplate = NULL;
707 562 : DefElem *dallowconnections = NULL;
708 562 : DefElem *dconnlimit = NULL;
709 562 : DefElem *dcollversion = NULL;
710 562 : DefElem *dstrategy = NULL;
711 562 : char *dbname = stmt->dbname;
712 562 : char *dbowner = NULL;
713 562 : const char *dbtemplate = NULL;
714 562 : char *dbcollate = NULL;
715 562 : char *dbctype = NULL;
716 562 : char *dbiculocale = NULL;
717 562 : char *dbicurules = NULL;
718 562 : char dblocprovider = '\0';
719 : char *canonname;
720 562 : int encoding = -1;
721 562 : bool dbistemplate = false;
722 562 : bool dballowconnections = true;
723 562 : int dbconnlimit = DATCONNLIMIT_UNLIMITED;
724 562 : char *dbcollversion = NULL;
725 : int notherbackends;
726 : int npreparedxacts;
727 562 : CreateDBStrategy dbstrategy = CREATEDB_WAL_LOG;
728 : createdb_failure_params fparms;
729 :
730 : /* Extract options from the statement node tree */
731 1450 : foreach(option, stmt->options)
732 : {
733 888 : DefElem *defel = (DefElem *) lfirst(option);
734 :
735 888 : if (strcmp(defel->defname, "tablespace") == 0)
736 : {
737 16 : if (dtablespacename)
738 0 : errorConflictingDefElem(defel, pstate);
739 16 : dtablespacename = defel;
740 : }
741 872 : else if (strcmp(defel->defname, "owner") == 0)
742 : {
743 2 : if (downer)
744 0 : errorConflictingDefElem(defel, pstate);
745 2 : downer = defel;
746 : }
747 870 : else if (strcmp(defel->defname, "template") == 0)
748 : {
749 250 : if (dtemplate)
750 0 : errorConflictingDefElem(defel, pstate);
751 250 : dtemplate = defel;
752 : }
753 620 : else if (strcmp(defel->defname, "encoding") == 0)
754 : {
755 52 : if (dencoding)
756 0 : errorConflictingDefElem(defel, pstate);
757 52 : dencoding = defel;
758 : }
759 568 : else if (strcmp(defel->defname, "locale") == 0)
760 : {
761 54 : if (dlocale)
762 0 : errorConflictingDefElem(defel, pstate);
763 54 : dlocale = defel;
764 : }
765 514 : else if (strcmp(defel->defname, "lc_collate") == 0)
766 : {
767 6 : if (dcollate)
768 0 : errorConflictingDefElem(defel, pstate);
769 6 : dcollate = defel;
770 : }
771 508 : else if (strcmp(defel->defname, "lc_ctype") == 0)
772 : {
773 6 : if (dctype)
774 0 : errorConflictingDefElem(defel, pstate);
775 6 : dctype = defel;
776 : }
777 502 : else if (strcmp(defel->defname, "icu_locale") == 0)
778 : {
779 20 : if (diculocale)
780 0 : errorConflictingDefElem(defel, pstate);
781 20 : diculocale = defel;
782 : }
783 482 : else if (strcmp(defel->defname, "icu_rules") == 0)
784 : {
785 0 : if (dicurules)
786 0 : errorConflictingDefElem(defel, pstate);
787 0 : dicurules = defel;
788 : }
789 482 : else if (strcmp(defel->defname, "locale_provider") == 0)
790 : {
791 54 : if (dlocprovider)
792 0 : errorConflictingDefElem(defel, pstate);
793 54 : dlocprovider = defel;
794 : }
795 428 : else if (strcmp(defel->defname, "is_template") == 0)
796 : {
797 64 : if (distemplate)
798 0 : errorConflictingDefElem(defel, pstate);
799 64 : distemplate = defel;
800 : }
801 364 : else if (strcmp(defel->defname, "allow_connections") == 0)
802 : {
803 62 : if (dallowconnections)
804 0 : errorConflictingDefElem(defel, pstate);
805 62 : dallowconnections = defel;
806 : }
807 302 : else if (strcmp(defel->defname, "connection_limit") == 0)
808 : {
809 0 : if (dconnlimit)
810 0 : errorConflictingDefElem(defel, pstate);
811 0 : dconnlimit = defel;
812 : }
813 302 : else if (strcmp(defel->defname, "collation_version") == 0)
814 : {
815 12 : if (dcollversion)
816 0 : errorConflictingDefElem(defel, pstate);
817 12 : dcollversion = defel;
818 : }
819 290 : else if (strcmp(defel->defname, "location") == 0)
820 : {
821 0 : ereport(WARNING,
822 : (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
823 : errmsg("LOCATION is not supported anymore"),
824 : errhint("Consider using tablespaces instead."),
825 : parser_errposition(pstate, defel->location)));
826 : }
827 290 : else if (strcmp(defel->defname, "oid") == 0)
828 : {
829 142 : dboid = defGetObjectId(defel);
830 :
831 : /*
832 : * We don't normally permit new databases to be created with
833 : * system-assigned OIDs. pg_upgrade tries to preserve database
834 : * OIDs, so we can't allow any database to be created with an OID
835 : * that might be in use in a freshly-initialized cluster created
836 : * by some future version. We assume all such OIDs will be from
837 : * the system-managed OID range.
838 : *
839 : * As an exception, however, we permit any OID to be assigned when
840 : * allow_system_table_mods=on (so that initdb can assign system
841 : * OIDs to template0 and postgres) or when performing a binary
842 : * upgrade (so that pg_upgrade can preserve whatever OIDs it finds
843 : * in the source cluster).
844 : */
845 142 : if (dboid < FirstNormalObjectId &&
846 128 : !allowSystemTableMods && !IsBinaryUpgrade)
847 0 : ereport(ERROR,
848 : (errcode(ERRCODE_INVALID_PARAMETER_VALUE)),
849 : errmsg("OIDs less than %u are reserved for system objects", FirstNormalObjectId));
850 : }
851 148 : else if (strcmp(defel->defname, "strategy") == 0)
852 : {
853 148 : if (dstrategy)
854 0 : errorConflictingDefElem(defel, pstate);
855 148 : dstrategy = defel;
856 : }
857 : else
858 0 : ereport(ERROR,
859 : (errcode(ERRCODE_SYNTAX_ERROR),
860 : errmsg("option \"%s\" not recognized", defel->defname),
861 : parser_errposition(pstate, defel->location)));
862 : }
863 :
864 562 : if (downer && downer->arg)
865 2 : dbowner = defGetString(downer);
866 562 : if (dtemplate && dtemplate->arg)
867 250 : dbtemplate = defGetString(dtemplate);
868 562 : if (dencoding && dencoding->arg)
869 : {
870 : const char *encoding_name;
871 :
872 52 : if (IsA(dencoding->arg, Integer))
873 : {
874 0 : encoding = defGetInt32(dencoding);
875 0 : encoding_name = pg_encoding_to_char(encoding);
876 0 : if (strcmp(encoding_name, "") == 0 ||
877 0 : pg_valid_server_encoding(encoding_name) < 0)
878 0 : ereport(ERROR,
879 : (errcode(ERRCODE_UNDEFINED_OBJECT),
880 : errmsg("%d is not a valid encoding code",
881 : encoding),
882 : parser_errposition(pstate, dencoding->location)));
883 : }
884 : else
885 : {
886 52 : encoding_name = defGetString(dencoding);
887 52 : encoding = pg_valid_server_encoding(encoding_name);
888 52 : if (encoding < 0)
889 0 : ereport(ERROR,
890 : (errcode(ERRCODE_UNDEFINED_OBJECT),
891 : errmsg("%s is not a valid encoding name",
892 : encoding_name),
893 : parser_errposition(pstate, dencoding->location)));
894 : }
895 : }
896 562 : if (dlocale && dlocale->arg)
897 : {
898 54 : dbcollate = defGetString(dlocale);
899 54 : dbctype = defGetString(dlocale);
900 : }
901 562 : if (dcollate && dcollate->arg)
902 6 : dbcollate = defGetString(dcollate);
903 562 : if (dctype && dctype->arg)
904 6 : dbctype = defGetString(dctype);
905 562 : if (diculocale && diculocale->arg)
906 20 : dbiculocale = defGetString(diculocale);
907 562 : if (dicurules && dicurules->arg)
908 0 : dbicurules = defGetString(dicurules);
909 562 : if (dlocprovider && dlocprovider->arg)
910 : {
911 54 : char *locproviderstr = defGetString(dlocprovider);
912 :
913 54 : if (pg_strcasecmp(locproviderstr, "icu") == 0)
914 28 : dblocprovider = COLLPROVIDER_ICU;
915 26 : else if (pg_strcasecmp(locproviderstr, "libc") == 0)
916 24 : dblocprovider = COLLPROVIDER_LIBC;
917 : else
918 2 : ereport(ERROR,
919 : (errcode(ERRCODE_INVALID_OBJECT_DEFINITION),
920 : errmsg("unrecognized locale provider: %s",
921 : locproviderstr)));
922 : }
923 560 : if (distemplate && distemplate->arg)
924 64 : dbistemplate = defGetBoolean(distemplate);
925 560 : if (dallowconnections && dallowconnections->arg)
926 62 : dballowconnections = defGetBoolean(dallowconnections);
927 560 : if (dconnlimit && dconnlimit->arg)
928 : {
929 0 : dbconnlimit = defGetInt32(dconnlimit);
930 0 : if (dbconnlimit < DATCONNLIMIT_UNLIMITED)
931 0 : ereport(ERROR,
932 : (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
933 : errmsg("invalid connection limit: %d", dbconnlimit)));
934 : }
935 560 : if (dcollversion)
936 12 : dbcollversion = defGetString(dcollversion);
937 :
938 : /* obtain OID of proposed owner */
939 560 : if (dbowner)
940 2 : datdba = get_role_oid(dbowner, false);
941 : else
942 558 : datdba = GetUserId();
943 :
944 : /*
945 : * To create a database, must have createdb privilege and must be able to
946 : * become the target role (this does not imply that the target role itself
947 : * must have createdb privilege). The latter provision guards against
948 : * "giveaway" attacks. Note that a superuser will always have both of
949 : * these privileges a fortiori.
950 : */
951 560 : if (!have_createdb_privilege())
952 6 : ereport(ERROR,
953 : (errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
954 : errmsg("permission denied to create database")));
955 :
956 554 : check_can_set_role(GetUserId(), datdba);
957 :
958 : /*
959 : * Lookup database (template) to be cloned, and obtain share lock on it.
960 : * ShareLock allows two CREATE DATABASEs to work from the same template
961 : * concurrently, while ensuring no one is busy dropping it in parallel
962 : * (which would be Very Bad since we'd likely get an incomplete copy
963 : * without knowing it). This also prevents any new connections from being
964 : * made to the source until we finish copying it, so we can be sure it
965 : * won't change underneath us.
966 : */
967 554 : if (!dbtemplate)
968 306 : dbtemplate = "template1"; /* Default template database name */
969 :
970 554 : if (!get_db_info(dbtemplate, ShareLock,
971 : &src_dboid, &src_owner, &src_encoding,
972 : &src_istemplate, &src_allowconn, &src_hasloginevt,
973 : &src_frozenxid, &src_minmxid, &src_deftablespace,
974 : &src_collate, &src_ctype, &src_iculocale, &src_icurules, &src_locprovider,
975 : &src_collversion))
976 0 : ereport(ERROR,
977 : (errcode(ERRCODE_UNDEFINED_DATABASE),
978 : errmsg("template database \"%s\" does not exist",
979 : dbtemplate)));
980 :
981 : /*
982 : * If the source database was in the process of being dropped, we can't
983 : * use it as a template.
984 : */
985 554 : if (database_is_invalid_oid(src_dboid))
986 2 : ereport(ERROR,
987 : errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
988 : errmsg("cannot use invalid database \"%s\" as template", dbtemplate),
989 : errhint("Use DROP DATABASE to drop invalid databases."));
990 :
991 : /*
992 : * Permission check: to copy a DB that's not marked datistemplate, you
993 : * must be superuser or the owner thereof.
994 : */
995 552 : if (!src_istemplate)
996 : {
997 12 : if (!object_ownercheck(DatabaseRelationId, src_dboid, GetUserId()))
998 0 : ereport(ERROR,
999 : (errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
1000 : errmsg("permission denied to copy database \"%s\"",
1001 : dbtemplate)));
1002 : }
1003 :
1004 : /* Validate the database creation strategy. */
1005 552 : if (dstrategy && dstrategy->arg)
1006 : {
1007 : char *strategy;
1008 :
1009 148 : strategy = defGetString(dstrategy);
1010 148 : if (strcmp(strategy, "wal_log") == 0)
1011 12 : dbstrategy = CREATEDB_WAL_LOG;
1012 136 : else if (strcmp(strategy, "file_copy") == 0)
1013 134 : dbstrategy = CREATEDB_FILE_COPY;
1014 : else
1015 2 : ereport(ERROR,
1016 : (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
1017 : errmsg("invalid create database strategy \"%s\"", strategy),
1018 : errhint("Valid strategies are \"wal_log\", and \"file_copy\".")));
1019 : }
1020 :
1021 : /* If encoding or locales are defaulted, use source's setting */
1022 550 : if (encoding < 0)
1023 498 : encoding = src_encoding;
1024 550 : if (dbcollate == NULL)
1025 494 : dbcollate = src_collate;
1026 550 : if (dbctype == NULL)
1027 494 : dbctype = src_ctype;
1028 550 : if (dblocprovider == '\0')
1029 498 : dblocprovider = src_locprovider;
1030 550 : if (dbiculocale == NULL && dblocprovider == COLLPROVIDER_ICU)
1031 : {
1032 36 : if (dlocale && dlocale->arg)
1033 6 : dbiculocale = defGetString(dlocale);
1034 : else
1035 30 : dbiculocale = src_iculocale;
1036 : }
1037 550 : if (dbicurules == NULL && dblocprovider == COLLPROVIDER_ICU)
1038 56 : dbicurules = src_icurules;
1039 :
1040 : /* Some encodings are client only */
1041 550 : if (!PG_VALID_BE_ENCODING(encoding))
1042 0 : ereport(ERROR,
1043 : (errcode(ERRCODE_WRONG_OBJECT_TYPE),
1044 : errmsg("invalid server encoding %d", encoding)));
1045 :
1046 : /* Check that the chosen locales are valid, and get canonical spellings */
1047 550 : if (!check_locale(LC_COLLATE, dbcollate, &canonname))
1048 2 : ereport(ERROR,
1049 : (errcode(ERRCODE_WRONG_OBJECT_TYPE),
1050 : errmsg("invalid LC_COLLATE locale name: \"%s\"", dbcollate),
1051 : errhint("If the locale name is specific to ICU, use ICU_LOCALE.")));
1052 548 : dbcollate = canonname;
1053 548 : if (!check_locale(LC_CTYPE, dbctype, &canonname))
1054 2 : ereport(ERROR,
1055 : (errcode(ERRCODE_WRONG_OBJECT_TYPE),
1056 : errmsg("invalid LC_CTYPE locale name: \"%s\"", dbctype),
1057 : errhint("If the locale name is specific to ICU, use ICU_LOCALE.")));
1058 546 : dbctype = canonname;
1059 :
1060 546 : check_encoding_locale_matches(encoding, dbcollate, dbctype);
1061 :
1062 546 : if (dblocprovider == COLLPROVIDER_ICU)
1063 : {
1064 56 : if (!(is_encoding_supported_by_icu(encoding)))
1065 2 : ereport(ERROR,
1066 : (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
1067 : errmsg("encoding \"%s\" is not supported with ICU provider",
1068 : pg_encoding_to_char(encoding))));
1069 :
1070 : /*
1071 : * This would happen if template0 uses the libc provider but the new
1072 : * database uses icu.
1073 : */
1074 54 : if (!dbiculocale)
1075 2 : ereport(ERROR,
1076 : (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
1077 : errmsg("LOCALE or ICU_LOCALE must be specified")));
1078 :
1079 : /*
1080 : * During binary upgrade, or when the locale came from the template
1081 : * database, preserve locale string. Otherwise, canonicalize to a
1082 : * language tag.
1083 : */
1084 52 : if (!IsBinaryUpgrade && dbiculocale != src_iculocale)
1085 : {
1086 14 : char *langtag = icu_language_tag(dbiculocale,
1087 : icu_validation_level);
1088 :
1089 14 : if (langtag && strcmp(dbiculocale, langtag) != 0)
1090 : {
1091 6 : ereport(NOTICE,
1092 : (errmsg("using standard form \"%s\" for ICU locale \"%s\"",
1093 : langtag, dbiculocale)));
1094 :
1095 6 : dbiculocale = langtag;
1096 : }
1097 : }
1098 :
1099 52 : icu_validate_locale(dbiculocale);
1100 : }
1101 : else
1102 : {
1103 490 : if (dbiculocale)
1104 0 : ereport(ERROR,
1105 : (errcode(ERRCODE_INVALID_OBJECT_DEFINITION),
1106 : errmsg("ICU locale cannot be specified unless locale provider is ICU")));
1107 :
1108 490 : if (dbicurules)
1109 0 : ereport(ERROR,
1110 : (errcode(ERRCODE_INVALID_OBJECT_DEFINITION),
1111 : errmsg("ICU rules cannot be specified unless locale provider is ICU")));
1112 : }
1113 :
1114 : /*
1115 : * Check that the new encoding and locale settings match the source
1116 : * database. We insist on this because we simply copy the source data ---
1117 : * any non-ASCII data would be wrongly encoded, and any indexes sorted
1118 : * according to the source locale would be wrong.
1119 : *
1120 : * However, we assume that template0 doesn't contain any non-ASCII data
1121 : * nor any indexes that depend on collation or ctype, so template0 can be
1122 : * used as template for creating a database with any encoding or locale.
1123 : */
1124 540 : if (strcmp(dbtemplate, "template0") != 0)
1125 : {
1126 318 : if (encoding != src_encoding)
1127 0 : ereport(ERROR,
1128 : (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
1129 : errmsg("new encoding (%s) is incompatible with the encoding of the template database (%s)",
1130 : pg_encoding_to_char(encoding),
1131 : pg_encoding_to_char(src_encoding)),
1132 : errhint("Use the same encoding as in the template database, or use template0 as template.")));
1133 :
1134 318 : if (strcmp(dbcollate, src_collate) != 0)
1135 0 : ereport(ERROR,
1136 : (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
1137 : errmsg("new collation (%s) is incompatible with the collation of the template database (%s)",
1138 : dbcollate, src_collate),
1139 : errhint("Use the same collation as in the template database, or use template0 as template.")));
1140 :
1141 318 : if (strcmp(dbctype, src_ctype) != 0)
1142 0 : ereport(ERROR,
1143 : (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
1144 : errmsg("new LC_CTYPE (%s) is incompatible with the LC_CTYPE of the template database (%s)",
1145 : dbctype, src_ctype),
1146 : errhint("Use the same LC_CTYPE as in the template database, or use template0 as template.")));
1147 :
1148 318 : if (dblocprovider != src_locprovider)
1149 0 : ereport(ERROR,
1150 : (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
1151 : errmsg("new locale provider (%s) does not match locale provider of the template database (%s)",
1152 : collprovider_name(dblocprovider), collprovider_name(src_locprovider)),
1153 : errhint("Use the same locale provider as in the template database, or use template0 as template.")));
1154 :
1155 318 : if (dblocprovider == COLLPROVIDER_ICU)
1156 : {
1157 : char *val1;
1158 : char *val2;
1159 :
1160 : Assert(dbiculocale);
1161 : Assert(src_iculocale);
1162 24 : if (strcmp(dbiculocale, src_iculocale) != 0)
1163 0 : ereport(ERROR,
1164 : (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
1165 : errmsg("new ICU locale (%s) is incompatible with the ICU locale of the template database (%s)",
1166 : dbiculocale, src_iculocale),
1167 : errhint("Use the same ICU locale as in the template database, or use template0 as template.")));
1168 :
1169 24 : val1 = dbicurules;
1170 24 : if (!val1)
1171 24 : val1 = "";
1172 24 : val2 = src_icurules;
1173 24 : if (!val2)
1174 24 : val2 = "";
1175 24 : if (strcmp(val1, val2) != 0)
1176 0 : ereport(ERROR,
1177 : (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
1178 : errmsg("new ICU collation rules (%s) are incompatible with the ICU collation rules of the template database (%s)",
1179 : val1, val2),
1180 : errhint("Use the same ICU collation rules as in the template database, or use template0 as template.")));
1181 : }
1182 : }
1183 :
1184 : /*
1185 : * If we got a collation version for the template database, check that it
1186 : * matches the actual OS collation version. Otherwise error; the user
1187 : * needs to fix the template database first. Don't complain if a
1188 : * collation version was specified explicitly as a statement option; that
1189 : * is used by pg_upgrade to reproduce the old state exactly.
1190 : *
1191 : * (If the template database has no collation version, then either the
1192 : * platform/provider does not support collation versioning, or it's
1193 : * template0, for which we stipulate that it does not contain
1194 : * collation-using objects.)
1195 : */
1196 540 : if (src_collversion && !dcollversion)
1197 : {
1198 : char *actual_versionstr;
1199 :
1200 52 : actual_versionstr = get_collation_actual_version(dblocprovider, dblocprovider == COLLPROVIDER_ICU ? dbiculocale : dbcollate);
1201 52 : if (!actual_versionstr)
1202 0 : ereport(ERROR,
1203 : (errmsg("template database \"%s\" has a collation version, but no actual collation version could be determined",
1204 : dbtemplate)));
1205 :
1206 52 : if (strcmp(actual_versionstr, src_collversion) != 0)
1207 0 : ereport(ERROR,
1208 : (errmsg("template database \"%s\" has a collation version mismatch",
1209 : dbtemplate),
1210 : errdetail("The template database was created using collation version %s, "
1211 : "but the operating system provides version %s.",
1212 : src_collversion, actual_versionstr),
1213 : errhint("Rebuild all objects in the template database that use the default collation and run "
1214 : "ALTER DATABASE %s REFRESH COLLATION VERSION, "
1215 : "or build PostgreSQL with the right library version.",
1216 : quote_identifier(dbtemplate))));
1217 : }
1218 :
1219 540 : if (dbcollversion == NULL)
1220 528 : dbcollversion = src_collversion;
1221 :
1222 : /*
1223 : * Normally, we copy the collation version from the template database.
1224 : * This last resort only applies if the template database does not have a
1225 : * collation version, which is normally only the case for template0.
1226 : */
1227 540 : if (dbcollversion == NULL)
1228 476 : dbcollversion = get_collation_actual_version(dblocprovider, dblocprovider == COLLPROVIDER_ICU ? dbiculocale : dbcollate);
1229 :
1230 : /* Resolve default tablespace for new database */
1231 540 : if (dtablespacename && dtablespacename->arg)
1232 16 : {
1233 : char *tablespacename;
1234 : AclResult aclresult;
1235 :
1236 16 : tablespacename = defGetString(dtablespacename);
1237 16 : dst_deftablespace = get_tablespace_oid(tablespacename, false);
1238 : /* check permissions */
1239 16 : aclresult = object_aclcheck(TableSpaceRelationId, dst_deftablespace, GetUserId(),
1240 : ACL_CREATE);
1241 16 : if (aclresult != ACLCHECK_OK)
1242 0 : aclcheck_error(aclresult, OBJECT_TABLESPACE,
1243 : tablespacename);
1244 :
1245 : /* pg_global must never be the default tablespace */
1246 16 : if (dst_deftablespace == GLOBALTABLESPACE_OID)
1247 0 : ereport(ERROR,
1248 : (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
1249 : errmsg("pg_global cannot be used as default tablespace")));
1250 :
1251 : /*
1252 : * If we are trying to change the default tablespace of the template,
1253 : * we require that the template not have any files in the new default
1254 : * tablespace. This is necessary because otherwise the copied
1255 : * database would contain pg_class rows that refer to its default
1256 : * tablespace both explicitly (by OID) and implicitly (as zero), which
1257 : * would cause problems. For example another CREATE DATABASE using
1258 : * the copied database as template, and trying to change its default
1259 : * tablespace again, would yield outright incorrect results (it would
1260 : * improperly move tables to the new default tablespace that should
1261 : * stay in the same tablespace).
1262 : */
1263 16 : if (dst_deftablespace != src_deftablespace)
1264 : {
1265 : char *srcpath;
1266 : struct stat st;
1267 :
1268 16 : srcpath = GetDatabasePath(src_dboid, dst_deftablespace);
1269 :
1270 16 : if (stat(srcpath, &st) == 0 &&
1271 0 : S_ISDIR(st.st_mode) &&
1272 0 : !directory_is_empty(srcpath))
1273 0 : ereport(ERROR,
1274 : (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
1275 : errmsg("cannot assign new default tablespace \"%s\"",
1276 : tablespacename),
1277 : errdetail("There is a conflict because database \"%s\" already has some tables in this tablespace.",
1278 : dbtemplate)));
1279 16 : pfree(srcpath);
1280 : }
1281 : }
1282 : else
1283 : {
1284 : /* Use template database's default tablespace */
1285 524 : dst_deftablespace = src_deftablespace;
1286 : /* Note there is no additional permission check in this path */
1287 : }
1288 :
1289 : /*
1290 : * If built with appropriate switch, whine when regression-testing
1291 : * conventions for database names are violated. But don't complain during
1292 : * initdb.
1293 : */
1294 : #ifdef ENFORCE_REGRESSION_TEST_NAME_RESTRICTIONS
1295 : if (IsUnderPostmaster && strstr(dbname, "regression") == NULL)
1296 : elog(WARNING, "databases created by regression test cases should have names including \"regression\"");
1297 : #endif
1298 :
1299 : /*
1300 : * Check for db name conflict. This is just to give a more friendly error
1301 : * message than "unique index violation". There's a race condition but
1302 : * we're willing to accept the less friendly message in that case.
1303 : */
1304 540 : if (OidIsValid(get_database_oid(dbname, true)))
1305 2 : ereport(ERROR,
1306 : (errcode(ERRCODE_DUPLICATE_DATABASE),
1307 : errmsg("database \"%s\" already exists", dbname)));
1308 :
1309 : /*
1310 : * The source DB can't have any active backends, except this one
1311 : * (exception is to allow CREATE DB while connected to template1).
1312 : * Otherwise we might copy inconsistent data.
1313 : *
1314 : * This should be last among the basic error checks, because it involves
1315 : * potential waiting; we may as well throw an error first if we're gonna
1316 : * throw one.
1317 : */
1318 538 : if (CountOtherDBBackends(src_dboid, ¬herbackends, &npreparedxacts))
1319 0 : ereport(ERROR,
1320 : (errcode(ERRCODE_OBJECT_IN_USE),
1321 : errmsg("source database \"%s\" is being accessed by other users",
1322 : dbtemplate),
1323 : errdetail_busy_db(notherbackends, npreparedxacts)));
1324 :
1325 : /*
1326 : * Select an OID for the new database, checking that it doesn't have a
1327 : * filename conflict with anything already existing in the tablespace
1328 : * directories.
1329 : */
1330 538 : pg_database_rel = table_open(DatabaseRelationId, RowExclusiveLock);
1331 :
1332 : /*
1333 : * If database OID is configured, check if the OID is already in use or
1334 : * data directory already exists.
1335 : */
1336 538 : if (OidIsValid(dboid))
1337 : {
1338 142 : char *existing_dbname = get_database_name(dboid);
1339 :
1340 142 : if (existing_dbname != NULL)
1341 0 : ereport(ERROR,
1342 : (errcode(ERRCODE_INVALID_PARAMETER_VALUE)),
1343 : errmsg("database OID %u is already in use by database \"%s\"",
1344 : dboid, existing_dbname));
1345 :
1346 142 : if (check_db_file_conflict(dboid))
1347 0 : ereport(ERROR,
1348 : (errcode(ERRCODE_INVALID_PARAMETER_VALUE)),
1349 : errmsg("data directory with the specified OID %u already exists", dboid));
1350 : }
1351 : else
1352 : {
1353 : /* Select an OID for the new database if is not explicitly configured. */
1354 : do
1355 : {
1356 396 : dboid = GetNewOidWithIndex(pg_database_rel, DatabaseOidIndexId,
1357 : Anum_pg_database_oid);
1358 396 : } while (check_db_file_conflict(dboid));
1359 : }
1360 :
1361 : /*
1362 : * Insert a new tuple into pg_database. This establishes our ownership of
1363 : * the new database name (anyone else trying to insert the same name will
1364 : * block on the unique index, and fail after we commit).
1365 : */
1366 :
1367 : Assert((dblocprovider == COLLPROVIDER_ICU && dbiculocale) ||
1368 : (dblocprovider != COLLPROVIDER_ICU && !dbiculocale));
1369 :
1370 : /* Form tuple */
1371 538 : new_record[Anum_pg_database_oid - 1] = ObjectIdGetDatum(dboid);
1372 538 : new_record[Anum_pg_database_datname - 1] =
1373 538 : DirectFunctionCall1(namein, CStringGetDatum(dbname));
1374 538 : new_record[Anum_pg_database_datdba - 1] = ObjectIdGetDatum(datdba);
1375 538 : new_record[Anum_pg_database_encoding - 1] = Int32GetDatum(encoding);
1376 538 : new_record[Anum_pg_database_datlocprovider - 1] = CharGetDatum(dblocprovider);
1377 538 : new_record[Anum_pg_database_datistemplate - 1] = BoolGetDatum(dbistemplate);
1378 538 : new_record[Anum_pg_database_datallowconn - 1] = BoolGetDatum(dballowconnections);
1379 538 : new_record[Anum_pg_database_dathasloginevt - 1] = BoolGetDatum(src_hasloginevt);
1380 538 : new_record[Anum_pg_database_datconnlimit - 1] = Int32GetDatum(dbconnlimit);
1381 538 : new_record[Anum_pg_database_datfrozenxid - 1] = TransactionIdGetDatum(src_frozenxid);
1382 538 : new_record[Anum_pg_database_datminmxid - 1] = TransactionIdGetDatum(src_minmxid);
1383 538 : new_record[Anum_pg_database_dattablespace - 1] = ObjectIdGetDatum(dst_deftablespace);
1384 538 : new_record[Anum_pg_database_datcollate - 1] = CStringGetTextDatum(dbcollate);
1385 538 : new_record[Anum_pg_database_datctype - 1] = CStringGetTextDatum(dbctype);
1386 538 : if (dbiculocale)
1387 50 : new_record[Anum_pg_database_daticulocale - 1] = CStringGetTextDatum(dbiculocale);
1388 : else
1389 488 : new_record_nulls[Anum_pg_database_daticulocale - 1] = true;
1390 538 : if (dbicurules)
1391 0 : new_record[Anum_pg_database_daticurules - 1] = CStringGetTextDatum(dbicurules);
1392 : else
1393 538 : new_record_nulls[Anum_pg_database_daticurules - 1] = true;
1394 538 : if (dbcollversion)
1395 108 : new_record[Anum_pg_database_datcollversion - 1] = CStringGetTextDatum(dbcollversion);
1396 : else
1397 430 : new_record_nulls[Anum_pg_database_datcollversion - 1] = true;
1398 :
1399 : /*
1400 : * We deliberately set datacl to default (NULL), rather than copying it
1401 : * from the template database. Copying it would be a bad idea when the
1402 : * owner is not the same as the template's owner.
1403 : */
1404 538 : new_record_nulls[Anum_pg_database_datacl - 1] = true;
1405 :
1406 538 : tuple = heap_form_tuple(RelationGetDescr(pg_database_rel),
1407 : new_record, new_record_nulls);
1408 :
1409 538 : CatalogTupleInsert(pg_database_rel, tuple);
1410 :
1411 : /*
1412 : * Now generate additional catalog entries associated with the new DB
1413 : */
1414 :
1415 : /* Register owner dependency */
1416 538 : recordDependencyOnOwner(DatabaseRelationId, dboid, datdba);
1417 :
1418 : /* Create pg_shdepend entries for objects within database */
1419 538 : copyTemplateDependencies(src_dboid, dboid);
1420 :
1421 : /* Post creation hook for new database */
1422 538 : InvokeObjectPostCreateHook(DatabaseRelationId, dboid, 0);
1423 :
1424 : /*
1425 : * If we're going to be reading data for the to-be-created database into
1426 : * shared_buffers, take a lock on it. Nobody should know that this
1427 : * database exists yet, but it's good to maintain the invariant that an
1428 : * AccessExclusiveLock on the database is sufficient to drop all of its
1429 : * buffers without worrying about more being read later.
1430 : *
1431 : * Note that we need to do this before entering the
1432 : * PG_ENSURE_ERROR_CLEANUP block below, because createdb_failure_callback
1433 : * expects this lock to be held already.
1434 : */
1435 538 : if (dbstrategy == CREATEDB_WAL_LOG)
1436 404 : LockSharedObject(DatabaseRelationId, dboid, 0, AccessShareLock);
1437 :
1438 : /*
1439 : * Once we start copying subdirectories, we need to be able to clean 'em
1440 : * up if we fail. Use an ENSURE block to make sure this happens. (This
1441 : * is not a 100% solution, because of the possibility of failure during
1442 : * transaction commit after we leave this routine, but it should handle
1443 : * most scenarios.)
1444 : */
1445 538 : fparms.src_dboid = src_dboid;
1446 538 : fparms.dest_dboid = dboid;
1447 538 : fparms.strategy = dbstrategy;
1448 :
1449 538 : PG_ENSURE_ERROR_CLEANUP(createdb_failure_callback,
1450 : PointerGetDatum(&fparms));
1451 : {
1452 : /*
1453 : * If the user has asked to create a database with WAL_LOG strategy
1454 : * then call CreateDatabaseUsingWalLog, which will copy the database
1455 : * at the block level and it will WAL log each copied block.
1456 : * Otherwise, call CreateDatabaseUsingFileCopy that will copy the
1457 : * database file by file.
1458 : */
1459 538 : if (dbstrategy == CREATEDB_WAL_LOG)
1460 404 : CreateDatabaseUsingWalLog(src_dboid, dboid, src_deftablespace,
1461 : dst_deftablespace);
1462 : else
1463 134 : CreateDatabaseUsingFileCopy(src_dboid, dboid, src_deftablespace,
1464 : dst_deftablespace);
1465 :
1466 : /*
1467 : * Close pg_database, but keep lock till commit.
1468 : */
1469 538 : table_close(pg_database_rel, NoLock);
1470 :
1471 : /*
1472 : * Force synchronous commit, thus minimizing the window between
1473 : * creation of the database files and committal of the transaction. If
1474 : * we crash before committing, we'll have a DB that's taking up disk
1475 : * space but is not in pg_database, which is not good.
1476 : */
1477 538 : ForceSyncCommit();
1478 : }
1479 538 : PG_END_ENSURE_ERROR_CLEANUP(createdb_failure_callback,
1480 : PointerGetDatum(&fparms));
1481 :
1482 538 : return dboid;
1483 : }
1484 :
1485 : /*
1486 : * Check whether chosen encoding matches chosen locale settings. This
1487 : * restriction is necessary because libc's locale-specific code usually
1488 : * fails when presented with data in an encoding it's not expecting. We
1489 : * allow mismatch in four cases:
1490 : *
1491 : * 1. locale encoding = SQL_ASCII, which means that the locale is C/POSIX
1492 : * which works with any encoding.
1493 : *
1494 : * 2. locale encoding = -1, which means that we couldn't determine the
1495 : * locale's encoding and have to trust the user to get it right.
1496 : *
1497 : * 3. selected encoding is UTF8 and platform is win32. This is because
1498 : * UTF8 is a pseudo codepage that is supported in all locales since it's
1499 : * converted to UTF16 before being used.
1500 : *
1501 : * 4. selected encoding is SQL_ASCII, but only if you're a superuser. This
1502 : * is risky but we have historically allowed it --- notably, the
1503 : * regression tests require it.
1504 : *
1505 : * Note: if you change this policy, fix initdb to match.
1506 : */
1507 : void
1508 574 : check_encoding_locale_matches(int encoding, const char *collate, const char *ctype)
1509 : {
1510 574 : int ctype_encoding = pg_get_encoding_from_locale(ctype, true);
1511 574 : int collate_encoding = pg_get_encoding_from_locale(collate, true);
1512 :
1513 578 : if (!(ctype_encoding == encoding ||
1514 4 : ctype_encoding == PG_SQL_ASCII ||
1515 : ctype_encoding == -1 ||
1516 : #ifdef WIN32
1517 : encoding == PG_UTF8 ||
1518 : #endif
1519 4 : (encoding == PG_SQL_ASCII && superuser())))
1520 0 : ereport(ERROR,
1521 : (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
1522 : errmsg("encoding \"%s\" does not match locale \"%s\"",
1523 : pg_encoding_to_char(encoding),
1524 : ctype),
1525 : errdetail("The chosen LC_CTYPE setting requires encoding \"%s\".",
1526 : pg_encoding_to_char(ctype_encoding))));
1527 :
1528 578 : if (!(collate_encoding == encoding ||
1529 4 : collate_encoding == PG_SQL_ASCII ||
1530 : collate_encoding == -1 ||
1531 : #ifdef WIN32
1532 : encoding == PG_UTF8 ||
1533 : #endif
1534 4 : (encoding == PG_SQL_ASCII && superuser())))
1535 0 : ereport(ERROR,
1536 : (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
1537 : errmsg("encoding \"%s\" does not match locale \"%s\"",
1538 : pg_encoding_to_char(encoding),
1539 : collate),
1540 : errdetail("The chosen LC_COLLATE setting requires encoding \"%s\".",
1541 : pg_encoding_to_char(collate_encoding))));
1542 574 : }
1543 :
1544 : /* Error cleanup callback for createdb */
1545 : static void
1546 0 : createdb_failure_callback(int code, Datum arg)
1547 : {
1548 0 : createdb_failure_params *fparms = (createdb_failure_params *) DatumGetPointer(arg);
1549 :
1550 : /*
1551 : * If we were copying database at block levels then drop pages for the
1552 : * destination database that are in the shared buffer cache. And tell
1553 : * checkpointer to forget any pending fsync and unlink requests for files
1554 : * in the database. The reasoning behind doing this is same as explained
1555 : * in dropdb function. But unlike dropdb we don't need to call
1556 : * pgstat_drop_database because this database is still not created so
1557 : * there should not be any stat for this.
1558 : */
1559 0 : if (fparms->strategy == CREATEDB_WAL_LOG)
1560 : {
1561 0 : DropDatabaseBuffers(fparms->dest_dboid);
1562 0 : ForgetDatabaseSyncRequests(fparms->dest_dboid);
1563 :
1564 : /* Release lock on the target database. */
1565 0 : UnlockSharedObject(DatabaseRelationId, fparms->dest_dboid, 0,
1566 : AccessShareLock);
1567 : }
1568 :
1569 : /*
1570 : * Release lock on source database before doing recursive remove. This is
1571 : * not essential but it seems desirable to release the lock as soon as
1572 : * possible.
1573 : */
1574 0 : UnlockSharedObject(DatabaseRelationId, fparms->src_dboid, 0, ShareLock);
1575 :
1576 : /* Throw away any successfully copied subdirectories */
1577 0 : remove_dbtablespaces(fparms->dest_dboid);
1578 0 : }
1579 :
1580 :
1581 : /*
1582 : * DROP DATABASE
1583 : */
1584 : void
1585 86 : dropdb(const char *dbname, bool missing_ok, bool force)
1586 : {
1587 : Oid db_id;
1588 : bool db_istemplate;
1589 : Relation pgdbrel;
1590 : HeapTuple tup;
1591 : Form_pg_database datform;
1592 : int notherbackends;
1593 : int npreparedxacts;
1594 : int nslots,
1595 : nslots_active;
1596 : int nsubscriptions;
1597 :
1598 : /*
1599 : * Look up the target database's OID, and get exclusive lock on it. We
1600 : * need this to ensure that no new backend starts up in the target
1601 : * database while we are deleting it (see postinit.c), and that no one is
1602 : * using it as a CREATE DATABASE template or trying to delete it for
1603 : * themselves.
1604 : */
1605 86 : pgdbrel = table_open(DatabaseRelationId, RowExclusiveLock);
1606 :
1607 86 : if (!get_db_info(dbname, AccessExclusiveLock, &db_id, NULL, NULL,
1608 : &db_istemplate, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL))
1609 : {
1610 32 : if (!missing_ok)
1611 : {
1612 16 : ereport(ERROR,
1613 : (errcode(ERRCODE_UNDEFINED_DATABASE),
1614 : errmsg("database \"%s\" does not exist", dbname)));
1615 : }
1616 : else
1617 : {
1618 : /* Close pg_database, release the lock, since we changed nothing */
1619 16 : table_close(pgdbrel, RowExclusiveLock);
1620 16 : ereport(NOTICE,
1621 : (errmsg("database \"%s\" does not exist, skipping",
1622 : dbname)));
1623 16 : return;
1624 : }
1625 : }
1626 :
1627 : /*
1628 : * Permission checks
1629 : */
1630 54 : if (!object_ownercheck(DatabaseRelationId, db_id, GetUserId()))
1631 0 : aclcheck_error(ACLCHECK_NOT_OWNER, OBJECT_DATABASE,
1632 : dbname);
1633 :
1634 : /* DROP hook for the database being removed */
1635 54 : InvokeObjectDropHook(DatabaseRelationId, db_id, 0);
1636 :
1637 : /*
1638 : * Disallow dropping a DB that is marked istemplate. This is just to
1639 : * prevent people from accidentally dropping template0 or template1; they
1640 : * can do so if they're really determined ...
1641 : */
1642 54 : if (db_istemplate)
1643 0 : ereport(ERROR,
1644 : (errcode(ERRCODE_WRONG_OBJECT_TYPE),
1645 : errmsg("cannot drop a template database")));
1646 :
1647 : /* Obviously can't drop my own database */
1648 54 : if (db_id == MyDatabaseId)
1649 0 : ereport(ERROR,
1650 : (errcode(ERRCODE_OBJECT_IN_USE),
1651 : errmsg("cannot drop the currently open database")));
1652 :
1653 : /*
1654 : * Check whether there are active logical slots that refer to the
1655 : * to-be-dropped database. The database lock we are holding prevents the
1656 : * creation of new slots using the database or existing slots becoming
1657 : * active.
1658 : */
1659 54 : (void) ReplicationSlotsCountDBSlots(db_id, &nslots, &nslots_active);
1660 54 : if (nslots_active)
1661 : {
1662 2 : ereport(ERROR,
1663 : (errcode(ERRCODE_OBJECT_IN_USE),
1664 : errmsg("database \"%s\" is used by an active logical replication slot",
1665 : dbname),
1666 : errdetail_plural("There is %d active slot.",
1667 : "There are %d active slots.",
1668 : nslots_active, nslots_active)));
1669 : }
1670 :
1671 : /*
1672 : * Check if there are subscriptions defined in the target database.
1673 : *
1674 : * We can't drop them automatically because they might be holding
1675 : * resources in other databases/instances.
1676 : */
1677 52 : if ((nsubscriptions = CountDBSubscriptions(db_id)) > 0)
1678 0 : ereport(ERROR,
1679 : (errcode(ERRCODE_OBJECT_IN_USE),
1680 : errmsg("database \"%s\" is being used by logical replication subscription",
1681 : dbname),
1682 : errdetail_plural("There is %d subscription.",
1683 : "There are %d subscriptions.",
1684 : nsubscriptions, nsubscriptions)));
1685 :
1686 :
1687 : /*
1688 : * Attempt to terminate all existing connections to the target database if
1689 : * the user has requested to do so.
1690 : */
1691 52 : if (force)
1692 2 : TerminateOtherDBBackends(db_id);
1693 :
1694 : /*
1695 : * Check for other backends in the target database. (Because we hold the
1696 : * database lock, no new ones can start after this.)
1697 : *
1698 : * As in CREATE DATABASE, check this after other error conditions.
1699 : */
1700 52 : if (CountOtherDBBackends(db_id, ¬herbackends, &npreparedxacts))
1701 0 : ereport(ERROR,
1702 : (errcode(ERRCODE_OBJECT_IN_USE),
1703 : errmsg("database \"%s\" is being accessed by other users",
1704 : dbname),
1705 : errdetail_busy_db(notherbackends, npreparedxacts)));
1706 :
1707 : /*
1708 : * Delete any comments or security labels associated with the database.
1709 : */
1710 52 : DeleteSharedComments(db_id, DatabaseRelationId);
1711 52 : DeleteSharedSecurityLabel(db_id, DatabaseRelationId);
1712 :
1713 : /*
1714 : * Remove settings associated with this database
1715 : */
1716 52 : DropSetting(db_id, InvalidOid);
1717 :
1718 : /*
1719 : * Remove shared dependency references for the database.
1720 : */
1721 52 : dropDatabaseDependencies(db_id);
1722 :
1723 : /*
1724 : * Tell the cumulative stats system to forget it immediately, too.
1725 : */
1726 52 : pgstat_drop_database(db_id);
1727 :
1728 52 : tup = SearchSysCacheCopy1(DATABASEOID, ObjectIdGetDatum(db_id));
1729 52 : if (!HeapTupleIsValid(tup))
1730 0 : elog(ERROR, "cache lookup failed for database %u", db_id);
1731 52 : datform = (Form_pg_database) GETSTRUCT(tup);
1732 :
1733 : /*
1734 : * Except for the deletion of the catalog row, subsequent actions are not
1735 : * transactional (consider DropDatabaseBuffers() discarding modified
1736 : * buffers). But we might crash or get interrupted below. To prevent
1737 : * accesses to a database with invalid contents, mark the database as
1738 : * invalid using an in-place update.
1739 : *
1740 : * We need to flush the WAL before continuing, to guarantee the
1741 : * modification is durable before performing irreversible filesystem
1742 : * operations.
1743 : */
1744 52 : datform->datconnlimit = DATCONNLIMIT_INVALID_DB;
1745 52 : heap_inplace_update(pgdbrel, tup);
1746 52 : XLogFlush(XactLastRecEnd);
1747 :
1748 : /*
1749 : * Also delete the tuple - transactionally. If this transaction commits,
1750 : * the row will be gone, but if we fail, dropdb() can be invoked again.
1751 : */
1752 52 : CatalogTupleDelete(pgdbrel, &tup->t_self);
1753 :
1754 : /*
1755 : * Drop db-specific replication slots.
1756 : */
1757 52 : ReplicationSlotsDropDBSlots(db_id);
1758 :
1759 : /*
1760 : * Drop pages for this database that are in the shared buffer cache. This
1761 : * is important to ensure that no remaining backend tries to write out a
1762 : * dirty buffer to the dead database later...
1763 : */
1764 52 : DropDatabaseBuffers(db_id);
1765 :
1766 : /*
1767 : * Tell checkpointer to forget any pending fsync and unlink requests for
1768 : * files in the database; else the fsyncs will fail at next checkpoint, or
1769 : * worse, it will delete files that belong to a newly created database
1770 : * with the same OID.
1771 : */
1772 52 : ForgetDatabaseSyncRequests(db_id);
1773 :
1774 : /*
1775 : * Force a checkpoint to make sure the checkpointer has received the
1776 : * message sent by ForgetDatabaseSyncRequests.
1777 : */
1778 52 : RequestCheckpoint(CHECKPOINT_IMMEDIATE | CHECKPOINT_FORCE | CHECKPOINT_WAIT);
1779 :
1780 : /* Close all smgr fds in all backends. */
1781 52 : WaitForProcSignalBarrier(EmitProcSignalBarrier(PROCSIGNAL_BARRIER_SMGRRELEASE));
1782 :
1783 : /*
1784 : * Remove all tablespace subdirs belonging to the database.
1785 : */
1786 52 : remove_dbtablespaces(db_id);
1787 :
1788 : /*
1789 : * Close pg_database, but keep lock till commit.
1790 : */
1791 52 : table_close(pgdbrel, NoLock);
1792 :
1793 : /*
1794 : * Force synchronous commit, thus minimizing the window between removal of
1795 : * the database files and committal of the transaction. If we crash before
1796 : * committing, we'll have a DB that's gone on disk but still there
1797 : * according to pg_database, which is not good.
1798 : */
1799 52 : ForceSyncCommit();
1800 : }
1801 :
1802 :
1803 : /*
1804 : * Rename database
1805 : */
1806 : ObjectAddress
1807 0 : RenameDatabase(const char *oldname, const char *newname)
1808 : {
1809 : Oid db_id;
1810 : HeapTuple newtup;
1811 : Relation rel;
1812 : int notherbackends;
1813 : int npreparedxacts;
1814 : ObjectAddress address;
1815 :
1816 : /*
1817 : * Look up the target database's OID, and get exclusive lock on it. We
1818 : * need this for the same reasons as DROP DATABASE.
1819 : */
1820 0 : rel = table_open(DatabaseRelationId, RowExclusiveLock);
1821 :
1822 0 : if (!get_db_info(oldname, AccessExclusiveLock, &db_id, NULL, NULL, NULL,
1823 : NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL))
1824 0 : ereport(ERROR,
1825 : (errcode(ERRCODE_UNDEFINED_DATABASE),
1826 : errmsg("database \"%s\" does not exist", oldname)));
1827 :
1828 : /* must be owner */
1829 0 : if (!object_ownercheck(DatabaseRelationId, db_id, GetUserId()))
1830 0 : aclcheck_error(ACLCHECK_NOT_OWNER, OBJECT_DATABASE,
1831 : oldname);
1832 :
1833 : /* must have createdb rights */
1834 0 : if (!have_createdb_privilege())
1835 0 : ereport(ERROR,
1836 : (errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
1837 : errmsg("permission denied to rename database")));
1838 :
1839 : /*
1840 : * If built with appropriate switch, whine when regression-testing
1841 : * conventions for database names are violated.
1842 : */
1843 : #ifdef ENFORCE_REGRESSION_TEST_NAME_RESTRICTIONS
1844 : if (strstr(newname, "regression") == NULL)
1845 : elog(WARNING, "databases created by regression test cases should have names including \"regression\"");
1846 : #endif
1847 :
1848 : /*
1849 : * Make sure the new name doesn't exist. See notes for same error in
1850 : * CREATE DATABASE.
1851 : */
1852 0 : if (OidIsValid(get_database_oid(newname, true)))
1853 0 : ereport(ERROR,
1854 : (errcode(ERRCODE_DUPLICATE_DATABASE),
1855 : errmsg("database \"%s\" already exists", newname)));
1856 :
1857 : /*
1858 : * XXX Client applications probably store the current database somewhere,
1859 : * so renaming it could cause confusion. On the other hand, there may not
1860 : * be an actual problem besides a little confusion, so think about this
1861 : * and decide.
1862 : */
1863 0 : if (db_id == MyDatabaseId)
1864 0 : ereport(ERROR,
1865 : (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
1866 : errmsg("current database cannot be renamed")));
1867 :
1868 : /*
1869 : * Make sure the database does not have active sessions. This is the same
1870 : * concern as above, but applied to other sessions.
1871 : *
1872 : * As in CREATE DATABASE, check this after other error conditions.
1873 : */
1874 0 : if (CountOtherDBBackends(db_id, ¬herbackends, &npreparedxacts))
1875 0 : ereport(ERROR,
1876 : (errcode(ERRCODE_OBJECT_IN_USE),
1877 : errmsg("database \"%s\" is being accessed by other users",
1878 : oldname),
1879 : errdetail_busy_db(notherbackends, npreparedxacts)));
1880 :
1881 : /* rename */
1882 0 : newtup = SearchSysCacheCopy1(DATABASEOID, ObjectIdGetDatum(db_id));
1883 0 : if (!HeapTupleIsValid(newtup))
1884 0 : elog(ERROR, "cache lookup failed for database %u", db_id);
1885 0 : namestrcpy(&(((Form_pg_database) GETSTRUCT(newtup))->datname), newname);
1886 0 : CatalogTupleUpdate(rel, &newtup->t_self, newtup);
1887 :
1888 0 : InvokeObjectPostAlterHook(DatabaseRelationId, db_id, 0);
1889 :
1890 0 : ObjectAddressSet(address, DatabaseRelationId, db_id);
1891 :
1892 : /*
1893 : * Close pg_database, but keep lock till commit.
1894 : */
1895 0 : table_close(rel, NoLock);
1896 :
1897 0 : return address;
1898 : }
1899 :
1900 :
1901 : /*
1902 : * ALTER DATABASE SET TABLESPACE
1903 : */
1904 : static void
1905 10 : movedb(const char *dbname, const char *tblspcname)
1906 : {
1907 : Oid db_id;
1908 : Relation pgdbrel;
1909 : int notherbackends;
1910 : int npreparedxacts;
1911 : HeapTuple oldtuple,
1912 : newtuple;
1913 : Oid src_tblspcoid,
1914 : dst_tblspcoid;
1915 : ScanKeyData scankey;
1916 : SysScanDesc sysscan;
1917 : AclResult aclresult;
1918 : char *src_dbpath;
1919 : char *dst_dbpath;
1920 : DIR *dstdir;
1921 : struct dirent *xlde;
1922 : movedb_failure_params fparms;
1923 :
1924 : /*
1925 : * Look up the target database's OID, and get exclusive lock on it. We
1926 : * need this to ensure that no new backend starts up in the database while
1927 : * we are moving it, and that no one is using it as a CREATE DATABASE
1928 : * template or trying to delete it.
1929 : */
1930 10 : pgdbrel = table_open(DatabaseRelationId, RowExclusiveLock);
1931 :
1932 10 : if (!get_db_info(dbname, AccessExclusiveLock, &db_id, NULL, NULL, NULL,
1933 : NULL, NULL, NULL, NULL, &src_tblspcoid, NULL, NULL, NULL, NULL, NULL, NULL))
1934 0 : ereport(ERROR,
1935 : (errcode(ERRCODE_UNDEFINED_DATABASE),
1936 : errmsg("database \"%s\" does not exist", dbname)));
1937 :
1938 : /*
1939 : * We actually need a session lock, so that the lock will persist across
1940 : * the commit/restart below. (We could almost get away with letting the
1941 : * lock be released at commit, except that someone could try to move
1942 : * relations of the DB back into the old directory while we rmtree() it.)
1943 : */
1944 10 : LockSharedObjectForSession(DatabaseRelationId, db_id, 0,
1945 : AccessExclusiveLock);
1946 :
1947 : /*
1948 : * Permission checks
1949 : */
1950 10 : if (!object_ownercheck(DatabaseRelationId, db_id, GetUserId()))
1951 0 : aclcheck_error(ACLCHECK_NOT_OWNER, OBJECT_DATABASE,
1952 : dbname);
1953 :
1954 : /*
1955 : * Obviously can't move the tables of my own database
1956 : */
1957 10 : if (db_id == MyDatabaseId)
1958 0 : ereport(ERROR,
1959 : (errcode(ERRCODE_OBJECT_IN_USE),
1960 : errmsg("cannot change the tablespace of the currently open database")));
1961 :
1962 : /*
1963 : * Get tablespace's oid
1964 : */
1965 10 : dst_tblspcoid = get_tablespace_oid(tblspcname, false);
1966 :
1967 : /*
1968 : * Permission checks
1969 : */
1970 10 : aclresult = object_aclcheck(TableSpaceRelationId, dst_tblspcoid, GetUserId(),
1971 : ACL_CREATE);
1972 10 : if (aclresult != ACLCHECK_OK)
1973 0 : aclcheck_error(aclresult, OBJECT_TABLESPACE,
1974 : tblspcname);
1975 :
1976 : /*
1977 : * pg_global must never be the default tablespace
1978 : */
1979 10 : if (dst_tblspcoid == GLOBALTABLESPACE_OID)
1980 0 : ereport(ERROR,
1981 : (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
1982 : errmsg("pg_global cannot be used as default tablespace")));
1983 :
1984 : /*
1985 : * No-op if same tablespace
1986 : */
1987 10 : if (src_tblspcoid == dst_tblspcoid)
1988 : {
1989 0 : table_close(pgdbrel, NoLock);
1990 0 : UnlockSharedObjectForSession(DatabaseRelationId, db_id, 0,
1991 : AccessExclusiveLock);
1992 0 : return;
1993 : }
1994 :
1995 : /*
1996 : * Check for other backends in the target database. (Because we hold the
1997 : * database lock, no new ones can start after this.)
1998 : *
1999 : * As in CREATE DATABASE, check this after other error conditions.
2000 : */
2001 10 : if (CountOtherDBBackends(db_id, ¬herbackends, &npreparedxacts))
2002 0 : ereport(ERROR,
2003 : (errcode(ERRCODE_OBJECT_IN_USE),
2004 : errmsg("database \"%s\" is being accessed by other users",
2005 : dbname),
2006 : errdetail_busy_db(notherbackends, npreparedxacts)));
2007 :
2008 : /*
2009 : * Get old and new database paths
2010 : */
2011 10 : src_dbpath = GetDatabasePath(db_id, src_tblspcoid);
2012 10 : dst_dbpath = GetDatabasePath(db_id, dst_tblspcoid);
2013 :
2014 : /*
2015 : * Force a checkpoint before proceeding. This will force all dirty
2016 : * buffers, including those of unlogged tables, out to disk, to ensure
2017 : * source database is up-to-date on disk for the copy.
2018 : * FlushDatabaseBuffers() would suffice for that, but we also want to
2019 : * process any pending unlink requests. Otherwise, the check for existing
2020 : * files in the target directory might fail unnecessarily, not to mention
2021 : * that the copy might fail due to source files getting deleted under it.
2022 : * On Windows, this also ensures that background procs don't hold any open
2023 : * files, which would cause rmdir() to fail.
2024 : */
2025 10 : RequestCheckpoint(CHECKPOINT_IMMEDIATE | CHECKPOINT_FORCE | CHECKPOINT_WAIT
2026 : | CHECKPOINT_FLUSH_ALL);
2027 :
2028 : /* Close all smgr fds in all backends. */
2029 10 : WaitForProcSignalBarrier(EmitProcSignalBarrier(PROCSIGNAL_BARRIER_SMGRRELEASE));
2030 :
2031 : /*
2032 : * Now drop all buffers holding data of the target database; they should
2033 : * no longer be dirty so DropDatabaseBuffers is safe.
2034 : *
2035 : * It might seem that we could just let these buffers age out of shared
2036 : * buffers naturally, since they should not get referenced anymore. The
2037 : * problem with that is that if the user later moves the database back to
2038 : * its original tablespace, any still-surviving buffers would appear to
2039 : * contain valid data again --- but they'd be missing any changes made in
2040 : * the database while it was in the new tablespace. In any case, freeing
2041 : * buffers that should never be used again seems worth the cycles.
2042 : *
2043 : * Note: it'd be sufficient to get rid of buffers matching db_id and
2044 : * src_tblspcoid, but bufmgr.c presently provides no API for that.
2045 : */
2046 10 : DropDatabaseBuffers(db_id);
2047 :
2048 : /*
2049 : * Check for existence of files in the target directory, i.e., objects of
2050 : * this database that are already in the target tablespace. We can't
2051 : * allow the move in such a case, because we would need to change those
2052 : * relations' pg_class.reltablespace entries to zero, and we don't have
2053 : * access to the DB's pg_class to do so.
2054 : */
2055 10 : dstdir = AllocateDir(dst_dbpath);
2056 10 : if (dstdir != NULL)
2057 : {
2058 0 : while ((xlde = ReadDir(dstdir, dst_dbpath)) != NULL)
2059 : {
2060 0 : if (strcmp(xlde->d_name, ".") == 0 ||
2061 0 : strcmp(xlde->d_name, "..") == 0)
2062 0 : continue;
2063 :
2064 0 : ereport(ERROR,
2065 : (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
2066 : errmsg("some relations of database \"%s\" are already in tablespace \"%s\"",
2067 : dbname, tblspcname),
2068 : errhint("You must move them back to the database's default tablespace before using this command.")));
2069 : }
2070 :
2071 0 : FreeDir(dstdir);
2072 :
2073 : /*
2074 : * The directory exists but is empty. We must remove it before using
2075 : * the copydir function.
2076 : */
2077 0 : if (rmdir(dst_dbpath) != 0)
2078 0 : elog(ERROR, "could not remove directory \"%s\": %m",
2079 : dst_dbpath);
2080 : }
2081 :
2082 : /*
2083 : * Use an ENSURE block to make sure we remove the debris if the copy fails
2084 : * (eg, due to out-of-disk-space). This is not a 100% solution, because
2085 : * of the possibility of failure during transaction commit, but it should
2086 : * handle most scenarios.
2087 : */
2088 10 : fparms.dest_dboid = db_id;
2089 10 : fparms.dest_tsoid = dst_tblspcoid;
2090 10 : PG_ENSURE_ERROR_CLEANUP(movedb_failure_callback,
2091 : PointerGetDatum(&fparms));
2092 : {
2093 10 : Datum new_record[Natts_pg_database] = {0};
2094 10 : bool new_record_nulls[Natts_pg_database] = {0};
2095 10 : bool new_record_repl[Natts_pg_database] = {0};
2096 :
2097 : /*
2098 : * Copy files from the old tablespace to the new one
2099 : */
2100 10 : copydir(src_dbpath, dst_dbpath, false);
2101 :
2102 : /*
2103 : * Record the filesystem change in XLOG
2104 : */
2105 : {
2106 : xl_dbase_create_file_copy_rec xlrec;
2107 :
2108 10 : xlrec.db_id = db_id;
2109 10 : xlrec.tablespace_id = dst_tblspcoid;
2110 10 : xlrec.src_db_id = db_id;
2111 10 : xlrec.src_tablespace_id = src_tblspcoid;
2112 :
2113 10 : XLogBeginInsert();
2114 10 : XLogRegisterData((char *) &xlrec,
2115 : sizeof(xl_dbase_create_file_copy_rec));
2116 :
2117 10 : (void) XLogInsert(RM_DBASE_ID,
2118 : XLOG_DBASE_CREATE_FILE_COPY | XLR_SPECIAL_REL_UPDATE);
2119 : }
2120 :
2121 : /*
2122 : * Update the database's pg_database tuple
2123 : */
2124 10 : ScanKeyInit(&scankey,
2125 : Anum_pg_database_datname,
2126 : BTEqualStrategyNumber, F_NAMEEQ,
2127 : CStringGetDatum(dbname));
2128 10 : sysscan = systable_beginscan(pgdbrel, DatabaseNameIndexId, true,
2129 : NULL, 1, &scankey);
2130 10 : oldtuple = systable_getnext(sysscan);
2131 10 : if (!HeapTupleIsValid(oldtuple)) /* shouldn't happen... */
2132 0 : ereport(ERROR,
2133 : (errcode(ERRCODE_UNDEFINED_DATABASE),
2134 : errmsg("database \"%s\" does not exist", dbname)));
2135 :
2136 10 : new_record[Anum_pg_database_dattablespace - 1] = ObjectIdGetDatum(dst_tblspcoid);
2137 10 : new_record_repl[Anum_pg_database_dattablespace - 1] = true;
2138 :
2139 10 : newtuple = heap_modify_tuple(oldtuple, RelationGetDescr(pgdbrel),
2140 : new_record,
2141 : new_record_nulls, new_record_repl);
2142 10 : CatalogTupleUpdate(pgdbrel, &oldtuple->t_self, newtuple);
2143 :
2144 10 : InvokeObjectPostAlterHook(DatabaseRelationId, db_id, 0);
2145 :
2146 10 : systable_endscan(sysscan);
2147 :
2148 : /*
2149 : * Force another checkpoint here. As in CREATE DATABASE, this is to
2150 : * ensure that we don't have to replay a committed
2151 : * XLOG_DBASE_CREATE_FILE_COPY operation, which would cause us to lose
2152 : * any unlogged operations done in the new DB tablespace before the
2153 : * next checkpoint.
2154 : */
2155 10 : RequestCheckpoint(CHECKPOINT_IMMEDIATE | CHECKPOINT_FORCE | CHECKPOINT_WAIT);
2156 :
2157 : /*
2158 : * Force synchronous commit, thus minimizing the window between
2159 : * copying the database files and committal of the transaction. If we
2160 : * crash before committing, we'll leave an orphaned set of files on
2161 : * disk, which is not fatal but not good either.
2162 : */
2163 10 : ForceSyncCommit();
2164 :
2165 : /*
2166 : * Close pg_database, but keep lock till commit.
2167 : */
2168 10 : table_close(pgdbrel, NoLock);
2169 : }
2170 10 : PG_END_ENSURE_ERROR_CLEANUP(movedb_failure_callback,
2171 : PointerGetDatum(&fparms));
2172 :
2173 : /*
2174 : * Commit the transaction so that the pg_database update is committed. If
2175 : * we crash while removing files, the database won't be corrupt, we'll
2176 : * just leave some orphaned files in the old directory.
2177 : *
2178 : * (This is OK because we know we aren't inside a transaction block.)
2179 : *
2180 : * XXX would it be safe/better to do this inside the ensure block? Not
2181 : * convinced it's a good idea; consider elog just after the transaction
2182 : * really commits.
2183 : */
2184 10 : PopActiveSnapshot();
2185 10 : CommitTransactionCommand();
2186 :
2187 : /* Start new transaction for the remaining work; don't need a snapshot */
2188 10 : StartTransactionCommand();
2189 :
2190 : /*
2191 : * Remove files from the old tablespace
2192 : */
2193 10 : if (!rmtree(src_dbpath, true))
2194 0 : ereport(WARNING,
2195 : (errmsg("some useless files may be left behind in old database directory \"%s\"",
2196 : src_dbpath)));
2197 :
2198 : /*
2199 : * Record the filesystem change in XLOG
2200 : */
2201 : {
2202 : xl_dbase_drop_rec xlrec;
2203 :
2204 10 : xlrec.db_id = db_id;
2205 10 : xlrec.ntablespaces = 1;
2206 :
2207 10 : XLogBeginInsert();
2208 10 : XLogRegisterData((char *) &xlrec, sizeof(xl_dbase_drop_rec));
2209 10 : XLogRegisterData((char *) &src_tblspcoid, sizeof(Oid));
2210 :
2211 10 : (void) XLogInsert(RM_DBASE_ID,
2212 : XLOG_DBASE_DROP | XLR_SPECIAL_REL_UPDATE);
2213 : }
2214 :
2215 : /* Now it's safe to release the database lock */
2216 10 : UnlockSharedObjectForSession(DatabaseRelationId, db_id, 0,
2217 : AccessExclusiveLock);
2218 :
2219 10 : pfree(src_dbpath);
2220 10 : pfree(dst_dbpath);
2221 : }
2222 :
2223 : /* Error cleanup callback for movedb */
2224 : static void
2225 0 : movedb_failure_callback(int code, Datum arg)
2226 : {
2227 0 : movedb_failure_params *fparms = (movedb_failure_params *) DatumGetPointer(arg);
2228 : char *dstpath;
2229 :
2230 : /* Get rid of anything we managed to copy to the target directory */
2231 0 : dstpath = GetDatabasePath(fparms->dest_dboid, fparms->dest_tsoid);
2232 :
2233 0 : (void) rmtree(dstpath, true);
2234 :
2235 0 : pfree(dstpath);
2236 0 : }
2237 :
2238 : /*
2239 : * Process options and call dropdb function.
2240 : */
2241 : void
2242 86 : DropDatabase(ParseState *pstate, DropdbStmt *stmt)
2243 : {
2244 86 : bool force = false;
2245 : ListCell *lc;
2246 :
2247 112 : foreach(lc, stmt->options)
2248 : {
2249 26 : DefElem *opt = (DefElem *) lfirst(lc);
2250 :
2251 26 : if (strcmp(opt->defname, "force") == 0)
2252 26 : force = true;
2253 : else
2254 0 : ereport(ERROR,
2255 : (errcode(ERRCODE_SYNTAX_ERROR),
2256 : errmsg("unrecognized DROP DATABASE option \"%s\"", opt->defname),
2257 : parser_errposition(pstate, opt->location)));
2258 : }
2259 :
2260 86 : dropdb(stmt->dbname, stmt->missing_ok, force);
2261 68 : }
2262 :
2263 : /*
2264 : * ALTER DATABASE name ...
2265 : */
2266 : Oid
2267 28 : AlterDatabase(ParseState *pstate, AlterDatabaseStmt *stmt, bool isTopLevel)
2268 : {
2269 : Relation rel;
2270 : Oid dboid;
2271 : HeapTuple tuple,
2272 : newtuple;
2273 : Form_pg_database datform;
2274 : ScanKeyData scankey;
2275 : SysScanDesc scan;
2276 : ListCell *option;
2277 28 : bool dbistemplate = false;
2278 28 : bool dballowconnections = true;
2279 28 : int dbconnlimit = DATCONNLIMIT_UNLIMITED;
2280 28 : DefElem *distemplate = NULL;
2281 28 : DefElem *dallowconnections = NULL;
2282 28 : DefElem *dconnlimit = NULL;
2283 28 : DefElem *dtablespace = NULL;
2284 28 : Datum new_record[Natts_pg_database] = {0};
2285 28 : bool new_record_nulls[Natts_pg_database] = {0};
2286 28 : bool new_record_repl[Natts_pg_database] = {0};
2287 :
2288 : /* Extract options from the statement node tree */
2289 56 : foreach(option, stmt->options)
2290 : {
2291 28 : DefElem *defel = (DefElem *) lfirst(option);
2292 :
2293 28 : if (strcmp(defel->defname, "is_template") == 0)
2294 : {
2295 8 : if (distemplate)
2296 0 : errorConflictingDefElem(defel, pstate);
2297 8 : distemplate = defel;
2298 : }
2299 20 : else if (strcmp(defel->defname, "allow_connections") == 0)
2300 : {
2301 8 : if (dallowconnections)
2302 0 : errorConflictingDefElem(defel, pstate);
2303 8 : dallowconnections = defel;
2304 : }
2305 12 : else if (strcmp(defel->defname, "connection_limit") == 0)
2306 : {
2307 2 : if (dconnlimit)
2308 0 : errorConflictingDefElem(defel, pstate);
2309 2 : dconnlimit = defel;
2310 : }
2311 10 : else if (strcmp(defel->defname, "tablespace") == 0)
2312 : {
2313 10 : if (dtablespace)
2314 0 : errorConflictingDefElem(defel, pstate);
2315 10 : dtablespace = defel;
2316 : }
2317 : else
2318 0 : ereport(ERROR,
2319 : (errcode(ERRCODE_SYNTAX_ERROR),
2320 : errmsg("option \"%s\" not recognized", defel->defname),
2321 : parser_errposition(pstate, defel->location)));
2322 : }
2323 :
2324 28 : if (dtablespace)
2325 : {
2326 : /*
2327 : * While the SET TABLESPACE syntax doesn't allow any other options,
2328 : * somebody could write "WITH TABLESPACE ...". Forbid any other
2329 : * options from being specified in that case.
2330 : */
2331 10 : if (list_length(stmt->options) != 1)
2332 0 : ereport(ERROR,
2333 : (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
2334 : errmsg("option \"%s\" cannot be specified with other options",
2335 : dtablespace->defname),
2336 : parser_errposition(pstate, dtablespace->location)));
2337 : /* this case isn't allowed within a transaction block */
2338 10 : PreventInTransactionBlock(isTopLevel, "ALTER DATABASE SET TABLESPACE");
2339 10 : movedb(stmt->dbname, defGetString(dtablespace));
2340 10 : return InvalidOid;
2341 : }
2342 :
2343 18 : if (distemplate && distemplate->arg)
2344 8 : dbistemplate = defGetBoolean(distemplate);
2345 18 : if (dallowconnections && dallowconnections->arg)
2346 8 : dballowconnections = defGetBoolean(dallowconnections);
2347 18 : if (dconnlimit && dconnlimit->arg)
2348 : {
2349 2 : dbconnlimit = defGetInt32(dconnlimit);
2350 2 : if (dbconnlimit < DATCONNLIMIT_UNLIMITED)
2351 0 : ereport(ERROR,
2352 : (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
2353 : errmsg("invalid connection limit: %d", dbconnlimit)));
2354 : }
2355 :
2356 : /*
2357 : * Get the old tuple. We don't need a lock on the database per se,
2358 : * because we're not going to do anything that would mess up incoming
2359 : * connections.
2360 : */
2361 18 : rel = table_open(DatabaseRelationId, RowExclusiveLock);
2362 18 : ScanKeyInit(&scankey,
2363 : Anum_pg_database_datname,
2364 : BTEqualStrategyNumber, F_NAMEEQ,
2365 18 : CStringGetDatum(stmt->dbname));
2366 18 : scan = systable_beginscan(rel, DatabaseNameIndexId, true,
2367 : NULL, 1, &scankey);
2368 18 : tuple = systable_getnext(scan);
2369 18 : if (!HeapTupleIsValid(tuple))
2370 0 : ereport(ERROR,
2371 : (errcode(ERRCODE_UNDEFINED_DATABASE),
2372 : errmsg("database \"%s\" does not exist", stmt->dbname)));
2373 :
2374 18 : datform = (Form_pg_database) GETSTRUCT(tuple);
2375 18 : dboid = datform->oid;
2376 :
2377 18 : if (database_is_invalid_form(datform))
2378 : {
2379 2 : ereport(FATAL,
2380 : errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
2381 : errmsg("cannot alter invalid database \"%s\"", stmt->dbname),
2382 : errhint("Use DROP DATABASE to drop invalid databases."));
2383 : }
2384 :
2385 16 : if (!object_ownercheck(DatabaseRelationId, dboid, GetUserId()))
2386 0 : aclcheck_error(ACLCHECK_NOT_OWNER, OBJECT_DATABASE,
2387 0 : stmt->dbname);
2388 :
2389 : /*
2390 : * In order to avoid getting locked out and having to go through
2391 : * standalone mode, we refuse to disallow connections to the database
2392 : * we're currently connected to. Lockout can still happen with concurrent
2393 : * sessions but the likeliness of that is not high enough to worry about.
2394 : */
2395 16 : if (!dballowconnections && dboid == MyDatabaseId)
2396 0 : ereport(ERROR,
2397 : (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
2398 : errmsg("cannot disallow connections for current database")));
2399 :
2400 : /*
2401 : * Build an updated tuple, perusing the information just obtained
2402 : */
2403 16 : if (distemplate)
2404 : {
2405 8 : new_record[Anum_pg_database_datistemplate - 1] = BoolGetDatum(dbistemplate);
2406 8 : new_record_repl[Anum_pg_database_datistemplate - 1] = true;
2407 : }
2408 16 : if (dallowconnections)
2409 : {
2410 8 : new_record[Anum_pg_database_datallowconn - 1] = BoolGetDatum(dballowconnections);
2411 8 : new_record_repl[Anum_pg_database_datallowconn - 1] = true;
2412 : }
2413 16 : if (dconnlimit)
2414 : {
2415 0 : new_record[Anum_pg_database_datconnlimit - 1] = Int32GetDatum(dbconnlimit);
2416 0 : new_record_repl[Anum_pg_database_datconnlimit - 1] = true;
2417 : }
2418 :
2419 16 : newtuple = heap_modify_tuple(tuple, RelationGetDescr(rel), new_record,
2420 : new_record_nulls, new_record_repl);
2421 16 : CatalogTupleUpdate(rel, &tuple->t_self, newtuple);
2422 :
2423 16 : InvokeObjectPostAlterHook(DatabaseRelationId, dboid, 0);
2424 :
2425 16 : systable_endscan(scan);
2426 :
2427 : /* Close pg_database, but keep lock till commit */
2428 16 : table_close(rel, NoLock);
2429 :
2430 16 : return dboid;
2431 : }
2432 :
2433 :
2434 : /*
2435 : * ALTER DATABASE name REFRESH COLLATION VERSION
2436 : */
2437 : ObjectAddress
2438 2 : AlterDatabaseRefreshColl(AlterDatabaseRefreshCollStmt *stmt)
2439 : {
2440 : Relation rel;
2441 : ScanKeyData scankey;
2442 : SysScanDesc scan;
2443 : Oid db_id;
2444 : HeapTuple tuple;
2445 : Form_pg_database datForm;
2446 : ObjectAddress address;
2447 : Datum datum;
2448 : bool isnull;
2449 : char *oldversion;
2450 : char *newversion;
2451 :
2452 2 : rel = table_open(DatabaseRelationId, RowExclusiveLock);
2453 2 : ScanKeyInit(&scankey,
2454 : Anum_pg_database_datname,
2455 : BTEqualStrategyNumber, F_NAMEEQ,
2456 2 : CStringGetDatum(stmt->dbname));
2457 2 : scan = systable_beginscan(rel, DatabaseNameIndexId, true,
2458 : NULL, 1, &scankey);
2459 2 : tuple = systable_getnext(scan);
2460 2 : if (!HeapTupleIsValid(tuple))
2461 0 : ereport(ERROR,
2462 : (errcode(ERRCODE_UNDEFINED_DATABASE),
2463 : errmsg("database \"%s\" does not exist", stmt->dbname)));
2464 :
2465 2 : datForm = (Form_pg_database) GETSTRUCT(tuple);
2466 2 : db_id = datForm->oid;
2467 :
2468 2 : if (!object_ownercheck(DatabaseRelationId, db_id, GetUserId()))
2469 0 : aclcheck_error(ACLCHECK_NOT_OWNER, OBJECT_DATABASE,
2470 0 : stmt->dbname);
2471 :
2472 2 : datum = heap_getattr(tuple, Anum_pg_database_datcollversion, RelationGetDescr(rel), &isnull);
2473 2 : oldversion = isnull ? NULL : TextDatumGetCString(datum);
2474 :
2475 2 : datum = heap_getattr(tuple, datForm->datlocprovider == COLLPROVIDER_ICU ? Anum_pg_database_daticulocale : Anum_pg_database_datcollate, RelationGetDescr(rel), &isnull);
2476 2 : if (isnull)
2477 0 : elog(ERROR, "unexpected null in pg_database");
2478 2 : newversion = get_collation_actual_version(datForm->datlocprovider, TextDatumGetCString(datum));
2479 :
2480 : /* cannot change from NULL to non-NULL or vice versa */
2481 2 : if ((!oldversion && newversion) || (oldversion && !newversion))
2482 0 : elog(ERROR, "invalid collation version change");
2483 2 : else if (oldversion && newversion && strcmp(newversion, oldversion) != 0)
2484 0 : {
2485 0 : bool nulls[Natts_pg_database] = {0};
2486 0 : bool replaces[Natts_pg_database] = {0};
2487 0 : Datum values[Natts_pg_database] = {0};
2488 :
2489 0 : ereport(NOTICE,
2490 : (errmsg("changing version from %s to %s",
2491 : oldversion, newversion)));
2492 :
2493 0 : values[Anum_pg_database_datcollversion - 1] = CStringGetTextDatum(newversion);
2494 0 : replaces[Anum_pg_database_datcollversion - 1] = true;
2495 :
2496 0 : tuple = heap_modify_tuple(tuple, RelationGetDescr(rel),
2497 : values, nulls, replaces);
2498 0 : CatalogTupleUpdate(rel, &tuple->t_self, tuple);
2499 0 : heap_freetuple(tuple);
2500 : }
2501 : else
2502 2 : ereport(NOTICE,
2503 : (errmsg("version has not changed")));
2504 :
2505 2 : InvokeObjectPostAlterHook(DatabaseRelationId, db_id, 0);
2506 :
2507 2 : ObjectAddressSet(address, DatabaseRelationId, db_id);
2508 :
2509 2 : systable_endscan(scan);
2510 :
2511 2 : table_close(rel, NoLock);
2512 :
2513 2 : return address;
2514 : }
2515 :
2516 :
2517 : /*
2518 : * ALTER DATABASE name SET ...
2519 : */
2520 : Oid
2521 1032 : AlterDatabaseSet(AlterDatabaseSetStmt *stmt)
2522 : {
2523 1032 : Oid datid = get_database_oid(stmt->dbname, false);
2524 :
2525 : /*
2526 : * Obtain a lock on the database and make sure it didn't go away in the
2527 : * meantime.
2528 : */
2529 1032 : shdepLockAndCheckObject(DatabaseRelationId, datid);
2530 :
2531 1032 : if (!object_ownercheck(DatabaseRelationId, datid, GetUserId()))
2532 0 : aclcheck_error(ACLCHECK_NOT_OWNER, OBJECT_DATABASE,
2533 0 : stmt->dbname);
2534 :
2535 1032 : AlterSetting(datid, InvalidOid, stmt->setstmt);
2536 :
2537 1032 : UnlockSharedObject(DatabaseRelationId, datid, 0, AccessShareLock);
2538 :
2539 1032 : return datid;
2540 : }
2541 :
2542 :
2543 : /*
2544 : * ALTER DATABASE name OWNER TO newowner
2545 : */
2546 : ObjectAddress
2547 40 : AlterDatabaseOwner(const char *dbname, Oid newOwnerId)
2548 : {
2549 : Oid db_id;
2550 : HeapTuple tuple;
2551 : Relation rel;
2552 : ScanKeyData scankey;
2553 : SysScanDesc scan;
2554 : Form_pg_database datForm;
2555 : ObjectAddress address;
2556 :
2557 : /*
2558 : * Get the old tuple. We don't need a lock on the database per se,
2559 : * because we're not going to do anything that would mess up incoming
2560 : * connections.
2561 : */
2562 40 : rel = table_open(DatabaseRelationId, RowExclusiveLock);
2563 40 : ScanKeyInit(&scankey,
2564 : Anum_pg_database_datname,
2565 : BTEqualStrategyNumber, F_NAMEEQ,
2566 : CStringGetDatum(dbname));
2567 40 : scan = systable_beginscan(rel, DatabaseNameIndexId, true,
2568 : NULL, 1, &scankey);
2569 40 : tuple = systable_getnext(scan);
2570 40 : if (!HeapTupleIsValid(tuple))
2571 0 : ereport(ERROR,
2572 : (errcode(ERRCODE_UNDEFINED_DATABASE),
2573 : errmsg("database \"%s\" does not exist", dbname)));
2574 :
2575 40 : datForm = (Form_pg_database) GETSTRUCT(tuple);
2576 40 : db_id = datForm->oid;
2577 :
2578 : /*
2579 : * If the new owner is the same as the existing owner, consider the
2580 : * command to have succeeded. This is to be consistent with other
2581 : * objects.
2582 : */
2583 40 : if (datForm->datdba != newOwnerId)
2584 : {
2585 : Datum repl_val[Natts_pg_database];
2586 24 : bool repl_null[Natts_pg_database] = {0};
2587 24 : bool repl_repl[Natts_pg_database] = {0};
2588 : Acl *newAcl;
2589 : Datum aclDatum;
2590 : bool isNull;
2591 : HeapTuple newtuple;
2592 :
2593 : /* Otherwise, must be owner of the existing object */
2594 24 : if (!object_ownercheck(DatabaseRelationId, db_id, GetUserId()))
2595 0 : aclcheck_error(ACLCHECK_NOT_OWNER, OBJECT_DATABASE,
2596 : dbname);
2597 :
2598 : /* Must be able to become new owner */
2599 24 : check_can_set_role(GetUserId(), newOwnerId);
2600 :
2601 : /*
2602 : * must have createdb rights
2603 : *
2604 : * NOTE: This is different from other alter-owner checks in that the
2605 : * current user is checked for createdb privileges instead of the
2606 : * destination owner. This is consistent with the CREATE case for
2607 : * databases. Because superusers will always have this right, we need
2608 : * no special case for them.
2609 : */
2610 24 : if (!have_createdb_privilege())
2611 0 : ereport(ERROR,
2612 : (errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
2613 : errmsg("permission denied to change owner of database")));
2614 :
2615 24 : repl_repl[Anum_pg_database_datdba - 1] = true;
2616 24 : repl_val[Anum_pg_database_datdba - 1] = ObjectIdGetDatum(newOwnerId);
2617 :
2618 : /*
2619 : * Determine the modified ACL for the new owner. This is only
2620 : * necessary when the ACL is non-null.
2621 : */
2622 24 : aclDatum = heap_getattr(tuple,
2623 : Anum_pg_database_datacl,
2624 : RelationGetDescr(rel),
2625 : &isNull);
2626 24 : if (!isNull)
2627 : {
2628 0 : newAcl = aclnewowner(DatumGetAclP(aclDatum),
2629 : datForm->datdba, newOwnerId);
2630 0 : repl_repl[Anum_pg_database_datacl - 1] = true;
2631 0 : repl_val[Anum_pg_database_datacl - 1] = PointerGetDatum(newAcl);
2632 : }
2633 :
2634 24 : newtuple = heap_modify_tuple(tuple, RelationGetDescr(rel), repl_val, repl_null, repl_repl);
2635 24 : CatalogTupleUpdate(rel, &newtuple->t_self, newtuple);
2636 :
2637 24 : heap_freetuple(newtuple);
2638 :
2639 : /* Update owner dependency reference */
2640 24 : changeDependencyOnOwner(DatabaseRelationId, db_id, newOwnerId);
2641 : }
2642 :
2643 40 : InvokeObjectPostAlterHook(DatabaseRelationId, db_id, 0);
2644 :
2645 40 : ObjectAddressSet(address, DatabaseRelationId, db_id);
2646 :
2647 40 : systable_endscan(scan);
2648 :
2649 : /* Close pg_database, but keep lock till commit */
2650 40 : table_close(rel, NoLock);
2651 :
2652 40 : return address;
2653 : }
2654 :
2655 :
2656 : Datum
2657 60 : pg_database_collation_actual_version(PG_FUNCTION_ARGS)
2658 : {
2659 60 : Oid dbid = PG_GETARG_OID(0);
2660 : HeapTuple tp;
2661 : char datlocprovider;
2662 : Datum datum;
2663 : char *version;
2664 :
2665 60 : tp = SearchSysCache1(DATABASEOID, ObjectIdGetDatum(dbid));
2666 60 : if (!HeapTupleIsValid(tp))
2667 0 : ereport(ERROR,
2668 : (errcode(ERRCODE_UNDEFINED_OBJECT),
2669 : errmsg("database with OID %u does not exist", dbid)));
2670 :
2671 60 : datlocprovider = ((Form_pg_database) GETSTRUCT(tp))->datlocprovider;
2672 :
2673 60 : datum = SysCacheGetAttrNotNull(DATABASEOID, tp, datlocprovider == COLLPROVIDER_ICU ? Anum_pg_database_daticulocale : Anum_pg_database_datcollate);
2674 60 : version = get_collation_actual_version(datlocprovider, TextDatumGetCString(datum));
2675 :
2676 60 : ReleaseSysCache(tp);
2677 :
2678 60 : if (version)
2679 28 : PG_RETURN_TEXT_P(cstring_to_text(version));
2680 : else
2681 32 : PG_RETURN_NULL();
2682 : }
2683 :
2684 :
2685 : /*
2686 : * Helper functions
2687 : */
2688 :
2689 : /*
2690 : * Look up info about the database named "name". If the database exists,
2691 : * obtain the specified lock type on it, fill in any of the remaining
2692 : * parameters that aren't NULL, and return true. If no such database,
2693 : * return false.
2694 : */
2695 : static bool
2696 650 : get_db_info(const char *name, LOCKMODE lockmode,
2697 : Oid *dbIdP, Oid *ownerIdP,
2698 : int *encodingP, bool *dbIsTemplateP, bool *dbAllowConnP, bool *dbHasLoginEvtP,
2699 : TransactionId *dbFrozenXidP, MultiXactId *dbMinMultiP,
2700 : Oid *dbTablespace, char **dbCollate, char **dbCtype, char **dbIculocale,
2701 : char **dbIcurules,
2702 : char *dbLocProvider,
2703 : char **dbCollversion)
2704 : {
2705 650 : bool result = false;
2706 : Relation relation;
2707 :
2708 : Assert(name);
2709 :
2710 : /* Caller may wish to grab a better lock on pg_database beforehand... */
2711 650 : relation = table_open(DatabaseRelationId, AccessShareLock);
2712 :
2713 : /*
2714 : * Loop covers the rare case where the database is renamed before we can
2715 : * lock it. We try again just in case we can find a new one of the same
2716 : * name.
2717 : */
2718 : for (;;)
2719 0 : {
2720 : ScanKeyData scanKey;
2721 : SysScanDesc scan;
2722 : HeapTuple tuple;
2723 : Oid dbOid;
2724 :
2725 : /*
2726 : * there's no syscache for database-indexed-by-name, so must do it the
2727 : * hard way
2728 : */
2729 650 : ScanKeyInit(&scanKey,
2730 : Anum_pg_database_datname,
2731 : BTEqualStrategyNumber, F_NAMEEQ,
2732 : CStringGetDatum(name));
2733 :
2734 650 : scan = systable_beginscan(relation, DatabaseNameIndexId, true,
2735 : NULL, 1, &scanKey);
2736 :
2737 650 : tuple = systable_getnext(scan);
2738 :
2739 650 : if (!HeapTupleIsValid(tuple))
2740 : {
2741 : /* definitely no database of that name */
2742 32 : systable_endscan(scan);
2743 32 : break;
2744 : }
2745 :
2746 618 : dbOid = ((Form_pg_database) GETSTRUCT(tuple))->oid;
2747 :
2748 618 : systable_endscan(scan);
2749 :
2750 : /*
2751 : * Now that we have a database OID, we can try to lock the DB.
2752 : */
2753 618 : if (lockmode != NoLock)
2754 618 : LockSharedObject(DatabaseRelationId, dbOid, 0, lockmode);
2755 :
2756 : /*
2757 : * And now, re-fetch the tuple by OID. If it's still there and still
2758 : * the same name, we win; else, drop the lock and loop back to try
2759 : * again.
2760 : */
2761 618 : tuple = SearchSysCache1(DATABASEOID, ObjectIdGetDatum(dbOid));
2762 618 : if (HeapTupleIsValid(tuple))
2763 : {
2764 618 : Form_pg_database dbform = (Form_pg_database) GETSTRUCT(tuple);
2765 :
2766 618 : if (strcmp(name, NameStr(dbform->datname)) == 0)
2767 : {
2768 : Datum datum;
2769 : bool isnull;
2770 :
2771 : /* oid of the database */
2772 618 : if (dbIdP)
2773 618 : *dbIdP = dbOid;
2774 : /* oid of the owner */
2775 618 : if (ownerIdP)
2776 554 : *ownerIdP = dbform->datdba;
2777 : /* character encoding */
2778 618 : if (encodingP)
2779 554 : *encodingP = dbform->encoding;
2780 : /* allowed as template? */
2781 618 : if (dbIsTemplateP)
2782 608 : *dbIsTemplateP = dbform->datistemplate;
2783 : /* Has on login event trigger? */
2784 618 : if (dbHasLoginEvtP)
2785 554 : *dbHasLoginEvtP = dbform->dathasloginevt;
2786 : /* allowing connections? */
2787 618 : if (dbAllowConnP)
2788 554 : *dbAllowConnP = dbform->datallowconn;
2789 : /* limit of frozen XIDs */
2790 618 : if (dbFrozenXidP)
2791 554 : *dbFrozenXidP = dbform->datfrozenxid;
2792 : /* minimum MultiXactId */
2793 618 : if (dbMinMultiP)
2794 554 : *dbMinMultiP = dbform->datminmxid;
2795 : /* default tablespace for this database */
2796 618 : if (dbTablespace)
2797 564 : *dbTablespace = dbform->dattablespace;
2798 : /* default locale settings for this database */
2799 618 : if (dbLocProvider)
2800 554 : *dbLocProvider = dbform->datlocprovider;
2801 618 : if (dbCollate)
2802 : {
2803 554 : datum = SysCacheGetAttrNotNull(DATABASEOID, tuple, Anum_pg_database_datcollate);
2804 554 : *dbCollate = TextDatumGetCString(datum);
2805 : }
2806 618 : if (dbCtype)
2807 : {
2808 554 : datum = SysCacheGetAttrNotNull(DATABASEOID, tuple, Anum_pg_database_datctype);
2809 554 : *dbCtype = TextDatumGetCString(datum);
2810 : }
2811 618 : if (dbIculocale)
2812 : {
2813 554 : datum = SysCacheGetAttr(DATABASEOID, tuple, Anum_pg_database_daticulocale, &isnull);
2814 554 : if (isnull)
2815 510 : *dbIculocale = NULL;
2816 : else
2817 44 : *dbIculocale = TextDatumGetCString(datum);
2818 : }
2819 618 : if (dbIcurules)
2820 : {
2821 554 : datum = SysCacheGetAttr(DATABASEOID, tuple, Anum_pg_database_daticurules, &isnull);
2822 554 : if (isnull)
2823 554 : *dbIcurules = NULL;
2824 : else
2825 0 : *dbIcurules = TextDatumGetCString(datum);
2826 : }
2827 618 : if (dbCollversion)
2828 : {
2829 554 : datum = SysCacheGetAttr(DATABASEOID, tuple, Anum_pg_database_datcollversion, &isnull);
2830 554 : if (isnull)
2831 502 : *dbCollversion = NULL;
2832 : else
2833 52 : *dbCollversion = TextDatumGetCString(datum);
2834 : }
2835 618 : ReleaseSysCache(tuple);
2836 618 : result = true;
2837 618 : break;
2838 : }
2839 : /* can only get here if it was just renamed */
2840 0 : ReleaseSysCache(tuple);
2841 : }
2842 :
2843 0 : if (lockmode != NoLock)
2844 0 : UnlockSharedObject(DatabaseRelationId, dbOid, 0, lockmode);
2845 : }
2846 :
2847 650 : table_close(relation, AccessShareLock);
2848 :
2849 650 : return result;
2850 : }
2851 :
2852 : /* Check if current user has createdb privileges */
2853 : bool
2854 614 : have_createdb_privilege(void)
2855 : {
2856 614 : bool result = false;
2857 : HeapTuple utup;
2858 :
2859 : /* Superusers can always do everything */
2860 614 : if (superuser())
2861 578 : return true;
2862 :
2863 36 : utup = SearchSysCache1(AUTHOID, ObjectIdGetDatum(GetUserId()));
2864 36 : if (HeapTupleIsValid(utup))
2865 : {
2866 36 : result = ((Form_pg_authid) GETSTRUCT(utup))->rolcreatedb;
2867 36 : ReleaseSysCache(utup);
2868 : }
2869 36 : return result;
2870 : }
2871 :
2872 : /*
2873 : * Remove tablespace directories
2874 : *
2875 : * We don't know what tablespaces db_id is using, so iterate through all
2876 : * tablespaces removing <tablespace>/db_id
2877 : */
2878 : static void
2879 52 : remove_dbtablespaces(Oid db_id)
2880 : {
2881 : Relation rel;
2882 : TableScanDesc scan;
2883 : HeapTuple tuple;
2884 52 : List *ltblspc = NIL;
2885 : ListCell *cell;
2886 : int ntblspc;
2887 : int i;
2888 : Oid *tablespace_ids;
2889 :
2890 52 : rel = table_open(TableSpaceRelationId, AccessShareLock);
2891 52 : scan = table_beginscan_catalog(rel, 0, NULL);
2892 198 : while ((tuple = heap_getnext(scan, ForwardScanDirection)) != NULL)
2893 : {
2894 146 : Form_pg_tablespace spcform = (Form_pg_tablespace) GETSTRUCT(tuple);
2895 146 : Oid dsttablespace = spcform->oid;
2896 : char *dstpath;
2897 : struct stat st;
2898 :
2899 : /* Don't mess with the global tablespace */
2900 146 : if (dsttablespace == GLOBALTABLESPACE_OID)
2901 94 : continue;
2902 :
2903 94 : dstpath = GetDatabasePath(db_id, dsttablespace);
2904 :
2905 94 : if (lstat(dstpath, &st) < 0 || !S_ISDIR(st.st_mode))
2906 : {
2907 : /* Assume we can ignore it */
2908 42 : pfree(dstpath);
2909 42 : continue;
2910 : }
2911 :
2912 52 : if (!rmtree(dstpath, true))
2913 0 : ereport(WARNING,
2914 : (errmsg("some useless files may be left behind in old database directory \"%s\"",
2915 : dstpath)));
2916 :
2917 52 : ltblspc = lappend_oid(ltblspc, dsttablespace);
2918 52 : pfree(dstpath);
2919 : }
2920 :
2921 52 : ntblspc = list_length(ltblspc);
2922 52 : if (ntblspc == 0)
2923 : {
2924 0 : table_endscan(scan);
2925 0 : table_close(rel, AccessShareLock);
2926 0 : return;
2927 : }
2928 :
2929 52 : tablespace_ids = (Oid *) palloc(ntblspc * sizeof(Oid));
2930 52 : i = 0;
2931 104 : foreach(cell, ltblspc)
2932 52 : tablespace_ids[i++] = lfirst_oid(cell);
2933 :
2934 : /* Record the filesystem change in XLOG */
2935 : {
2936 : xl_dbase_drop_rec xlrec;
2937 :
2938 52 : xlrec.db_id = db_id;
2939 52 : xlrec.ntablespaces = ntblspc;
2940 :
2941 52 : XLogBeginInsert();
2942 52 : XLogRegisterData((char *) &xlrec, MinSizeOfDbaseDropRec);
2943 52 : XLogRegisterData((char *) tablespace_ids, ntblspc * sizeof(Oid));
2944 :
2945 52 : (void) XLogInsert(RM_DBASE_ID,
2946 : XLOG_DBASE_DROP | XLR_SPECIAL_REL_UPDATE);
2947 : }
2948 :
2949 52 : list_free(ltblspc);
2950 52 : pfree(tablespace_ids);
2951 :
2952 52 : table_endscan(scan);
2953 52 : table_close(rel, AccessShareLock);
2954 : }
2955 :
2956 : /*
2957 : * Check for existing files that conflict with a proposed new DB OID;
2958 : * return true if there are any
2959 : *
2960 : * If there were a subdirectory in any tablespace matching the proposed new
2961 : * OID, we'd get a create failure due to the duplicate name ... and then we'd
2962 : * try to remove that already-existing subdirectory during the cleanup in
2963 : * remove_dbtablespaces. Nuking existing files seems like a bad idea, so
2964 : * instead we make this extra check before settling on the OID of the new
2965 : * database. This exactly parallels what GetNewRelFileNumber() does for table
2966 : * relfilenumber values.
2967 : */
2968 : static bool
2969 538 : check_db_file_conflict(Oid db_id)
2970 : {
2971 538 : bool result = false;
2972 : Relation rel;
2973 : TableScanDesc scan;
2974 : HeapTuple tuple;
2975 :
2976 538 : rel = table_open(TableSpaceRelationId, AccessShareLock);
2977 538 : scan = table_beginscan_catalog(rel, 0, NULL);
2978 1704 : while ((tuple = heap_getnext(scan, ForwardScanDirection)) != NULL)
2979 : {
2980 1166 : Form_pg_tablespace spcform = (Form_pg_tablespace) GETSTRUCT(tuple);
2981 1166 : Oid dsttablespace = spcform->oid;
2982 : char *dstpath;
2983 : struct stat st;
2984 :
2985 : /* Don't mess with the global tablespace */
2986 1166 : if (dsttablespace == GLOBALTABLESPACE_OID)
2987 538 : continue;
2988 :
2989 628 : dstpath = GetDatabasePath(db_id, dsttablespace);
2990 :
2991 628 : if (lstat(dstpath, &st) == 0)
2992 : {
2993 : /* Found a conflicting file (or directory, whatever) */
2994 0 : pfree(dstpath);
2995 0 : result = true;
2996 0 : break;
2997 : }
2998 :
2999 628 : pfree(dstpath);
3000 : }
3001 :
3002 538 : table_endscan(scan);
3003 538 : table_close(rel, AccessShareLock);
3004 :
3005 538 : return result;
3006 : }
3007 :
3008 : /*
3009 : * Issue a suitable errdetail message for a busy database
3010 : */
3011 : static int
3012 0 : errdetail_busy_db(int notherbackends, int npreparedxacts)
3013 : {
3014 0 : if (notherbackends > 0 && npreparedxacts > 0)
3015 :
3016 : /*
3017 : * We don't deal with singular versus plural here, since gettext
3018 : * doesn't support multiple plurals in one string.
3019 : */
3020 0 : errdetail("There are %d other session(s) and %d prepared transaction(s) using the database.",
3021 : notherbackends, npreparedxacts);
3022 0 : else if (notherbackends > 0)
3023 0 : errdetail_plural("There is %d other session using the database.",
3024 : "There are %d other sessions using the database.",
3025 : notherbackends,
3026 : notherbackends);
3027 : else
3028 0 : errdetail_plural("There is %d prepared transaction using the database.",
3029 : "There are %d prepared transactions using the database.",
3030 : npreparedxacts,
3031 : npreparedxacts);
3032 0 : return 0; /* just to keep ereport macro happy */
3033 : }
3034 :
3035 : /*
3036 : * get_database_oid - given a database name, look up the OID
3037 : *
3038 : * If missing_ok is false, throw an error if database name not found. If
3039 : * true, just return InvalidOid.
3040 : */
3041 : Oid
3042 2094 : get_database_oid(const char *dbname, bool missing_ok)
3043 : {
3044 : Relation pg_database;
3045 : ScanKeyData entry[1];
3046 : SysScanDesc scan;
3047 : HeapTuple dbtuple;
3048 : Oid oid;
3049 :
3050 : /*
3051 : * There's no syscache for pg_database indexed by name, so we must look
3052 : * the hard way.
3053 : */
3054 2094 : pg_database = table_open(DatabaseRelationId, AccessShareLock);
3055 2094 : ScanKeyInit(&entry[0],
3056 : Anum_pg_database_datname,
3057 : BTEqualStrategyNumber, F_NAMEEQ,
3058 : CStringGetDatum(dbname));
3059 2094 : scan = systable_beginscan(pg_database, DatabaseNameIndexId, true,
3060 : NULL, 1, entry);
3061 :
3062 2094 : dbtuple = systable_getnext(scan);
3063 :
3064 : /* We assume that there can be at most one matching tuple */
3065 2094 : if (HeapTupleIsValid(dbtuple))
3066 1550 : oid = ((Form_pg_database) GETSTRUCT(dbtuple))->oid;
3067 : else
3068 544 : oid = InvalidOid;
3069 :
3070 2094 : systable_endscan(scan);
3071 2094 : table_close(pg_database, AccessShareLock);
3072 :
3073 2094 : if (!OidIsValid(oid) && !missing_ok)
3074 6 : ereport(ERROR,
3075 : (errcode(ERRCODE_UNDEFINED_DATABASE),
3076 : errmsg("database \"%s\" does not exist",
3077 : dbname)));
3078 :
3079 2088 : return oid;
3080 : }
3081 :
3082 :
3083 : /*
3084 : * get_database_name - given a database OID, look up the name
3085 : *
3086 : * Returns a palloc'd string, or NULL if no such database.
3087 : */
3088 : char *
3089 22496 : get_database_name(Oid dbid)
3090 : {
3091 : HeapTuple dbtuple;
3092 : char *result;
3093 :
3094 22496 : dbtuple = SearchSysCache1(DATABASEOID, ObjectIdGetDatum(dbid));
3095 22496 : if (HeapTupleIsValid(dbtuple))
3096 : {
3097 22336 : result = pstrdup(NameStr(((Form_pg_database) GETSTRUCT(dbtuple))->datname));
3098 22336 : ReleaseSysCache(dbtuple);
3099 : }
3100 : else
3101 160 : result = NULL;
3102 :
3103 22496 : return result;
3104 : }
3105 :
3106 :
3107 : /*
3108 : * While dropping a database the pg_database row is marked invalid, but the
3109 : * catalog contents still exist. Connections to such a database are not
3110 : * allowed.
3111 : */
3112 : bool
3113 25226 : database_is_invalid_form(Form_pg_database datform)
3114 : {
3115 25226 : return datform->datconnlimit == DATCONNLIMIT_INVALID_DB;
3116 : }
3117 :
3118 :
3119 : /*
3120 : * Convenience wrapper around database_is_invalid_form()
3121 : */
3122 : bool
3123 554 : database_is_invalid_oid(Oid dboid)
3124 : {
3125 : HeapTuple dbtup;
3126 : Form_pg_database dbform;
3127 : bool invalid;
3128 :
3129 554 : dbtup = SearchSysCache1(DATABASEOID, ObjectIdGetDatum(dboid));
3130 554 : if (!HeapTupleIsValid(dbtup))
3131 0 : elog(ERROR, "cache lookup failed for database %u", dboid);
3132 554 : dbform = (Form_pg_database) GETSTRUCT(dbtup);
3133 :
3134 554 : invalid = database_is_invalid_form(dbform);
3135 :
3136 554 : ReleaseSysCache(dbtup);
3137 :
3138 554 : return invalid;
3139 : }
3140 :
3141 :
3142 : /*
3143 : * recovery_create_dbdir()
3144 : *
3145 : * During recovery, there's a case where we validly need to recover a missing
3146 : * tablespace directory so that recovery can continue. This happens when
3147 : * recovery wants to create a database but the holding tablespace has been
3148 : * removed before the server stopped. Since we expect that the directory will
3149 : * be gone before reaching recovery consistency, and we have no knowledge about
3150 : * the tablespace other than its OID here, we create a real directory under
3151 : * pg_tblspc here instead of restoring the symlink.
3152 : *
3153 : * If only_tblspc is true, then the requested directory must be in pg_tblspc/
3154 : */
3155 : static void
3156 34 : recovery_create_dbdir(char *path, bool only_tblspc)
3157 : {
3158 : struct stat st;
3159 :
3160 : Assert(RecoveryInProgress());
3161 :
3162 34 : if (stat(path, &st) == 0)
3163 34 : return;
3164 :
3165 0 : if (only_tblspc && strstr(path, "pg_tblspc/") == NULL)
3166 0 : elog(PANIC, "requested to created invalid directory: %s", path);
3167 :
3168 0 : if (reachedConsistency && !allow_in_place_tablespaces)
3169 0 : ereport(PANIC,
3170 : errmsg("missing directory \"%s\"", path));
3171 :
3172 0 : elog(reachedConsistency ? WARNING : DEBUG1,
3173 : "creating missing directory: %s", path);
3174 :
3175 0 : if (pg_mkdir_p(path, pg_dir_create_mode) != 0)
3176 0 : ereport(PANIC,
3177 : errmsg("could not create missing directory \"%s\": %m", path));
3178 : }
3179 :
3180 :
3181 : /*
3182 : * DATABASE resource manager's routines
3183 : */
3184 : void
3185 58 : dbase_redo(XLogReaderState *record)
3186 : {
3187 58 : uint8 info = XLogRecGetInfo(record) & ~XLR_INFO_MASK;
3188 :
3189 : /* Backup blocks are not used in dbase records */
3190 : Assert(!XLogRecHasAnyBlockRefs(record));
3191 :
3192 58 : if (info == XLOG_DBASE_CREATE_FILE_COPY)
3193 : {
3194 6 : xl_dbase_create_file_copy_rec *xlrec =
3195 6 : (xl_dbase_create_file_copy_rec *) XLogRecGetData(record);
3196 : char *src_path;
3197 : char *dst_path;
3198 : char *parent_path;
3199 : struct stat st;
3200 :
3201 6 : src_path = GetDatabasePath(xlrec->src_db_id, xlrec->src_tablespace_id);
3202 6 : dst_path = GetDatabasePath(xlrec->db_id, xlrec->tablespace_id);
3203 :
3204 : /*
3205 : * Our theory for replaying a CREATE is to forcibly drop the target
3206 : * subdirectory if present, then re-copy the source data. This may be
3207 : * more work than needed, but it is simple to implement.
3208 : */
3209 6 : if (stat(dst_path, &st) == 0 && S_ISDIR(st.st_mode))
3210 : {
3211 0 : if (!rmtree(dst_path, true))
3212 : /* If this failed, copydir() below is going to error. */
3213 0 : ereport(WARNING,
3214 : (errmsg("some useless files may be left behind in old database directory \"%s\"",
3215 : dst_path)));
3216 : }
3217 :
3218 : /*
3219 : * If the parent of the target path doesn't exist, create it now. This
3220 : * enables us to create the target underneath later.
3221 : */
3222 6 : parent_path = pstrdup(dst_path);
3223 6 : get_parent_directory(parent_path);
3224 6 : if (stat(parent_path, &st) < 0)
3225 : {
3226 0 : if (errno != ENOENT)
3227 0 : ereport(FATAL,
3228 : errmsg("could not stat directory \"%s\": %m",
3229 : dst_path));
3230 :
3231 : /* create the parent directory if needed and valid */
3232 0 : recovery_create_dbdir(parent_path, true);
3233 : }
3234 6 : pfree(parent_path);
3235 :
3236 : /*
3237 : * There's a case where the copy source directory is missing for the
3238 : * same reason above. Create the empty source directory so that
3239 : * copydir below doesn't fail. The directory will be dropped soon by
3240 : * recovery.
3241 : */
3242 6 : if (stat(src_path, &st) < 0 && errno == ENOENT)
3243 0 : recovery_create_dbdir(src_path, false);
3244 :
3245 : /*
3246 : * Force dirty buffers out to disk, to ensure source database is
3247 : * up-to-date for the copy.
3248 : */
3249 6 : FlushDatabaseBuffers(xlrec->src_db_id);
3250 :
3251 : /* Close all sgmr fds in all backends. */
3252 6 : WaitForProcSignalBarrier(EmitProcSignalBarrier(PROCSIGNAL_BARRIER_SMGRRELEASE));
3253 :
3254 : /*
3255 : * Copy this subdirectory to the new location
3256 : *
3257 : * We don't need to copy subdirectories
3258 : */
3259 6 : copydir(src_path, dst_path, false);
3260 :
3261 6 : pfree(src_path);
3262 6 : pfree(dst_path);
3263 : }
3264 52 : else if (info == XLOG_DBASE_CREATE_WAL_LOG)
3265 : {
3266 34 : xl_dbase_create_wal_log_rec *xlrec =
3267 34 : (xl_dbase_create_wal_log_rec *) XLogRecGetData(record);
3268 : char *dbpath;
3269 : char *parent_path;
3270 :
3271 34 : dbpath = GetDatabasePath(xlrec->db_id, xlrec->tablespace_id);
3272 :
3273 : /* create the parent directory if needed and valid */
3274 34 : parent_path = pstrdup(dbpath);
3275 34 : get_parent_directory(parent_path);
3276 34 : recovery_create_dbdir(parent_path, true);
3277 :
3278 : /* Create the database directory with the version file. */
3279 34 : CreateDirAndVersionFile(dbpath, xlrec->db_id, xlrec->tablespace_id,
3280 : true);
3281 34 : pfree(dbpath);
3282 : }
3283 18 : else if (info == XLOG_DBASE_DROP)
3284 : {
3285 18 : xl_dbase_drop_rec *xlrec = (xl_dbase_drop_rec *) XLogRecGetData(record);
3286 : char *dst_path;
3287 : int i;
3288 :
3289 18 : if (InHotStandby)
3290 : {
3291 : /*
3292 : * Lock database while we resolve conflicts to ensure that
3293 : * InitPostgres() cannot fully re-execute concurrently. This
3294 : * avoids backends re-connecting automatically to same database,
3295 : * which can happen in some cases.
3296 : *
3297 : * This will lock out walsenders trying to connect to db-specific
3298 : * slots for logical decoding too, so it's safe for us to drop
3299 : * slots.
3300 : */
3301 18 : LockSharedObjectForSession(DatabaseRelationId, xlrec->db_id, 0, AccessExclusiveLock);
3302 18 : ResolveRecoveryConflictWithDatabase(xlrec->db_id);
3303 : }
3304 :
3305 : /* Drop any database-specific replication slots */
3306 18 : ReplicationSlotsDropDBSlots(xlrec->db_id);
3307 :
3308 : /* Drop pages for this database that are in the shared buffer cache */
3309 18 : DropDatabaseBuffers(xlrec->db_id);
3310 :
3311 : /* Also, clean out any fsync requests that might be pending in md.c */
3312 18 : ForgetDatabaseSyncRequests(xlrec->db_id);
3313 :
3314 : /* Clean out the xlog relcache too */
3315 18 : XLogDropDatabase(xlrec->db_id);
3316 :
3317 : /* Close all sgmr fds in all backends. */
3318 18 : WaitForProcSignalBarrier(EmitProcSignalBarrier(PROCSIGNAL_BARRIER_SMGRRELEASE));
3319 :
3320 36 : for (i = 0; i < xlrec->ntablespaces; i++)
3321 : {
3322 18 : dst_path = GetDatabasePath(xlrec->db_id, xlrec->tablespace_ids[i]);
3323 :
3324 : /* And remove the physical files */
3325 18 : if (!rmtree(dst_path, true))
3326 0 : ereport(WARNING,
3327 : (errmsg("some useless files may be left behind in old database directory \"%s\"",
3328 : dst_path)));
3329 18 : pfree(dst_path);
3330 : }
3331 :
3332 18 : if (InHotStandby)
3333 : {
3334 : /*
3335 : * Release locks prior to commit. XXX There is a race condition
3336 : * here that may allow backends to reconnect, but the window for
3337 : * this is small because the gap between here and commit is mostly
3338 : * fairly small and it is unlikely that people will be dropping
3339 : * databases that we are trying to connect to anyway.
3340 : */
3341 18 : UnlockSharedObjectForSession(DatabaseRelationId, xlrec->db_id, 0, AccessExclusiveLock);
3342 : }
3343 : }
3344 : else
3345 0 : elog(PANIC, "dbase_redo: unknown op code %u", info);
3346 58 : }
|