Line data Source code
1 : /*-------------------------------------------------------------------------
2 : *
3 : * dbcommands.c
4 : * Database management commands (create/drop database).
5 : *
6 : * Note: database creation/destruction commands use exclusive locks on
7 : * the database objects (as expressed by LockSharedObject()) to avoid
8 : * stepping on each others' toes. Formerly we used table-level locks
9 : * on pg_database, but that's too coarse-grained.
10 : *
11 : * Portions Copyright (c) 1996-2025, PostgreSQL Global Development Group
12 : * Portions Copyright (c) 1994, Regents of the University of California
13 : *
14 : *
15 : * IDENTIFICATION
16 : * src/backend/commands/dbcommands.c
17 : *
18 : *-------------------------------------------------------------------------
19 : */
20 : #include "postgres.h"
21 :
22 : #include <fcntl.h>
23 : #include <unistd.h>
24 : #include <sys/stat.h>
25 :
26 : #include "access/genam.h"
27 : #include "access/heapam.h"
28 : #include "access/htup_details.h"
29 : #include "access/multixact.h"
30 : #include "access/tableam.h"
31 : #include "access/xact.h"
32 : #include "access/xloginsert.h"
33 : #include "access/xlogrecovery.h"
34 : #include "access/xlogutils.h"
35 : #include "catalog/catalog.h"
36 : #include "catalog/dependency.h"
37 : #include "catalog/indexing.h"
38 : #include "catalog/objectaccess.h"
39 : #include "catalog/pg_authid.h"
40 : #include "catalog/pg_collation.h"
41 : #include "catalog/pg_database.h"
42 : #include "catalog/pg_db_role_setting.h"
43 : #include "catalog/pg_subscription.h"
44 : #include "catalog/pg_tablespace.h"
45 : #include "commands/comment.h"
46 : #include "commands/dbcommands.h"
47 : #include "commands/dbcommands_xlog.h"
48 : #include "commands/defrem.h"
49 : #include "commands/seclabel.h"
50 : #include "commands/tablespace.h"
51 : #include "common/file_perm.h"
52 : #include "mb/pg_wchar.h"
53 : #include "miscadmin.h"
54 : #include "pgstat.h"
55 : #include "postmaster/bgwriter.h"
56 : #include "replication/slot.h"
57 : #include "storage/copydir.h"
58 : #include "storage/fd.h"
59 : #include "storage/ipc.h"
60 : #include "storage/lmgr.h"
61 : #include "storage/md.h"
62 : #include "storage/procarray.h"
63 : #include "storage/smgr.h"
64 : #include "utils/acl.h"
65 : #include "utils/builtins.h"
66 : #include "utils/fmgroids.h"
67 : #include "utils/pg_locale.h"
68 : #include "utils/relmapper.h"
69 : #include "utils/snapmgr.h"
70 : #include "utils/syscache.h"
71 :
72 : /*
73 : * Create database strategy.
74 : *
75 : * CREATEDB_WAL_LOG will copy the database at the block level and WAL log each
76 : * copied block.
77 : *
78 : * CREATEDB_FILE_COPY will simply perform a file system level copy of the
79 : * database and log a single record for each tablespace copied. To make this
80 : * safe, it also triggers checkpoints before and after the operation.
81 : */
82 : typedef enum CreateDBStrategy
83 : {
84 : CREATEDB_WAL_LOG,
85 : CREATEDB_FILE_COPY,
86 : } CreateDBStrategy;
87 :
88 : typedef struct
89 : {
90 : Oid src_dboid; /* source (template) DB */
91 : Oid dest_dboid; /* DB we are trying to create */
92 : CreateDBStrategy strategy; /* create db strategy */
93 : } createdb_failure_params;
94 :
95 : typedef struct
96 : {
97 : Oid dest_dboid; /* DB we are trying to move */
98 : Oid dest_tsoid; /* tablespace we are trying to move to */
99 : } movedb_failure_params;
100 :
101 : /*
102 : * Information about a relation to be copied when creating a database.
103 : */
104 : typedef struct CreateDBRelInfo
105 : {
106 : RelFileLocator rlocator; /* physical relation identifier */
107 : Oid reloid; /* relation oid */
108 : bool permanent; /* relation is permanent or unlogged */
109 : } CreateDBRelInfo;
110 :
111 :
112 : /* non-export function prototypes */
113 : static void createdb_failure_callback(int code, Datum arg);
114 : static void movedb(const char *dbname, const char *tblspcname);
115 : static void movedb_failure_callback(int code, Datum arg);
116 : static bool get_db_info(const char *name, LOCKMODE lockmode,
117 : Oid *dbIdP, Oid *ownerIdP,
118 : int *encodingP, bool *dbIsTemplateP, bool *dbAllowConnP, bool *dbHasLoginEvtP,
119 : TransactionId *dbFrozenXidP, MultiXactId *dbMinMultiP,
120 : Oid *dbTablespace, char **dbCollate, char **dbCtype, char **dbLocale,
121 : char **dbIcurules,
122 : char *dbLocProvider,
123 : char **dbCollversion);
124 : static void remove_dbtablespaces(Oid db_id);
125 : static bool check_db_file_conflict(Oid db_id);
126 : static int errdetail_busy_db(int notherbackends, int npreparedxacts);
127 : static void CreateDatabaseUsingWalLog(Oid src_dboid, Oid dst_dboid, Oid src_tsid,
128 : Oid dst_tsid);
129 : static List *ScanSourceDatabasePgClass(Oid tbid, Oid dbid, char *srcpath);
130 : static List *ScanSourceDatabasePgClassPage(Page page, Buffer buf, Oid tbid,
131 : Oid dbid, char *srcpath,
132 : List *rlocatorlist, Snapshot snapshot);
133 : static CreateDBRelInfo *ScanSourceDatabasePgClassTuple(HeapTupleData *tuple,
134 : Oid tbid, Oid dbid,
135 : char *srcpath);
136 : static void CreateDirAndVersionFile(char *dbpath, Oid dbid, Oid tsid,
137 : bool isRedo);
138 : static void CreateDatabaseUsingFileCopy(Oid src_dboid, Oid dst_dboid,
139 : Oid src_tsid, Oid dst_tsid);
140 : static void recovery_create_dbdir(char *path, bool only_tblspc);
141 :
142 : /*
143 : * Create a new database using the WAL_LOG strategy.
144 : *
145 : * Each copied block is separately written to the write-ahead log.
146 : */
147 : static void
148 462 : CreateDatabaseUsingWalLog(Oid src_dboid, Oid dst_dboid,
149 : Oid src_tsid, Oid dst_tsid)
150 : {
151 : char *srcpath;
152 : char *dstpath;
153 462 : List *rlocatorlist = NULL;
154 : ListCell *cell;
155 : LockRelId srcrelid;
156 : LockRelId dstrelid;
157 : RelFileLocator srcrlocator;
158 : RelFileLocator dstrlocator;
159 : CreateDBRelInfo *relinfo;
160 :
161 : /* Get source and destination database paths. */
162 462 : srcpath = GetDatabasePath(src_dboid, src_tsid);
163 462 : dstpath = GetDatabasePath(dst_dboid, dst_tsid);
164 :
165 : /* Create database directory and write PG_VERSION file. */
166 462 : CreateDirAndVersionFile(dstpath, dst_dboid, dst_tsid, false);
167 :
168 : /* Copy relmap file from source database to the destination database. */
169 462 : RelationMapCopy(dst_dboid, dst_tsid, srcpath, dstpath);
170 :
171 : /* Get list of relfilelocators to copy from the source database. */
172 462 : rlocatorlist = ScanSourceDatabasePgClass(src_tsid, src_dboid, srcpath);
173 : Assert(rlocatorlist != NIL);
174 :
175 : /*
176 : * Database IDs will be the same for all relations so set them before
177 : * entering the loop.
178 : */
179 462 : srcrelid.dbId = src_dboid;
180 462 : dstrelid.dbId = dst_dboid;
181 :
182 : /* Loop over our list of relfilelocators and copy each one. */
183 103982 : foreach(cell, rlocatorlist)
184 : {
185 103520 : relinfo = lfirst(cell);
186 103520 : srcrlocator = relinfo->rlocator;
187 :
188 : /*
189 : * If the relation is from the source db's default tablespace then we
190 : * need to create it in the destination db's default tablespace.
191 : * Otherwise, we need to create in the same tablespace as it is in the
192 : * source database.
193 : */
194 103520 : if (srcrlocator.spcOid == src_tsid)
195 103520 : dstrlocator.spcOid = dst_tsid;
196 : else
197 0 : dstrlocator.spcOid = srcrlocator.spcOid;
198 :
199 103520 : dstrlocator.dbOid = dst_dboid;
200 103520 : dstrlocator.relNumber = srcrlocator.relNumber;
201 :
202 : /*
203 : * Acquire locks on source and target relations before copying.
204 : *
205 : * We typically do not read relation data into shared_buffers without
206 : * holding a relation lock. It's unclear what could go wrong if we
207 : * skipped it in this case, because nobody can be modifying either the
208 : * source or destination database at this point, and we have locks on
209 : * both databases, too, but let's take the conservative route.
210 : */
211 103520 : dstrelid.relId = srcrelid.relId = relinfo->reloid;
212 103520 : LockRelationId(&srcrelid, AccessShareLock);
213 103520 : LockRelationId(&dstrelid, AccessShareLock);
214 :
215 : /* Copy relation storage from source to the destination. */
216 103520 : CreateAndCopyRelationData(srcrlocator, dstrlocator, relinfo->permanent);
217 :
218 : /* Release the relation locks. */
219 103520 : UnlockRelationId(&srcrelid, AccessShareLock);
220 103520 : UnlockRelationId(&dstrelid, AccessShareLock);
221 : }
222 :
223 462 : pfree(srcpath);
224 462 : pfree(dstpath);
225 462 : list_free_deep(rlocatorlist);
226 462 : }
227 :
228 : /*
229 : * Scan the pg_class table in the source database to identify the relations
230 : * that need to be copied to the destination database.
231 : *
232 : * This is an exception to the usual rule that cross-database access is
233 : * not possible. We can make it work here because we know that there are no
234 : * connections to the source database and (since there can't be prepared
235 : * transactions touching that database) no in-doubt tuples either. This
236 : * means that we don't need to worry about pruning removing anything from
237 : * under us, and we don't need to be too picky about our snapshot either.
238 : * As long as it sees all previously-committed XIDs as committed and all
239 : * aborted XIDs as aborted, we should be fine: nothing else is possible
240 : * here.
241 : *
242 : * We can't rely on the relcache for anything here, because that only knows
243 : * about the database to which we are connected, and can't handle access to
244 : * other databases. That also means we can't rely on the heap scan
245 : * infrastructure, which would be a bad idea anyway since it might try
246 : * to do things like HOT pruning which we definitely can't do safely in
247 : * a database to which we're not even connected.
248 : */
249 : static List *
250 462 : ScanSourceDatabasePgClass(Oid tbid, Oid dbid, char *srcpath)
251 : {
252 : RelFileLocator rlocator;
253 : BlockNumber nblocks;
254 : BlockNumber blkno;
255 : Buffer buf;
256 : RelFileNumber relfilenumber;
257 : Page page;
258 462 : List *rlocatorlist = NIL;
259 : LockRelId relid;
260 : Snapshot snapshot;
261 : SMgrRelation smgr;
262 : BufferAccessStrategy bstrategy;
263 :
264 : /* Get pg_class relfilenumber. */
265 462 : relfilenumber = RelationMapOidToFilenumberForDatabase(srcpath,
266 : RelationRelationId);
267 :
268 : /* Don't read data into shared_buffers without holding a relation lock. */
269 462 : relid.dbId = dbid;
270 462 : relid.relId = RelationRelationId;
271 462 : LockRelationId(&relid, AccessShareLock);
272 :
273 : /* Prepare a RelFileLocator for the pg_class relation. */
274 462 : rlocator.spcOid = tbid;
275 462 : rlocator.dbOid = dbid;
276 462 : rlocator.relNumber = relfilenumber;
277 :
278 462 : smgr = smgropen(rlocator, INVALID_PROC_NUMBER);
279 462 : nblocks = smgrnblocks(smgr, MAIN_FORKNUM);
280 462 : smgrclose(smgr);
281 :
282 : /* Use a buffer access strategy since this is a bulk read operation. */
283 462 : bstrategy = GetAccessStrategy(BAS_BULKREAD);
284 :
285 : /*
286 : * As explained in the function header comments, we need a snapshot that
287 : * will see all committed transactions as committed, and our transaction
288 : * snapshot - or the active snapshot - might not be new enough for that,
289 : * but the return value of GetLatestSnapshot() should work fine.
290 : */
291 462 : snapshot = RegisterSnapshot(GetLatestSnapshot());
292 :
293 : /* Process the relation block by block. */
294 6930 : for (blkno = 0; blkno < nblocks; blkno++)
295 : {
296 6468 : CHECK_FOR_INTERRUPTS();
297 :
298 6468 : buf = ReadBufferWithoutRelcache(rlocator, MAIN_FORKNUM, blkno,
299 : RBM_NORMAL, bstrategy, true);
300 :
301 6468 : LockBuffer(buf, BUFFER_LOCK_SHARE);
302 6468 : page = BufferGetPage(buf);
303 6468 : if (PageIsNew(page) || PageIsEmpty(page))
304 : {
305 0 : UnlockReleaseBuffer(buf);
306 0 : continue;
307 : }
308 :
309 : /* Append relevant pg_class tuples for current page to rlocatorlist. */
310 6468 : rlocatorlist = ScanSourceDatabasePgClassPage(page, buf, tbid, dbid,
311 : srcpath, rlocatorlist,
312 : snapshot);
313 :
314 6468 : UnlockReleaseBuffer(buf);
315 : }
316 462 : UnregisterSnapshot(snapshot);
317 :
318 : /* Release relation lock. */
319 462 : UnlockRelationId(&relid, AccessShareLock);
320 :
321 462 : return rlocatorlist;
322 : }
323 :
324 : /*
325 : * Scan one page of the source database's pg_class relation and add relevant
326 : * entries to rlocatorlist. The return value is the updated list.
327 : */
328 : static List *
329 6468 : ScanSourceDatabasePgClassPage(Page page, Buffer buf, Oid tbid, Oid dbid,
330 : char *srcpath, List *rlocatorlist,
331 : Snapshot snapshot)
332 : {
333 6468 : BlockNumber blkno = BufferGetBlockNumber(buf);
334 : OffsetNumber offnum;
335 : OffsetNumber maxoff;
336 : HeapTupleData tuple;
337 :
338 6468 : maxoff = PageGetMaxOffsetNumber(page);
339 :
340 : /* Loop over offsets. */
341 330518 : for (offnum = FirstOffsetNumber;
342 : offnum <= maxoff;
343 324050 : offnum = OffsetNumberNext(offnum))
344 : {
345 : ItemId itemid;
346 :
347 324050 : itemid = PageGetItemId(page, offnum);
348 :
349 : /* Nothing to do if slot is empty or already dead. */
350 324050 : if (!ItemIdIsUsed(itemid) || ItemIdIsDead(itemid) ||
351 231524 : ItemIdIsRedirected(itemid))
352 132258 : continue;
353 :
354 : Assert(ItemIdIsNormal(itemid));
355 191792 : ItemPointerSet(&(tuple.t_self), blkno, offnum);
356 :
357 : /* Initialize a HeapTupleData structure. */
358 191792 : tuple.t_data = (HeapTupleHeader) PageGetItem(page, itemid);
359 191792 : tuple.t_len = ItemIdGetLength(itemid);
360 191792 : tuple.t_tableOid = RelationRelationId;
361 :
362 : /* Skip tuples that are not visible to this snapshot. */
363 191792 : if (HeapTupleSatisfiesVisibility(&tuple, snapshot, buf))
364 : {
365 : CreateDBRelInfo *relinfo;
366 :
367 : /*
368 : * ScanSourceDatabasePgClassTuple is in charge of constructing a
369 : * CreateDBRelInfo object for this tuple, but can also decide that
370 : * this tuple isn't something we need to copy. If we do need to
371 : * copy the relation, add it to the list.
372 : */
373 191762 : relinfo = ScanSourceDatabasePgClassTuple(&tuple, tbid, dbid,
374 : srcpath);
375 191762 : if (relinfo != NULL)
376 103520 : rlocatorlist = lappend(rlocatorlist, relinfo);
377 : }
378 : }
379 :
380 6468 : return rlocatorlist;
381 : }
382 :
383 : /*
384 : * Decide whether a certain pg_class tuple represents something that
385 : * needs to be copied from the source database to the destination database,
386 : * and if so, construct a CreateDBRelInfo for it.
387 : *
388 : * Visibility checks are handled by the caller, so our job here is just
389 : * to assess the data stored in the tuple.
390 : */
391 : CreateDBRelInfo *
392 191762 : ScanSourceDatabasePgClassTuple(HeapTupleData *tuple, Oid tbid, Oid dbid,
393 : char *srcpath)
394 : {
395 : CreateDBRelInfo *relinfo;
396 : Form_pg_class classForm;
397 191762 : RelFileNumber relfilenumber = InvalidRelFileNumber;
398 :
399 191762 : classForm = (Form_pg_class) GETSTRUCT(tuple);
400 :
401 : /*
402 : * Return NULL if this object does not need to be copied.
403 : *
404 : * Shared objects don't need to be copied, because they are shared.
405 : * Objects without storage can't be copied, because there's nothing to
406 : * copy. Temporary relations don't need to be copied either, because they
407 : * are inaccessible outside of the session that created them, which must
408 : * be gone already, and couldn't connect to a different database if it
409 : * still existed. autovacuum will eventually remove the pg_class entries
410 : * as well.
411 : */
412 191762 : if (classForm->reltablespace == GLOBALTABLESPACE_OID ||
413 169586 : !RELKIND_HAS_STORAGE(classForm->relkind) ||
414 103520 : classForm->relpersistence == RELPERSISTENCE_TEMP)
415 88242 : return NULL;
416 :
417 : /*
418 : * If relfilenumber is valid then directly use it. Otherwise, consult the
419 : * relmap.
420 : */
421 103520 : if (RelFileNumberIsValid(classForm->relfilenode))
422 95666 : relfilenumber = classForm->relfilenode;
423 : else
424 7854 : relfilenumber = RelationMapOidToFilenumberForDatabase(srcpath,
425 : classForm->oid);
426 :
427 : /* We must have a valid relfilenumber. */
428 103520 : if (!RelFileNumberIsValid(relfilenumber))
429 0 : elog(ERROR, "relation with OID %u does not have a valid relfilenumber",
430 : classForm->oid);
431 :
432 : /* Prepare a rel info element and add it to the list. */
433 103520 : relinfo = (CreateDBRelInfo *) palloc(sizeof(CreateDBRelInfo));
434 103520 : if (OidIsValid(classForm->reltablespace))
435 0 : relinfo->rlocator.spcOid = classForm->reltablespace;
436 : else
437 103520 : relinfo->rlocator.spcOid = tbid;
438 :
439 103520 : relinfo->rlocator.dbOid = dbid;
440 103520 : relinfo->rlocator.relNumber = relfilenumber;
441 103520 : relinfo->reloid = classForm->oid;
442 :
443 : /* Temporary relations were rejected above. */
444 : Assert(classForm->relpersistence != RELPERSISTENCE_TEMP);
445 103520 : relinfo->permanent =
446 103520 : (classForm->relpersistence == RELPERSISTENCE_PERMANENT) ? true : false;
447 :
448 103520 : return relinfo;
449 : }
450 :
451 : /*
452 : * Create database directory and write out the PG_VERSION file in the database
453 : * path. If isRedo is true, it's okay for the database directory to exist
454 : * already.
455 : */
456 : static void
457 506 : CreateDirAndVersionFile(char *dbpath, Oid dbid, Oid tsid, bool isRedo)
458 : {
459 : int fd;
460 : int nbytes;
461 : char versionfile[MAXPGPATH];
462 : char buf[16];
463 :
464 : /*
465 : * Note that we don't have to copy version data from the source database;
466 : * there's only one legal value.
467 : */
468 506 : sprintf(buf, "%s\n", PG_MAJORVERSION);
469 506 : nbytes = strlen(PG_MAJORVERSION) + 1;
470 :
471 : /* Create database directory. */
472 506 : if (MakePGDirectory(dbpath) < 0)
473 : {
474 : /* Failure other than already exists or not in WAL replay? */
475 16 : if (errno != EEXIST || !isRedo)
476 0 : ereport(ERROR,
477 : (errcode_for_file_access(),
478 : errmsg("could not create directory \"%s\": %m", dbpath)));
479 : }
480 :
481 : /*
482 : * Create PG_VERSION file in the database path. If the file already
483 : * exists and we are in WAL replay then try again to open it in write
484 : * mode.
485 : */
486 506 : snprintf(versionfile, sizeof(versionfile), "%s/%s", dbpath, "PG_VERSION");
487 :
488 506 : fd = OpenTransientFile(versionfile, O_WRONLY | O_CREAT | O_EXCL | PG_BINARY);
489 506 : if (fd < 0 && errno == EEXIST && isRedo)
490 16 : fd = OpenTransientFile(versionfile, O_WRONLY | O_TRUNC | PG_BINARY);
491 :
492 506 : if (fd < 0)
493 0 : ereport(ERROR,
494 : (errcode_for_file_access(),
495 : errmsg("could not create file \"%s\": %m", versionfile)));
496 :
497 : /* Write PG_MAJORVERSION in the PG_VERSION file. */
498 506 : pgstat_report_wait_start(WAIT_EVENT_VERSION_FILE_WRITE);
499 506 : errno = 0;
500 506 : if ((int) write(fd, buf, nbytes) != nbytes)
501 : {
502 : /* If write didn't set errno, assume problem is no disk space. */
503 0 : if (errno == 0)
504 0 : errno = ENOSPC;
505 0 : ereport(ERROR,
506 : (errcode_for_file_access(),
507 : errmsg("could not write to file \"%s\": %m", versionfile)));
508 : }
509 506 : pgstat_report_wait_end();
510 :
511 506 : pgstat_report_wait_start(WAIT_EVENT_VERSION_FILE_SYNC);
512 506 : if (pg_fsync(fd) != 0)
513 0 : ereport(data_sync_elevel(ERROR),
514 : (errcode_for_file_access(),
515 : errmsg("could not fsync file \"%s\": %m", versionfile)));
516 506 : fsync_fname(dbpath, true);
517 506 : pgstat_report_wait_end();
518 :
519 : /* Close the version file. */
520 506 : CloseTransientFile(fd);
521 :
522 : /* If we are not in WAL replay then write the WAL. */
523 506 : if (!isRedo)
524 : {
525 : xl_dbase_create_wal_log_rec xlrec;
526 :
527 462 : START_CRIT_SECTION();
528 :
529 462 : xlrec.db_id = dbid;
530 462 : xlrec.tablespace_id = tsid;
531 :
532 462 : XLogBeginInsert();
533 462 : XLogRegisterData(&xlrec,
534 : sizeof(xl_dbase_create_wal_log_rec));
535 :
536 462 : (void) XLogInsert(RM_DBASE_ID, XLOG_DBASE_CREATE_WAL_LOG);
537 :
538 462 : END_CRIT_SECTION();
539 : }
540 506 : }
541 :
542 : /*
543 : * Create a new database using the FILE_COPY strategy.
544 : *
545 : * Copy each tablespace at the filesystem level, and log a single WAL record
546 : * for each tablespace copied. This requires a checkpoint before and after the
547 : * copy, which may be expensive, but it does greatly reduce WAL generation
548 : * if the copied database is large.
549 : */
550 : static void
551 240 : CreateDatabaseUsingFileCopy(Oid src_dboid, Oid dst_dboid, Oid src_tsid,
552 : Oid dst_tsid)
553 : {
554 : TableScanDesc scan;
555 : Relation rel;
556 : HeapTuple tuple;
557 :
558 : /*
559 : * Force a checkpoint before starting the copy. This will force all dirty
560 : * buffers, including those of unlogged tables, out to disk, to ensure
561 : * source database is up-to-date on disk for the copy.
562 : * FlushDatabaseBuffers() would suffice for that, but we also want to
563 : * process any pending unlink requests. Otherwise, if a checkpoint
564 : * happened while we're copying files, a file might be deleted just when
565 : * we're about to copy it, causing the lstat() call in copydir() to fail
566 : * with ENOENT.
567 : *
568 : * In binary upgrade mode, we can skip this checkpoint because pg_upgrade
569 : * is careful to ensure that template0 is fully written to disk prior to
570 : * any CREATE DATABASE commands.
571 : */
572 240 : if (!IsBinaryUpgrade)
573 192 : RequestCheckpoint(CHECKPOINT_IMMEDIATE | CHECKPOINT_FORCE |
574 : CHECKPOINT_WAIT | CHECKPOINT_FLUSH_ALL);
575 :
576 : /*
577 : * Iterate through all tablespaces of the template database, and copy each
578 : * one to the new database.
579 : */
580 240 : rel = table_open(TableSpaceRelationId, AccessShareLock);
581 240 : scan = table_beginscan_catalog(rel, 0, NULL);
582 756 : while ((tuple = heap_getnext(scan, ForwardScanDirection)) != NULL)
583 : {
584 516 : Form_pg_tablespace spaceform = (Form_pg_tablespace) GETSTRUCT(tuple);
585 516 : Oid srctablespace = spaceform->oid;
586 : Oid dsttablespace;
587 : char *srcpath;
588 : char *dstpath;
589 : struct stat st;
590 :
591 : /* No need to copy global tablespace */
592 516 : if (srctablespace == GLOBALTABLESPACE_OID)
593 276 : continue;
594 :
595 276 : srcpath = GetDatabasePath(src_dboid, srctablespace);
596 :
597 516 : if (stat(srcpath, &st) < 0 || !S_ISDIR(st.st_mode) ||
598 240 : directory_is_empty(srcpath))
599 : {
600 : /* Assume we can ignore it */
601 36 : pfree(srcpath);
602 36 : continue;
603 : }
604 :
605 240 : if (srctablespace == src_tsid)
606 240 : dsttablespace = dst_tsid;
607 : else
608 0 : dsttablespace = srctablespace;
609 :
610 240 : dstpath = GetDatabasePath(dst_dboid, dsttablespace);
611 :
612 : /*
613 : * Copy this subdirectory to the new location
614 : *
615 : * We don't need to copy subdirectories
616 : */
617 240 : copydir(srcpath, dstpath, false);
618 :
619 : /* Record the filesystem change in XLOG */
620 : {
621 : xl_dbase_create_file_copy_rec xlrec;
622 :
623 240 : xlrec.db_id = dst_dboid;
624 240 : xlrec.tablespace_id = dsttablespace;
625 240 : xlrec.src_db_id = src_dboid;
626 240 : xlrec.src_tablespace_id = srctablespace;
627 :
628 240 : XLogBeginInsert();
629 240 : XLogRegisterData(&xlrec,
630 : sizeof(xl_dbase_create_file_copy_rec));
631 :
632 240 : (void) XLogInsert(RM_DBASE_ID,
633 : XLOG_DBASE_CREATE_FILE_COPY | XLR_SPECIAL_REL_UPDATE);
634 : }
635 240 : pfree(srcpath);
636 240 : pfree(dstpath);
637 : }
638 240 : table_endscan(scan);
639 240 : table_close(rel, AccessShareLock);
640 :
641 : /*
642 : * We force a checkpoint before committing. This effectively means that
643 : * committed XLOG_DBASE_CREATE_FILE_COPY operations will never need to be
644 : * replayed (at least not in ordinary crash recovery; we still have to
645 : * make the XLOG entry for the benefit of PITR operations). This avoids
646 : * two nasty scenarios:
647 : *
648 : * #1: At wal_level=minimal, we don't XLOG the contents of newly created
649 : * relfilenodes; therefore the drop-and-recreate-whole-directory behavior
650 : * of DBASE_CREATE replay would lose such files created in the new
651 : * database between our commit and the next checkpoint.
652 : *
653 : * #2: Since we have to recopy the source database during DBASE_CREATE
654 : * replay, we run the risk of copying changes in it that were committed
655 : * after the original CREATE DATABASE command but before the system crash
656 : * that led to the replay. This is at least unexpected and at worst could
657 : * lead to inconsistencies, eg duplicate table names.
658 : *
659 : * (Both of these were real bugs in releases 8.0 through 8.0.3.)
660 : *
661 : * In PITR replay, the first of these isn't an issue, and the second is
662 : * only a risk if the CREATE DATABASE and subsequent template database
663 : * change both occur while a base backup is being taken. There doesn't
664 : * seem to be much we can do about that except document it as a
665 : * limitation.
666 : *
667 : * In binary upgrade mode, we can skip this checkpoint because neither of
668 : * these problems applies: we don't ever replay the WAL generated during
669 : * pg_upgrade, and we don't support taking base backups during pg_upgrade
670 : * (not to mention that we don't concurrently modify template0, either).
671 : *
672 : * See CreateDatabaseUsingWalLog() for a less cheesy CREATE DATABASE
673 : * strategy that avoids these problems.
674 : */
675 240 : if (!IsBinaryUpgrade)
676 192 : RequestCheckpoint(CHECKPOINT_IMMEDIATE | CHECKPOINT_FORCE |
677 : CHECKPOINT_WAIT);
678 240 : }
679 :
680 : /*
681 : * CREATE DATABASE
682 : */
683 : Oid
684 736 : createdb(ParseState *pstate, const CreatedbStmt *stmt)
685 : {
686 : Oid src_dboid;
687 : Oid src_owner;
688 736 : int src_encoding = -1;
689 736 : char *src_collate = NULL;
690 736 : char *src_ctype = NULL;
691 736 : char *src_locale = NULL;
692 736 : char *src_icurules = NULL;
693 736 : char src_locprovider = '\0';
694 736 : char *src_collversion = NULL;
695 : bool src_istemplate;
696 736 : bool src_hasloginevt = false;
697 : bool src_allowconn;
698 736 : TransactionId src_frozenxid = InvalidTransactionId;
699 736 : MultiXactId src_minmxid = InvalidMultiXactId;
700 : Oid src_deftablespace;
701 : volatile Oid dst_deftablespace;
702 : Relation pg_database_rel;
703 : HeapTuple tuple;
704 736 : Datum new_record[Natts_pg_database] = {0};
705 736 : bool new_record_nulls[Natts_pg_database] = {0};
706 736 : Oid dboid = InvalidOid;
707 : Oid datdba;
708 : ListCell *option;
709 736 : DefElem *tablespacenameEl = NULL;
710 736 : DefElem *ownerEl = NULL;
711 736 : DefElem *templateEl = NULL;
712 736 : DefElem *encodingEl = NULL;
713 736 : DefElem *localeEl = NULL;
714 736 : DefElem *builtinlocaleEl = NULL;
715 736 : DefElem *collateEl = NULL;
716 736 : DefElem *ctypeEl = NULL;
717 736 : DefElem *iculocaleEl = NULL;
718 736 : DefElem *icurulesEl = NULL;
719 736 : DefElem *locproviderEl = NULL;
720 736 : DefElem *istemplateEl = NULL;
721 736 : DefElem *allowconnectionsEl = NULL;
722 736 : DefElem *connlimitEl = NULL;
723 736 : DefElem *collversionEl = NULL;
724 736 : DefElem *strategyEl = NULL;
725 736 : char *dbname = stmt->dbname;
726 736 : char *dbowner = NULL;
727 736 : const char *dbtemplate = NULL;
728 736 : char *dbcollate = NULL;
729 736 : char *dbctype = NULL;
730 736 : const char *dblocale = NULL;
731 736 : char *dbicurules = NULL;
732 736 : char dblocprovider = '\0';
733 : char *canonname;
734 736 : int encoding = -1;
735 736 : bool dbistemplate = false;
736 736 : bool dballowconnections = true;
737 736 : int dbconnlimit = DATCONNLIMIT_UNLIMITED;
738 736 : char *dbcollversion = NULL;
739 : int notherbackends;
740 : int npreparedxacts;
741 736 : CreateDBStrategy dbstrategy = CREATEDB_WAL_LOG;
742 : createdb_failure_params fparms;
743 :
744 : /* Extract options from the statement node tree */
745 2158 : foreach(option, stmt->options)
746 : {
747 1422 : DefElem *defel = (DefElem *) lfirst(option);
748 :
749 1422 : if (strcmp(defel->defname, "tablespace") == 0)
750 : {
751 16 : if (tablespacenameEl)
752 0 : errorConflictingDefElem(defel, pstate);
753 16 : tablespacenameEl = defel;
754 : }
755 1406 : else if (strcmp(defel->defname, "owner") == 0)
756 : {
757 2 : if (ownerEl)
758 0 : errorConflictingDefElem(defel, pstate);
759 2 : ownerEl = defel;
760 : }
761 1404 : else if (strcmp(defel->defname, "template") == 0)
762 : {
763 330 : if (templateEl)
764 0 : errorConflictingDefElem(defel, pstate);
765 330 : templateEl = defel;
766 : }
767 1074 : else if (strcmp(defel->defname, "encoding") == 0)
768 : {
769 96 : if (encodingEl)
770 0 : errorConflictingDefElem(defel, pstate);
771 96 : encodingEl = defel;
772 : }
773 978 : else if (strcmp(defel->defname, "locale") == 0)
774 : {
775 98 : if (localeEl)
776 0 : errorConflictingDefElem(defel, pstate);
777 98 : localeEl = defel;
778 : }
779 880 : else if (strcmp(defel->defname, "builtin_locale") == 0)
780 : {
781 16 : if (builtinlocaleEl)
782 0 : errorConflictingDefElem(defel, pstate);
783 16 : builtinlocaleEl = defel;
784 : }
785 864 : else if (strcmp(defel->defname, "lc_collate") == 0)
786 : {
787 18 : if (collateEl)
788 0 : errorConflictingDefElem(defel, pstate);
789 18 : collateEl = defel;
790 : }
791 846 : else if (strcmp(defel->defname, "lc_ctype") == 0)
792 : {
793 18 : if (ctypeEl)
794 0 : errorConflictingDefElem(defel, pstate);
795 18 : ctypeEl = defel;
796 : }
797 828 : else if (strcmp(defel->defname, "icu_locale") == 0)
798 : {
799 10 : if (iculocaleEl)
800 0 : errorConflictingDefElem(defel, pstate);
801 10 : iculocaleEl = defel;
802 : }
803 818 : else if (strcmp(defel->defname, "icu_rules") == 0)
804 : {
805 2 : if (icurulesEl)
806 0 : errorConflictingDefElem(defel, pstate);
807 2 : icurulesEl = defel;
808 : }
809 816 : else if (strcmp(defel->defname, "locale_provider") == 0)
810 : {
811 104 : if (locproviderEl)
812 0 : errorConflictingDefElem(defel, pstate);
813 104 : locproviderEl = defel;
814 : }
815 712 : else if (strcmp(defel->defname, "is_template") == 0)
816 : {
817 90 : if (istemplateEl)
818 0 : errorConflictingDefElem(defel, pstate);
819 90 : istemplateEl = defel;
820 : }
821 622 : else if (strcmp(defel->defname, "allow_connections") == 0)
822 : {
823 88 : if (allowconnectionsEl)
824 0 : errorConflictingDefElem(defel, pstate);
825 88 : allowconnectionsEl = defel;
826 : }
827 534 : else if (strcmp(defel->defname, "connection_limit") == 0)
828 : {
829 0 : if (connlimitEl)
830 0 : errorConflictingDefElem(defel, pstate);
831 0 : connlimitEl = defel;
832 : }
833 534 : else if (strcmp(defel->defname, "collation_version") == 0)
834 : {
835 48 : if (collversionEl)
836 0 : errorConflictingDefElem(defel, pstate);
837 48 : collversionEl = defel;
838 : }
839 486 : else if (strcmp(defel->defname, "location") == 0)
840 : {
841 0 : ereport(WARNING,
842 : (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
843 : errmsg("LOCATION is not supported anymore"),
844 : errhint("Consider using tablespaces instead."),
845 : parser_errposition(pstate, defel->location)));
846 : }
847 486 : else if (strcmp(defel->defname, "oid") == 0)
848 : {
849 230 : dboid = defGetObjectId(defel);
850 :
851 : /*
852 : * We don't normally permit new databases to be created with
853 : * system-assigned OIDs. pg_upgrade tries to preserve database
854 : * OIDs, so we can't allow any database to be created with an OID
855 : * that might be in use in a freshly-initialized cluster created
856 : * by some future version. We assume all such OIDs will be from
857 : * the system-managed OID range.
858 : *
859 : * As an exception, however, we permit any OID to be assigned when
860 : * allow_system_table_mods=on (so that initdb can assign system
861 : * OIDs to template0 and postgres) or when performing a binary
862 : * upgrade (so that pg_upgrade can preserve whatever OIDs it finds
863 : * in the source cluster).
864 : */
865 230 : if (dboid < FirstNormalObjectId &&
866 204 : !allowSystemTableMods && !IsBinaryUpgrade)
867 0 : ereport(ERROR,
868 : (errcode(ERRCODE_INVALID_PARAMETER_VALUE)),
869 : errmsg("OIDs less than %u are reserved for system objects", FirstNormalObjectId));
870 : }
871 256 : else if (strcmp(defel->defname, "strategy") == 0)
872 : {
873 256 : if (strategyEl)
874 0 : errorConflictingDefElem(defel, pstate);
875 256 : strategyEl = defel;
876 : }
877 : else
878 0 : ereport(ERROR,
879 : (errcode(ERRCODE_SYNTAX_ERROR),
880 : errmsg("option \"%s\" not recognized", defel->defname),
881 : parser_errposition(pstate, defel->location)));
882 : }
883 :
884 736 : if (ownerEl && ownerEl->arg)
885 2 : dbowner = defGetString(ownerEl);
886 736 : if (templateEl && templateEl->arg)
887 330 : dbtemplate = defGetString(templateEl);
888 736 : if (encodingEl && encodingEl->arg)
889 : {
890 : const char *encoding_name;
891 :
892 96 : if (IsA(encodingEl->arg, Integer))
893 : {
894 0 : encoding = defGetInt32(encodingEl);
895 0 : encoding_name = pg_encoding_to_char(encoding);
896 0 : if (strcmp(encoding_name, "") == 0 ||
897 0 : pg_valid_server_encoding(encoding_name) < 0)
898 0 : ereport(ERROR,
899 : (errcode(ERRCODE_UNDEFINED_OBJECT),
900 : errmsg("%d is not a valid encoding code",
901 : encoding),
902 : parser_errposition(pstate, encodingEl->location)));
903 : }
904 : else
905 : {
906 96 : encoding_name = defGetString(encodingEl);
907 96 : encoding = pg_valid_server_encoding(encoding_name);
908 96 : if (encoding < 0)
909 0 : ereport(ERROR,
910 : (errcode(ERRCODE_UNDEFINED_OBJECT),
911 : errmsg("%s is not a valid encoding name",
912 : encoding_name),
913 : parser_errposition(pstate, encodingEl->location)));
914 : }
915 : }
916 736 : if (localeEl && localeEl->arg)
917 : {
918 98 : dbcollate = defGetString(localeEl);
919 98 : dbctype = defGetString(localeEl);
920 98 : dblocale = defGetString(localeEl);
921 : }
922 736 : if (builtinlocaleEl && builtinlocaleEl->arg)
923 16 : dblocale = defGetString(builtinlocaleEl);
924 736 : if (collateEl && collateEl->arg)
925 18 : dbcollate = defGetString(collateEl);
926 736 : if (ctypeEl && ctypeEl->arg)
927 18 : dbctype = defGetString(ctypeEl);
928 736 : if (iculocaleEl && iculocaleEl->arg)
929 10 : dblocale = defGetString(iculocaleEl);
930 736 : if (icurulesEl && icurulesEl->arg)
931 2 : dbicurules = defGetString(icurulesEl);
932 736 : if (locproviderEl && locproviderEl->arg)
933 : {
934 104 : char *locproviderstr = defGetString(locproviderEl);
935 :
936 104 : if (pg_strcasecmp(locproviderstr, "builtin") == 0)
937 30 : dblocprovider = COLLPROVIDER_BUILTIN;
938 74 : else if (pg_strcasecmp(locproviderstr, "icu") == 0)
939 16 : dblocprovider = COLLPROVIDER_ICU;
940 58 : else if (pg_strcasecmp(locproviderstr, "libc") == 0)
941 56 : dblocprovider = COLLPROVIDER_LIBC;
942 : else
943 2 : ereport(ERROR,
944 : (errcode(ERRCODE_INVALID_OBJECT_DEFINITION),
945 : errmsg("unrecognized locale provider: %s",
946 : locproviderstr)));
947 : }
948 734 : if (istemplateEl && istemplateEl->arg)
949 90 : dbistemplate = defGetBoolean(istemplateEl);
950 734 : if (allowconnectionsEl && allowconnectionsEl->arg)
951 88 : dballowconnections = defGetBoolean(allowconnectionsEl);
952 734 : if (connlimitEl && connlimitEl->arg)
953 : {
954 0 : dbconnlimit = defGetInt32(connlimitEl);
955 0 : if (dbconnlimit < DATCONNLIMIT_UNLIMITED)
956 0 : ereport(ERROR,
957 : (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
958 : errmsg("invalid connection limit: %d", dbconnlimit)));
959 : }
960 734 : if (collversionEl)
961 48 : dbcollversion = defGetString(collversionEl);
962 :
963 : /* obtain OID of proposed owner */
964 734 : if (dbowner)
965 2 : datdba = get_role_oid(dbowner, false);
966 : else
967 732 : datdba = GetUserId();
968 :
969 : /*
970 : * To create a database, must have createdb privilege and must be able to
971 : * become the target role (this does not imply that the target role itself
972 : * must have createdb privilege). The latter provision guards against
973 : * "giveaway" attacks. Note that a superuser will always have both of
974 : * these privileges a fortiori.
975 : */
976 734 : if (!have_createdb_privilege())
977 6 : ereport(ERROR,
978 : (errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
979 : errmsg("permission denied to create database")));
980 :
981 728 : check_can_set_role(GetUserId(), datdba);
982 :
983 : /*
984 : * Lookup database (template) to be cloned, and obtain share lock on it.
985 : * ShareLock allows two CREATE DATABASEs to work from the same template
986 : * concurrently, while ensuring no one is busy dropping it in parallel
987 : * (which would be Very Bad since we'd likely get an incomplete copy
988 : * without knowing it). This also prevents any new connections from being
989 : * made to the source until we finish copying it, so we can be sure it
990 : * won't change underneath us.
991 : */
992 728 : if (!dbtemplate)
993 400 : dbtemplate = "template1"; /* Default template database name */
994 :
995 728 : if (!get_db_info(dbtemplate, ShareLock,
996 : &src_dboid, &src_owner, &src_encoding,
997 : &src_istemplate, &src_allowconn, &src_hasloginevt,
998 : &src_frozenxid, &src_minmxid, &src_deftablespace,
999 : &src_collate, &src_ctype, &src_locale, &src_icurules, &src_locprovider,
1000 : &src_collversion))
1001 0 : ereport(ERROR,
1002 : (errcode(ERRCODE_UNDEFINED_DATABASE),
1003 : errmsg("template database \"%s\" does not exist",
1004 : dbtemplate)));
1005 :
1006 : /*
1007 : * If the source database was in the process of being dropped, we can't
1008 : * use it as a template.
1009 : */
1010 728 : if (database_is_invalid_oid(src_dboid))
1011 2 : ereport(ERROR,
1012 : errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
1013 : errmsg("cannot use invalid database \"%s\" as template", dbtemplate),
1014 : errhint("Use DROP DATABASE to drop invalid databases."));
1015 :
1016 : /*
1017 : * Permission check: to copy a DB that's not marked datistemplate, you
1018 : * must be superuser or the owner thereof.
1019 : */
1020 726 : if (!src_istemplate)
1021 : {
1022 16 : if (!object_ownercheck(DatabaseRelationId, src_dboid, GetUserId()))
1023 0 : ereport(ERROR,
1024 : (errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
1025 : errmsg("permission denied to copy database \"%s\"",
1026 : dbtemplate)));
1027 : }
1028 :
1029 : /* Validate the database creation strategy. */
1030 726 : if (strategyEl && strategyEl->arg)
1031 : {
1032 : char *strategy;
1033 :
1034 256 : strategy = defGetString(strategyEl);
1035 256 : if (pg_strcasecmp(strategy, "wal_log") == 0)
1036 14 : dbstrategy = CREATEDB_WAL_LOG;
1037 242 : else if (pg_strcasecmp(strategy, "file_copy") == 0)
1038 240 : dbstrategy = CREATEDB_FILE_COPY;
1039 : else
1040 2 : ereport(ERROR,
1041 : (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
1042 : errmsg("invalid create database strategy \"%s\"", strategy),
1043 : errhint("Valid strategies are \"wal_log\" and \"file_copy\".")));
1044 : }
1045 :
1046 : /* If encoding or locales are defaulted, use source's setting */
1047 724 : if (encoding < 0)
1048 628 : encoding = src_encoding;
1049 724 : if (dbcollate == NULL)
1050 614 : dbcollate = src_collate;
1051 724 : if (dbctype == NULL)
1052 614 : dbctype = src_ctype;
1053 724 : if (dblocprovider == '\0')
1054 622 : dblocprovider = src_locprovider;
1055 724 : if (dblocale == NULL)
1056 618 : dblocale = src_locale;
1057 724 : if (dbicurules == NULL)
1058 722 : dbicurules = src_icurules;
1059 :
1060 : /* Some encodings are client only */
1061 724 : if (!PG_VALID_BE_ENCODING(encoding))
1062 0 : ereport(ERROR,
1063 : (errcode(ERRCODE_WRONG_OBJECT_TYPE),
1064 : errmsg("invalid server encoding %d", encoding)));
1065 :
1066 : /* Check that the chosen locales are valid, and get canonical spellings */
1067 724 : if (!check_locale(LC_COLLATE, dbcollate, &canonname))
1068 2 : ereport(ERROR,
1069 : (errcode(ERRCODE_WRONG_OBJECT_TYPE),
1070 : errmsg("invalid LC_COLLATE locale name: \"%s\"", dbcollate),
1071 : errhint("If the locale name is specific to ICU, use ICU_LOCALE.")));
1072 722 : dbcollate = canonname;
1073 722 : if (!check_locale(LC_CTYPE, dbctype, &canonname))
1074 2 : ereport(ERROR,
1075 : (errcode(ERRCODE_WRONG_OBJECT_TYPE),
1076 : errmsg("invalid LC_CTYPE locale name: \"%s\"", dbctype),
1077 : errhint("If the locale name is specific to ICU, use ICU_LOCALE.")));
1078 720 : dbctype = canonname;
1079 :
1080 720 : check_encoding_locale_matches(encoding, dbcollate, dbctype);
1081 :
1082 : /* validate provider-specific parameters */
1083 720 : if (dblocprovider != COLLPROVIDER_BUILTIN)
1084 : {
1085 662 : if (builtinlocaleEl)
1086 0 : ereport(ERROR,
1087 : (errcode(ERRCODE_INVALID_OBJECT_DEFINITION),
1088 : errmsg("BUILTIN_LOCALE cannot be specified unless locale provider is builtin")));
1089 : }
1090 :
1091 720 : if (dblocprovider != COLLPROVIDER_ICU)
1092 : {
1093 690 : if (iculocaleEl)
1094 2 : ereport(ERROR,
1095 : (errcode(ERRCODE_INVALID_OBJECT_DEFINITION),
1096 : errmsg("ICU locale cannot be specified unless locale provider is ICU")));
1097 :
1098 688 : if (dbicurules)
1099 2 : ereport(ERROR,
1100 : (errcode(ERRCODE_INVALID_OBJECT_DEFINITION),
1101 : errmsg("ICU rules cannot be specified unless locale provider is ICU")));
1102 : }
1103 :
1104 : /* validate and canonicalize locale for the provider */
1105 716 : if (dblocprovider == COLLPROVIDER_BUILTIN)
1106 : {
1107 : /*
1108 : * This would happen if template0 uses the libc provider but the new
1109 : * database uses builtin.
1110 : */
1111 54 : if (!dblocale)
1112 2 : ereport(ERROR,
1113 : (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
1114 : errmsg("LOCALE or BUILTIN_LOCALE must be specified")));
1115 :
1116 52 : dblocale = builtin_validate_locale(encoding, dblocale);
1117 : }
1118 662 : else if (dblocprovider == COLLPROVIDER_ICU)
1119 : {
1120 30 : if (!(is_encoding_supported_by_icu(encoding)))
1121 2 : ereport(ERROR,
1122 : (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
1123 : errmsg("encoding \"%s\" is not supported with ICU provider",
1124 : pg_encoding_to_char(encoding))));
1125 :
1126 : /*
1127 : * This would happen if template0 uses the libc provider but the new
1128 : * database uses icu.
1129 : */
1130 28 : if (!dblocale)
1131 2 : ereport(ERROR,
1132 : (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
1133 : errmsg("LOCALE or ICU_LOCALE must be specified")));
1134 :
1135 : /*
1136 : * During binary upgrade, or when the locale came from the template
1137 : * database, preserve locale string. Otherwise, canonicalize to a
1138 : * language tag.
1139 : */
1140 26 : if (!IsBinaryUpgrade && dblocale != src_locale)
1141 : {
1142 14 : char *langtag = icu_language_tag(dblocale,
1143 : icu_validation_level);
1144 :
1145 14 : if (langtag && strcmp(dblocale, langtag) != 0)
1146 : {
1147 6 : ereport(NOTICE,
1148 : (errmsg("using standard form \"%s\" for ICU locale \"%s\"",
1149 : langtag, dblocale)));
1150 :
1151 6 : dblocale = langtag;
1152 : }
1153 : }
1154 :
1155 26 : icu_validate_locale(dblocale);
1156 : }
1157 :
1158 : /* for libc, locale comes from datcollate and datctype */
1159 706 : if (dblocprovider == COLLPROVIDER_LIBC)
1160 632 : dblocale = NULL;
1161 :
1162 : /*
1163 : * Check that the new encoding and locale settings match the source
1164 : * database. We insist on this because we simply copy the source data ---
1165 : * any non-ASCII data would be wrongly encoded, and any indexes sorted
1166 : * according to the source locale would be wrong.
1167 : *
1168 : * However, we assume that template0 doesn't contain any non-ASCII data
1169 : * nor any indexes that depend on collation or ctype, so template0 can be
1170 : * used as template for creating a database with any encoding or locale.
1171 : */
1172 706 : if (strcmp(dbtemplate, "template0") != 0)
1173 : {
1174 418 : if (encoding != src_encoding)
1175 0 : ereport(ERROR,
1176 : (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
1177 : errmsg("new encoding (%s) is incompatible with the encoding of the template database (%s)",
1178 : pg_encoding_to_char(encoding),
1179 : pg_encoding_to_char(src_encoding)),
1180 : errhint("Use the same encoding as in the template database, or use template0 as template.")));
1181 :
1182 418 : if (strcmp(dbcollate, src_collate) != 0)
1183 2 : ereport(ERROR,
1184 : (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
1185 : errmsg("new collation (%s) is incompatible with the collation of the template database (%s)",
1186 : dbcollate, src_collate),
1187 : errhint("Use the same collation as in the template database, or use template0 as template.")));
1188 :
1189 416 : if (strcmp(dbctype, src_ctype) != 0)
1190 0 : ereport(ERROR,
1191 : (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
1192 : errmsg("new LC_CTYPE (%s) is incompatible with the LC_CTYPE of the template database (%s)",
1193 : dbctype, src_ctype),
1194 : errhint("Use the same LC_CTYPE as in the template database, or use template0 as template.")));
1195 :
1196 416 : if (dblocprovider != src_locprovider)
1197 0 : ereport(ERROR,
1198 : (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
1199 : errmsg("new locale provider (%s) does not match locale provider of the template database (%s)",
1200 : collprovider_name(dblocprovider), collprovider_name(src_locprovider)),
1201 : errhint("Use the same locale provider as in the template database, or use template0 as template.")));
1202 :
1203 416 : if (dblocprovider == COLLPROVIDER_ICU)
1204 : {
1205 : char *val1;
1206 : char *val2;
1207 :
1208 : Assert(dblocale);
1209 : Assert(src_locale);
1210 12 : if (strcmp(dblocale, src_locale) != 0)
1211 0 : ereport(ERROR,
1212 : (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
1213 : errmsg("new ICU locale (%s) is incompatible with the ICU locale of the template database (%s)",
1214 : dblocale, src_locale),
1215 : errhint("Use the same ICU locale as in the template database, or use template0 as template.")));
1216 :
1217 12 : val1 = dbicurules;
1218 12 : if (!val1)
1219 12 : val1 = "";
1220 12 : val2 = src_icurules;
1221 12 : if (!val2)
1222 12 : val2 = "";
1223 12 : if (strcmp(val1, val2) != 0)
1224 0 : ereport(ERROR,
1225 : (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
1226 : errmsg("new ICU collation rules (%s) are incompatible with the ICU collation rules of the template database (%s)",
1227 : val1, val2),
1228 : errhint("Use the same ICU collation rules as in the template database, or use template0 as template.")));
1229 : }
1230 : }
1231 :
1232 : /*
1233 : * If we got a collation version for the template database, check that it
1234 : * matches the actual OS collation version. Otherwise error; the user
1235 : * needs to fix the template database first. Don't complain if a
1236 : * collation version was specified explicitly as a statement option; that
1237 : * is used by pg_upgrade to reproduce the old state exactly.
1238 : *
1239 : * (If the template database has no collation version, then either the
1240 : * platform/provider does not support collation versioning, or it's
1241 : * template0, for which we stipulate that it does not contain
1242 : * collation-using objects.)
1243 : */
1244 704 : if (src_collversion && !collversionEl)
1245 : {
1246 : char *actual_versionstr;
1247 : const char *locale;
1248 :
1249 268 : if (dblocprovider == COLLPROVIDER_LIBC)
1250 246 : locale = dbcollate;
1251 : else
1252 22 : locale = dblocale;
1253 :
1254 268 : actual_versionstr = get_collation_actual_version(dblocprovider, locale);
1255 268 : if (!actual_versionstr)
1256 0 : ereport(ERROR,
1257 : (errmsg("template database \"%s\" has a collation version, but no actual collation version could be determined",
1258 : dbtemplate)));
1259 :
1260 268 : if (strcmp(actual_versionstr, src_collversion) != 0)
1261 0 : ereport(ERROR,
1262 : (errmsg("template database \"%s\" has a collation version mismatch",
1263 : dbtemplate),
1264 : errdetail("The template database was created using collation version %s, "
1265 : "but the operating system provides version %s.",
1266 : src_collversion, actual_versionstr),
1267 : errhint("Rebuild all objects in the template database that use the default collation and run "
1268 : "ALTER DATABASE %s REFRESH COLLATION VERSION, "
1269 : "or build PostgreSQL with the right library version.",
1270 : quote_identifier(dbtemplate))));
1271 : }
1272 :
1273 704 : if (dbcollversion == NULL)
1274 656 : dbcollversion = src_collversion;
1275 :
1276 : /*
1277 : * Normally, we copy the collation version from the template database.
1278 : * This last resort only applies if the template database does not have a
1279 : * collation version, which is normally only the case for template0.
1280 : */
1281 704 : if (dbcollversion == NULL)
1282 : {
1283 : const char *locale;
1284 :
1285 388 : if (dblocprovider == COLLPROVIDER_LIBC)
1286 350 : locale = dbcollate;
1287 : else
1288 38 : locale = dblocale;
1289 :
1290 388 : dbcollversion = get_collation_actual_version(dblocprovider, locale);
1291 : }
1292 :
1293 : /* Resolve default tablespace for new database */
1294 704 : if (tablespacenameEl && tablespacenameEl->arg)
1295 16 : {
1296 : char *tablespacename;
1297 : AclResult aclresult;
1298 :
1299 16 : tablespacename = defGetString(tablespacenameEl);
1300 16 : dst_deftablespace = get_tablespace_oid(tablespacename, false);
1301 : /* check permissions */
1302 16 : aclresult = object_aclcheck(TableSpaceRelationId, dst_deftablespace, GetUserId(),
1303 : ACL_CREATE);
1304 16 : if (aclresult != ACLCHECK_OK)
1305 0 : aclcheck_error(aclresult, OBJECT_TABLESPACE,
1306 : tablespacename);
1307 :
1308 : /* pg_global must never be the default tablespace */
1309 16 : if (dst_deftablespace == GLOBALTABLESPACE_OID)
1310 0 : ereport(ERROR,
1311 : (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
1312 : errmsg("pg_global cannot be used as default tablespace")));
1313 :
1314 : /*
1315 : * If we are trying to change the default tablespace of the template,
1316 : * we require that the template not have any files in the new default
1317 : * tablespace. This is necessary because otherwise the copied
1318 : * database would contain pg_class rows that refer to its default
1319 : * tablespace both explicitly (by OID) and implicitly (as zero), which
1320 : * would cause problems. For example another CREATE DATABASE using
1321 : * the copied database as template, and trying to change its default
1322 : * tablespace again, would yield outright incorrect results (it would
1323 : * improperly move tables to the new default tablespace that should
1324 : * stay in the same tablespace).
1325 : */
1326 16 : if (dst_deftablespace != src_deftablespace)
1327 : {
1328 : char *srcpath;
1329 : struct stat st;
1330 :
1331 16 : srcpath = GetDatabasePath(src_dboid, dst_deftablespace);
1332 :
1333 16 : if (stat(srcpath, &st) == 0 &&
1334 0 : S_ISDIR(st.st_mode) &&
1335 0 : !directory_is_empty(srcpath))
1336 0 : ereport(ERROR,
1337 : (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
1338 : errmsg("cannot assign new default tablespace \"%s\"",
1339 : tablespacename),
1340 : errdetail("There is a conflict because database \"%s\" already has some tables in this tablespace.",
1341 : dbtemplate)));
1342 16 : pfree(srcpath);
1343 : }
1344 : }
1345 : else
1346 : {
1347 : /* Use template database's default tablespace */
1348 688 : dst_deftablespace = src_deftablespace;
1349 : /* Note there is no additional permission check in this path */
1350 : }
1351 :
1352 : /*
1353 : * If built with appropriate switch, whine when regression-testing
1354 : * conventions for database names are violated. But don't complain during
1355 : * initdb.
1356 : */
1357 : #ifdef ENFORCE_REGRESSION_TEST_NAME_RESTRICTIONS
1358 : if (IsUnderPostmaster && strstr(dbname, "regression") == NULL)
1359 : elog(WARNING, "databases created by regression test cases should have names including \"regression\"");
1360 : #endif
1361 :
1362 : /*
1363 : * Check for db name conflict. This is just to give a more friendly error
1364 : * message than "unique index violation". There's a race condition but
1365 : * we're willing to accept the less friendly message in that case.
1366 : */
1367 704 : if (OidIsValid(get_database_oid(dbname, true)))
1368 2 : ereport(ERROR,
1369 : (errcode(ERRCODE_DUPLICATE_DATABASE),
1370 : errmsg("database \"%s\" already exists", dbname)));
1371 :
1372 : /*
1373 : * The source DB can't have any active backends, except this one
1374 : * (exception is to allow CREATE DB while connected to template1).
1375 : * Otherwise we might copy inconsistent data.
1376 : *
1377 : * This should be last among the basic error checks, because it involves
1378 : * potential waiting; we may as well throw an error first if we're gonna
1379 : * throw one.
1380 : */
1381 702 : if (CountOtherDBBackends(src_dboid, ¬herbackends, &npreparedxacts))
1382 0 : ereport(ERROR,
1383 : (errcode(ERRCODE_OBJECT_IN_USE),
1384 : errmsg("source database \"%s\" is being accessed by other users",
1385 : dbtemplate),
1386 : errdetail_busy_db(notherbackends, npreparedxacts)));
1387 :
1388 : /*
1389 : * Select an OID for the new database, checking that it doesn't have a
1390 : * filename conflict with anything already existing in the tablespace
1391 : * directories.
1392 : */
1393 702 : pg_database_rel = table_open(DatabaseRelationId, RowExclusiveLock);
1394 :
1395 : /*
1396 : * If database OID is configured, check if the OID is already in use or
1397 : * data directory already exists.
1398 : */
1399 702 : if (OidIsValid(dboid))
1400 : {
1401 230 : char *existing_dbname = get_database_name(dboid);
1402 :
1403 230 : if (existing_dbname != NULL)
1404 0 : ereport(ERROR,
1405 : (errcode(ERRCODE_INVALID_PARAMETER_VALUE)),
1406 : errmsg("database OID %u is already in use by database \"%s\"",
1407 : dboid, existing_dbname));
1408 :
1409 230 : if (check_db_file_conflict(dboid))
1410 0 : ereport(ERROR,
1411 : (errcode(ERRCODE_INVALID_PARAMETER_VALUE)),
1412 : errmsg("data directory with the specified OID %u already exists", dboid));
1413 : }
1414 : else
1415 : {
1416 : /* Select an OID for the new database if is not explicitly configured. */
1417 : do
1418 : {
1419 472 : dboid = GetNewOidWithIndex(pg_database_rel, DatabaseOidIndexId,
1420 : Anum_pg_database_oid);
1421 472 : } while (check_db_file_conflict(dboid));
1422 : }
1423 :
1424 : /*
1425 : * Insert a new tuple into pg_database. This establishes our ownership of
1426 : * the new database name (anyone else trying to insert the same name will
1427 : * block on the unique index, and fail after we commit).
1428 : */
1429 :
1430 : Assert((dblocprovider != COLLPROVIDER_LIBC && dblocale) ||
1431 : (dblocprovider == COLLPROVIDER_LIBC && !dblocale));
1432 :
1433 : /* Form tuple */
1434 702 : new_record[Anum_pg_database_oid - 1] = ObjectIdGetDatum(dboid);
1435 702 : new_record[Anum_pg_database_datname - 1] =
1436 702 : DirectFunctionCall1(namein, CStringGetDatum(dbname));
1437 702 : new_record[Anum_pg_database_datdba - 1] = ObjectIdGetDatum(datdba);
1438 702 : new_record[Anum_pg_database_encoding - 1] = Int32GetDatum(encoding);
1439 702 : new_record[Anum_pg_database_datlocprovider - 1] = CharGetDatum(dblocprovider);
1440 702 : new_record[Anum_pg_database_datistemplate - 1] = BoolGetDatum(dbistemplate);
1441 702 : new_record[Anum_pg_database_datallowconn - 1] = BoolGetDatum(dballowconnections);
1442 702 : new_record[Anum_pg_database_dathasloginevt - 1] = BoolGetDatum(src_hasloginevt);
1443 702 : new_record[Anum_pg_database_datconnlimit - 1] = Int32GetDatum(dbconnlimit);
1444 702 : new_record[Anum_pg_database_datfrozenxid - 1] = TransactionIdGetDatum(src_frozenxid);
1445 702 : new_record[Anum_pg_database_datminmxid - 1] = TransactionIdGetDatum(src_minmxid);
1446 702 : new_record[Anum_pg_database_dattablespace - 1] = ObjectIdGetDatum(dst_deftablespace);
1447 702 : new_record[Anum_pg_database_datcollate - 1] = CStringGetTextDatum(dbcollate);
1448 702 : new_record[Anum_pg_database_datctype - 1] = CStringGetTextDatum(dbctype);
1449 702 : if (dblocale)
1450 72 : new_record[Anum_pg_database_datlocale - 1] = CStringGetTextDatum(dblocale);
1451 : else
1452 630 : new_record_nulls[Anum_pg_database_datlocale - 1] = true;
1453 702 : if (dbicurules)
1454 0 : new_record[Anum_pg_database_daticurules - 1] = CStringGetTextDatum(dbicurules);
1455 : else
1456 702 : new_record_nulls[Anum_pg_database_daticurules - 1] = true;
1457 702 : if (dbcollversion)
1458 582 : new_record[Anum_pg_database_datcollversion - 1] = CStringGetTextDatum(dbcollversion);
1459 : else
1460 120 : new_record_nulls[Anum_pg_database_datcollversion - 1] = true;
1461 :
1462 : /*
1463 : * We deliberately set datacl to default (NULL), rather than copying it
1464 : * from the template database. Copying it would be a bad idea when the
1465 : * owner is not the same as the template's owner.
1466 : */
1467 702 : new_record_nulls[Anum_pg_database_datacl - 1] = true;
1468 :
1469 702 : tuple = heap_form_tuple(RelationGetDescr(pg_database_rel),
1470 : new_record, new_record_nulls);
1471 :
1472 702 : CatalogTupleInsert(pg_database_rel, tuple);
1473 :
1474 : /*
1475 : * Now generate additional catalog entries associated with the new DB
1476 : */
1477 :
1478 : /* Register owner dependency */
1479 702 : recordDependencyOnOwner(DatabaseRelationId, dboid, datdba);
1480 :
1481 : /* Create pg_shdepend entries for objects within database */
1482 702 : copyTemplateDependencies(src_dboid, dboid);
1483 :
1484 : /* Post creation hook for new database */
1485 702 : InvokeObjectPostCreateHook(DatabaseRelationId, dboid, 0);
1486 :
1487 : /*
1488 : * If we're going to be reading data for the to-be-created database into
1489 : * shared_buffers, take a lock on it. Nobody should know that this
1490 : * database exists yet, but it's good to maintain the invariant that an
1491 : * AccessExclusiveLock on the database is sufficient to drop all of its
1492 : * buffers without worrying about more being read later.
1493 : *
1494 : * Note that we need to do this before entering the
1495 : * PG_ENSURE_ERROR_CLEANUP block below, because createdb_failure_callback
1496 : * expects this lock to be held already.
1497 : */
1498 702 : if (dbstrategy == CREATEDB_WAL_LOG)
1499 462 : LockSharedObject(DatabaseRelationId, dboid, 0, AccessShareLock);
1500 :
1501 : /*
1502 : * Once we start copying subdirectories, we need to be able to clean 'em
1503 : * up if we fail. Use an ENSURE block to make sure this happens. (This
1504 : * is not a 100% solution, because of the possibility of failure during
1505 : * transaction commit after we leave this routine, but it should handle
1506 : * most scenarios.)
1507 : */
1508 702 : fparms.src_dboid = src_dboid;
1509 702 : fparms.dest_dboid = dboid;
1510 702 : fparms.strategy = dbstrategy;
1511 :
1512 702 : PG_ENSURE_ERROR_CLEANUP(createdb_failure_callback,
1513 : PointerGetDatum(&fparms));
1514 : {
1515 : /*
1516 : * If the user has asked to create a database with WAL_LOG strategy
1517 : * then call CreateDatabaseUsingWalLog, which will copy the database
1518 : * at the block level and it will WAL log each copied block.
1519 : * Otherwise, call CreateDatabaseUsingFileCopy that will copy the
1520 : * database file by file.
1521 : */
1522 702 : if (dbstrategy == CREATEDB_WAL_LOG)
1523 462 : CreateDatabaseUsingWalLog(src_dboid, dboid, src_deftablespace,
1524 : dst_deftablespace);
1525 : else
1526 240 : CreateDatabaseUsingFileCopy(src_dboid, dboid, src_deftablespace,
1527 : dst_deftablespace);
1528 :
1529 : /*
1530 : * Close pg_database, but keep lock till commit.
1531 : */
1532 702 : table_close(pg_database_rel, NoLock);
1533 :
1534 : /*
1535 : * Force synchronous commit, thus minimizing the window between
1536 : * creation of the database files and committal of the transaction. If
1537 : * we crash before committing, we'll have a DB that's taking up disk
1538 : * space but is not in pg_database, which is not good.
1539 : */
1540 702 : ForceSyncCommit();
1541 : }
1542 702 : PG_END_ENSURE_ERROR_CLEANUP(createdb_failure_callback,
1543 : PointerGetDatum(&fparms));
1544 :
1545 702 : return dboid;
1546 : }
1547 :
1548 : /*
1549 : * Check whether chosen encoding matches chosen locale settings. This
1550 : * restriction is necessary because libc's locale-specific code usually
1551 : * fails when presented with data in an encoding it's not expecting. We
1552 : * allow mismatch in four cases:
1553 : *
1554 : * 1. locale encoding = SQL_ASCII, which means that the locale is C/POSIX
1555 : * which works with any encoding.
1556 : *
1557 : * 2. locale encoding = -1, which means that we couldn't determine the
1558 : * locale's encoding and have to trust the user to get it right.
1559 : *
1560 : * 3. selected encoding is UTF8 and platform is win32. This is because
1561 : * UTF8 is a pseudo codepage that is supported in all locales since it's
1562 : * converted to UTF16 before being used.
1563 : *
1564 : * 4. selected encoding is SQL_ASCII, but only if you're a superuser. This
1565 : * is risky but we have historically allowed it --- notably, the
1566 : * regression tests require it.
1567 : *
1568 : * Note: if you change this policy, fix initdb to match.
1569 : */
1570 : void
1571 748 : check_encoding_locale_matches(int encoding, const char *collate, const char *ctype)
1572 : {
1573 748 : int ctype_encoding = pg_get_encoding_from_locale(ctype, true);
1574 748 : int collate_encoding = pg_get_encoding_from_locale(collate, true);
1575 :
1576 756 : if (!(ctype_encoding == encoding ||
1577 8 : ctype_encoding == PG_SQL_ASCII ||
1578 : ctype_encoding == -1 ||
1579 : #ifdef WIN32
1580 : encoding == PG_UTF8 ||
1581 : #endif
1582 8 : (encoding == PG_SQL_ASCII && superuser())))
1583 0 : ereport(ERROR,
1584 : (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
1585 : errmsg("encoding \"%s\" does not match locale \"%s\"",
1586 : pg_encoding_to_char(encoding),
1587 : ctype),
1588 : errdetail("The chosen LC_CTYPE setting requires encoding \"%s\".",
1589 : pg_encoding_to_char(ctype_encoding))));
1590 :
1591 756 : if (!(collate_encoding == encoding ||
1592 8 : collate_encoding == PG_SQL_ASCII ||
1593 : collate_encoding == -1 ||
1594 : #ifdef WIN32
1595 : encoding == PG_UTF8 ||
1596 : #endif
1597 8 : (encoding == PG_SQL_ASCII && superuser())))
1598 0 : ereport(ERROR,
1599 : (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
1600 : errmsg("encoding \"%s\" does not match locale \"%s\"",
1601 : pg_encoding_to_char(encoding),
1602 : collate),
1603 : errdetail("The chosen LC_COLLATE setting requires encoding \"%s\".",
1604 : pg_encoding_to_char(collate_encoding))));
1605 748 : }
1606 :
1607 : /* Error cleanup callback for createdb */
1608 : static void
1609 0 : createdb_failure_callback(int code, Datum arg)
1610 : {
1611 0 : createdb_failure_params *fparms = (createdb_failure_params *) DatumGetPointer(arg);
1612 :
1613 : /*
1614 : * If we were copying database at block levels then drop pages for the
1615 : * destination database that are in the shared buffer cache. And tell
1616 : * checkpointer to forget any pending fsync and unlink requests for files
1617 : * in the database. The reasoning behind doing this is same as explained
1618 : * in dropdb function. But unlike dropdb we don't need to call
1619 : * pgstat_drop_database because this database is still not created so
1620 : * there should not be any stat for this.
1621 : */
1622 0 : if (fparms->strategy == CREATEDB_WAL_LOG)
1623 : {
1624 0 : DropDatabaseBuffers(fparms->dest_dboid);
1625 0 : ForgetDatabaseSyncRequests(fparms->dest_dboid);
1626 :
1627 : /* Release lock on the target database. */
1628 0 : UnlockSharedObject(DatabaseRelationId, fparms->dest_dboid, 0,
1629 : AccessShareLock);
1630 : }
1631 :
1632 : /*
1633 : * Release lock on source database before doing recursive remove. This is
1634 : * not essential but it seems desirable to release the lock as soon as
1635 : * possible.
1636 : */
1637 0 : UnlockSharedObject(DatabaseRelationId, fparms->src_dboid, 0, ShareLock);
1638 :
1639 : /* Throw away any successfully copied subdirectories */
1640 0 : remove_dbtablespaces(fparms->dest_dboid);
1641 0 : }
1642 :
1643 :
1644 : /*
1645 : * DROP DATABASE
1646 : */
1647 : void
1648 122 : dropdb(const char *dbname, bool missing_ok, bool force)
1649 : {
1650 : Oid db_id;
1651 : bool db_istemplate;
1652 : Relation pgdbrel;
1653 : HeapTuple tup;
1654 : ScanKeyData scankey;
1655 : void *inplace_state;
1656 : Form_pg_database datform;
1657 : int notherbackends;
1658 : int npreparedxacts;
1659 : int nslots,
1660 : nslots_active;
1661 : int nsubscriptions;
1662 :
1663 : /*
1664 : * Look up the target database's OID, and get exclusive lock on it. We
1665 : * need this to ensure that no new backend starts up in the target
1666 : * database while we are deleting it (see postinit.c), and that no one is
1667 : * using it as a CREATE DATABASE template or trying to delete it for
1668 : * themselves.
1669 : */
1670 122 : pgdbrel = table_open(DatabaseRelationId, RowExclusiveLock);
1671 :
1672 122 : if (!get_db_info(dbname, AccessExclusiveLock, &db_id, NULL, NULL,
1673 : &db_istemplate, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL))
1674 : {
1675 32 : if (!missing_ok)
1676 : {
1677 16 : ereport(ERROR,
1678 : (errcode(ERRCODE_UNDEFINED_DATABASE),
1679 : errmsg("database \"%s\" does not exist", dbname)));
1680 : }
1681 : else
1682 : {
1683 : /* Close pg_database, release the lock, since we changed nothing */
1684 16 : table_close(pgdbrel, RowExclusiveLock);
1685 16 : ereport(NOTICE,
1686 : (errmsg("database \"%s\" does not exist, skipping",
1687 : dbname)));
1688 16 : return;
1689 : }
1690 : }
1691 :
1692 : /*
1693 : * Permission checks
1694 : */
1695 90 : if (!object_ownercheck(DatabaseRelationId, db_id, GetUserId()))
1696 0 : aclcheck_error(ACLCHECK_NOT_OWNER, OBJECT_DATABASE,
1697 : dbname);
1698 :
1699 : /* DROP hook for the database being removed */
1700 90 : InvokeObjectDropHook(DatabaseRelationId, db_id, 0);
1701 :
1702 : /*
1703 : * Disallow dropping a DB that is marked istemplate. This is just to
1704 : * prevent people from accidentally dropping template0 or template1; they
1705 : * can do so if they're really determined ...
1706 : */
1707 90 : if (db_istemplate)
1708 0 : ereport(ERROR,
1709 : (errcode(ERRCODE_WRONG_OBJECT_TYPE),
1710 : errmsg("cannot drop a template database")));
1711 :
1712 : /* Obviously can't drop my own database */
1713 90 : if (db_id == MyDatabaseId)
1714 0 : ereport(ERROR,
1715 : (errcode(ERRCODE_OBJECT_IN_USE),
1716 : errmsg("cannot drop the currently open database")));
1717 :
1718 : /*
1719 : * Check whether there are active logical slots that refer to the
1720 : * to-be-dropped database. The database lock we are holding prevents the
1721 : * creation of new slots using the database or existing slots becoming
1722 : * active.
1723 : */
1724 90 : (void) ReplicationSlotsCountDBSlots(db_id, &nslots, &nslots_active);
1725 90 : if (nslots_active)
1726 : {
1727 2 : ereport(ERROR,
1728 : (errcode(ERRCODE_OBJECT_IN_USE),
1729 : errmsg("database \"%s\" is used by an active logical replication slot",
1730 : dbname),
1731 : errdetail_plural("There is %d active slot.",
1732 : "There are %d active slots.",
1733 : nslots_active, nslots_active)));
1734 : }
1735 :
1736 : /*
1737 : * Check if there are subscriptions defined in the target database.
1738 : *
1739 : * We can't drop them automatically because they might be holding
1740 : * resources in other databases/instances.
1741 : */
1742 88 : if ((nsubscriptions = CountDBSubscriptions(db_id)) > 0)
1743 0 : ereport(ERROR,
1744 : (errcode(ERRCODE_OBJECT_IN_USE),
1745 : errmsg("database \"%s\" is being used by logical replication subscription",
1746 : dbname),
1747 : errdetail_plural("There is %d subscription.",
1748 : "There are %d subscriptions.",
1749 : nsubscriptions, nsubscriptions)));
1750 :
1751 :
1752 : /*
1753 : * Attempt to terminate all existing connections to the target database if
1754 : * the user has requested to do so.
1755 : */
1756 88 : if (force)
1757 2 : TerminateOtherDBBackends(db_id);
1758 :
1759 : /*
1760 : * Check for other backends in the target database. (Because we hold the
1761 : * database lock, no new ones can start after this.)
1762 : *
1763 : * As in CREATE DATABASE, check this after other error conditions.
1764 : */
1765 88 : if (CountOtherDBBackends(db_id, ¬herbackends, &npreparedxacts))
1766 0 : ereport(ERROR,
1767 : (errcode(ERRCODE_OBJECT_IN_USE),
1768 : errmsg("database \"%s\" is being accessed by other users",
1769 : dbname),
1770 : errdetail_busy_db(notherbackends, npreparedxacts)));
1771 :
1772 : /*
1773 : * Delete any comments or security labels associated with the database.
1774 : */
1775 88 : DeleteSharedComments(db_id, DatabaseRelationId);
1776 88 : DeleteSharedSecurityLabel(db_id, DatabaseRelationId);
1777 :
1778 : /*
1779 : * Remove settings associated with this database
1780 : */
1781 88 : DropSetting(db_id, InvalidOid);
1782 :
1783 : /*
1784 : * Remove shared dependency references for the database.
1785 : */
1786 88 : dropDatabaseDependencies(db_id);
1787 :
1788 : /*
1789 : * Tell the cumulative stats system to forget it immediately, too.
1790 : */
1791 88 : pgstat_drop_database(db_id);
1792 :
1793 : /*
1794 : * Except for the deletion of the catalog row, subsequent actions are not
1795 : * transactional (consider DropDatabaseBuffers() discarding modified
1796 : * buffers). But we might crash or get interrupted below. To prevent
1797 : * accesses to a database with invalid contents, mark the database as
1798 : * invalid using an in-place update.
1799 : *
1800 : * We need to flush the WAL before continuing, to guarantee the
1801 : * modification is durable before performing irreversible filesystem
1802 : * operations.
1803 : */
1804 88 : ScanKeyInit(&scankey,
1805 : Anum_pg_database_datname,
1806 : BTEqualStrategyNumber, F_NAMEEQ,
1807 : CStringGetDatum(dbname));
1808 88 : systable_inplace_update_begin(pgdbrel, DatabaseNameIndexId, true,
1809 : NULL, 1, &scankey, &tup, &inplace_state);
1810 88 : if (!HeapTupleIsValid(tup))
1811 0 : elog(ERROR, "cache lookup failed for database %u", db_id);
1812 88 : datform = (Form_pg_database) GETSTRUCT(tup);
1813 88 : datform->datconnlimit = DATCONNLIMIT_INVALID_DB;
1814 88 : systable_inplace_update_finish(inplace_state, tup);
1815 88 : XLogFlush(XactLastRecEnd);
1816 :
1817 : /*
1818 : * Also delete the tuple - transactionally. If this transaction commits,
1819 : * the row will be gone, but if we fail, dropdb() can be invoked again.
1820 : */
1821 88 : CatalogTupleDelete(pgdbrel, &tup->t_self);
1822 88 : heap_freetuple(tup);
1823 :
1824 : /*
1825 : * Drop db-specific replication slots.
1826 : */
1827 88 : ReplicationSlotsDropDBSlots(db_id);
1828 :
1829 : /*
1830 : * Drop pages for this database that are in the shared buffer cache. This
1831 : * is important to ensure that no remaining backend tries to write out a
1832 : * dirty buffer to the dead database later...
1833 : */
1834 88 : DropDatabaseBuffers(db_id);
1835 :
1836 : /*
1837 : * Tell checkpointer to forget any pending fsync and unlink requests for
1838 : * files in the database; else the fsyncs will fail at next checkpoint, or
1839 : * worse, it will delete files that belong to a newly created database
1840 : * with the same OID.
1841 : */
1842 88 : ForgetDatabaseSyncRequests(db_id);
1843 :
1844 : /*
1845 : * Force a checkpoint to make sure the checkpointer has received the
1846 : * message sent by ForgetDatabaseSyncRequests.
1847 : */
1848 88 : RequestCheckpoint(CHECKPOINT_IMMEDIATE | CHECKPOINT_FORCE | CHECKPOINT_WAIT);
1849 :
1850 : /* Close all smgr fds in all backends. */
1851 88 : WaitForProcSignalBarrier(EmitProcSignalBarrier(PROCSIGNAL_BARRIER_SMGRRELEASE));
1852 :
1853 : /*
1854 : * Remove all tablespace subdirs belonging to the database.
1855 : */
1856 88 : remove_dbtablespaces(db_id);
1857 :
1858 : /*
1859 : * Close pg_database, but keep lock till commit.
1860 : */
1861 88 : table_close(pgdbrel, NoLock);
1862 :
1863 : /*
1864 : * Force synchronous commit, thus minimizing the window between removal of
1865 : * the database files and committal of the transaction. If we crash before
1866 : * committing, we'll have a DB that's gone on disk but still there
1867 : * according to pg_database, which is not good.
1868 : */
1869 88 : ForceSyncCommit();
1870 : }
1871 :
1872 :
1873 : /*
1874 : * Rename database
1875 : */
1876 : ObjectAddress
1877 6 : RenameDatabase(const char *oldname, const char *newname)
1878 : {
1879 : Oid db_id;
1880 : HeapTuple newtup;
1881 : ItemPointerData otid;
1882 : Relation rel;
1883 : int notherbackends;
1884 : int npreparedxacts;
1885 : ObjectAddress address;
1886 :
1887 : /*
1888 : * Look up the target database's OID, and get exclusive lock on it. We
1889 : * need this for the same reasons as DROP DATABASE.
1890 : */
1891 6 : rel = table_open(DatabaseRelationId, RowExclusiveLock);
1892 :
1893 6 : if (!get_db_info(oldname, AccessExclusiveLock, &db_id, NULL, NULL, NULL,
1894 : NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL))
1895 0 : ereport(ERROR,
1896 : (errcode(ERRCODE_UNDEFINED_DATABASE),
1897 : errmsg("database \"%s\" does not exist", oldname)));
1898 :
1899 : /* must be owner */
1900 6 : if (!object_ownercheck(DatabaseRelationId, db_id, GetUserId()))
1901 0 : aclcheck_error(ACLCHECK_NOT_OWNER, OBJECT_DATABASE,
1902 : oldname);
1903 :
1904 : /* must have createdb rights */
1905 6 : if (!have_createdb_privilege())
1906 0 : ereport(ERROR,
1907 : (errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
1908 : errmsg("permission denied to rename database")));
1909 :
1910 : /*
1911 : * If built with appropriate switch, whine when regression-testing
1912 : * conventions for database names are violated.
1913 : */
1914 : #ifdef ENFORCE_REGRESSION_TEST_NAME_RESTRICTIONS
1915 : if (strstr(newname, "regression") == NULL)
1916 : elog(WARNING, "databases created by regression test cases should have names including \"regression\"");
1917 : #endif
1918 :
1919 : /*
1920 : * Make sure the new name doesn't exist. See notes for same error in
1921 : * CREATE DATABASE.
1922 : */
1923 6 : if (OidIsValid(get_database_oid(newname, true)))
1924 0 : ereport(ERROR,
1925 : (errcode(ERRCODE_DUPLICATE_DATABASE),
1926 : errmsg("database \"%s\" already exists", newname)));
1927 :
1928 : /*
1929 : * XXX Client applications probably store the current database somewhere,
1930 : * so renaming it could cause confusion. On the other hand, there may not
1931 : * be an actual problem besides a little confusion, so think about this
1932 : * and decide.
1933 : */
1934 6 : if (db_id == MyDatabaseId)
1935 0 : ereport(ERROR,
1936 : (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
1937 : errmsg("current database cannot be renamed")));
1938 :
1939 : /*
1940 : * Make sure the database does not have active sessions. This is the same
1941 : * concern as above, but applied to other sessions.
1942 : *
1943 : * As in CREATE DATABASE, check this after other error conditions.
1944 : */
1945 6 : if (CountOtherDBBackends(db_id, ¬herbackends, &npreparedxacts))
1946 0 : ereport(ERROR,
1947 : (errcode(ERRCODE_OBJECT_IN_USE),
1948 : errmsg("database \"%s\" is being accessed by other users",
1949 : oldname),
1950 : errdetail_busy_db(notherbackends, npreparedxacts)));
1951 :
1952 : /* rename */
1953 6 : newtup = SearchSysCacheLockedCopy1(DATABASEOID, ObjectIdGetDatum(db_id));
1954 6 : if (!HeapTupleIsValid(newtup))
1955 0 : elog(ERROR, "cache lookup failed for database %u", db_id);
1956 6 : otid = newtup->t_self;
1957 6 : namestrcpy(&(((Form_pg_database) GETSTRUCT(newtup))->datname), newname);
1958 6 : CatalogTupleUpdate(rel, &otid, newtup);
1959 6 : UnlockTuple(rel, &otid, InplaceUpdateTupleLock);
1960 :
1961 6 : InvokeObjectPostAlterHook(DatabaseRelationId, db_id, 0);
1962 :
1963 6 : ObjectAddressSet(address, DatabaseRelationId, db_id);
1964 :
1965 : /*
1966 : * Close pg_database, but keep lock till commit.
1967 : */
1968 6 : table_close(rel, NoLock);
1969 :
1970 6 : return address;
1971 : }
1972 :
1973 :
1974 : /*
1975 : * ALTER DATABASE SET TABLESPACE
1976 : */
1977 : static void
1978 16 : movedb(const char *dbname, const char *tblspcname)
1979 : {
1980 : Oid db_id;
1981 : Relation pgdbrel;
1982 : int notherbackends;
1983 : int npreparedxacts;
1984 : HeapTuple oldtuple,
1985 : newtuple;
1986 : Oid src_tblspcoid,
1987 : dst_tblspcoid;
1988 : ScanKeyData scankey;
1989 : SysScanDesc sysscan;
1990 : AclResult aclresult;
1991 : char *src_dbpath;
1992 : char *dst_dbpath;
1993 : DIR *dstdir;
1994 : struct dirent *xlde;
1995 : movedb_failure_params fparms;
1996 :
1997 : /*
1998 : * Look up the target database's OID, and get exclusive lock on it. We
1999 : * need this to ensure that no new backend starts up in the database while
2000 : * we are moving it, and that no one is using it as a CREATE DATABASE
2001 : * template or trying to delete it.
2002 : */
2003 16 : pgdbrel = table_open(DatabaseRelationId, RowExclusiveLock);
2004 :
2005 16 : if (!get_db_info(dbname, AccessExclusiveLock, &db_id, NULL, NULL, NULL,
2006 : NULL, NULL, NULL, NULL, &src_tblspcoid, NULL, NULL, NULL, NULL, NULL, NULL))
2007 0 : ereport(ERROR,
2008 : (errcode(ERRCODE_UNDEFINED_DATABASE),
2009 : errmsg("database \"%s\" does not exist", dbname)));
2010 :
2011 : /*
2012 : * We actually need a session lock, so that the lock will persist across
2013 : * the commit/restart below. (We could almost get away with letting the
2014 : * lock be released at commit, except that someone could try to move
2015 : * relations of the DB back into the old directory while we rmtree() it.)
2016 : */
2017 16 : LockSharedObjectForSession(DatabaseRelationId, db_id, 0,
2018 : AccessExclusiveLock);
2019 :
2020 : /*
2021 : * Permission checks
2022 : */
2023 16 : if (!object_ownercheck(DatabaseRelationId, db_id, GetUserId()))
2024 0 : aclcheck_error(ACLCHECK_NOT_OWNER, OBJECT_DATABASE,
2025 : dbname);
2026 :
2027 : /*
2028 : * Obviously can't move the tables of my own database
2029 : */
2030 16 : if (db_id == MyDatabaseId)
2031 0 : ereport(ERROR,
2032 : (errcode(ERRCODE_OBJECT_IN_USE),
2033 : errmsg("cannot change the tablespace of the currently open database")));
2034 :
2035 : /*
2036 : * Get tablespace's oid
2037 : */
2038 16 : dst_tblspcoid = get_tablespace_oid(tblspcname, false);
2039 :
2040 : /*
2041 : * Permission checks
2042 : */
2043 16 : aclresult = object_aclcheck(TableSpaceRelationId, dst_tblspcoid, GetUserId(),
2044 : ACL_CREATE);
2045 16 : if (aclresult != ACLCHECK_OK)
2046 0 : aclcheck_error(aclresult, OBJECT_TABLESPACE,
2047 : tblspcname);
2048 :
2049 : /*
2050 : * pg_global must never be the default tablespace
2051 : */
2052 16 : if (dst_tblspcoid == GLOBALTABLESPACE_OID)
2053 0 : ereport(ERROR,
2054 : (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
2055 : errmsg("pg_global cannot be used as default tablespace")));
2056 :
2057 : /*
2058 : * No-op if same tablespace
2059 : */
2060 16 : if (src_tblspcoid == dst_tblspcoid)
2061 : {
2062 0 : table_close(pgdbrel, NoLock);
2063 0 : UnlockSharedObjectForSession(DatabaseRelationId, db_id, 0,
2064 : AccessExclusiveLock);
2065 0 : return;
2066 : }
2067 :
2068 : /*
2069 : * Check for other backends in the target database. (Because we hold the
2070 : * database lock, no new ones can start after this.)
2071 : *
2072 : * As in CREATE DATABASE, check this after other error conditions.
2073 : */
2074 16 : if (CountOtherDBBackends(db_id, ¬herbackends, &npreparedxacts))
2075 0 : ereport(ERROR,
2076 : (errcode(ERRCODE_OBJECT_IN_USE),
2077 : errmsg("database \"%s\" is being accessed by other users",
2078 : dbname),
2079 : errdetail_busy_db(notherbackends, npreparedxacts)));
2080 :
2081 : /*
2082 : * Get old and new database paths
2083 : */
2084 16 : src_dbpath = GetDatabasePath(db_id, src_tblspcoid);
2085 16 : dst_dbpath = GetDatabasePath(db_id, dst_tblspcoid);
2086 :
2087 : /*
2088 : * Force a checkpoint before proceeding. This will force all dirty
2089 : * buffers, including those of unlogged tables, out to disk, to ensure
2090 : * source database is up-to-date on disk for the copy.
2091 : * FlushDatabaseBuffers() would suffice for that, but we also want to
2092 : * process any pending unlink requests. Otherwise, the check for existing
2093 : * files in the target directory might fail unnecessarily, not to mention
2094 : * that the copy might fail due to source files getting deleted under it.
2095 : * On Windows, this also ensures that background procs don't hold any open
2096 : * files, which would cause rmdir() to fail.
2097 : */
2098 16 : RequestCheckpoint(CHECKPOINT_IMMEDIATE | CHECKPOINT_FORCE | CHECKPOINT_WAIT
2099 : | CHECKPOINT_FLUSH_ALL);
2100 :
2101 : /* Close all smgr fds in all backends. */
2102 16 : WaitForProcSignalBarrier(EmitProcSignalBarrier(PROCSIGNAL_BARRIER_SMGRRELEASE));
2103 :
2104 : /*
2105 : * Now drop all buffers holding data of the target database; they should
2106 : * no longer be dirty so DropDatabaseBuffers is safe.
2107 : *
2108 : * It might seem that we could just let these buffers age out of shared
2109 : * buffers naturally, since they should not get referenced anymore. The
2110 : * problem with that is that if the user later moves the database back to
2111 : * its original tablespace, any still-surviving buffers would appear to
2112 : * contain valid data again --- but they'd be missing any changes made in
2113 : * the database while it was in the new tablespace. In any case, freeing
2114 : * buffers that should never be used again seems worth the cycles.
2115 : *
2116 : * Note: it'd be sufficient to get rid of buffers matching db_id and
2117 : * src_tblspcoid, but bufmgr.c presently provides no API for that.
2118 : */
2119 16 : DropDatabaseBuffers(db_id);
2120 :
2121 : /*
2122 : * Check for existence of files in the target directory, i.e., objects of
2123 : * this database that are already in the target tablespace. We can't
2124 : * allow the move in such a case, because we would need to change those
2125 : * relations' pg_class.reltablespace entries to zero, and we don't have
2126 : * access to the DB's pg_class to do so.
2127 : */
2128 16 : dstdir = AllocateDir(dst_dbpath);
2129 16 : if (dstdir != NULL)
2130 : {
2131 0 : while ((xlde = ReadDir(dstdir, dst_dbpath)) != NULL)
2132 : {
2133 0 : if (strcmp(xlde->d_name, ".") == 0 ||
2134 0 : strcmp(xlde->d_name, "..") == 0)
2135 0 : continue;
2136 :
2137 0 : ereport(ERROR,
2138 : (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
2139 : errmsg("some relations of database \"%s\" are already in tablespace \"%s\"",
2140 : dbname, tblspcname),
2141 : errhint("You must move them back to the database's default tablespace before using this command.")));
2142 : }
2143 :
2144 0 : FreeDir(dstdir);
2145 :
2146 : /*
2147 : * The directory exists but is empty. We must remove it before using
2148 : * the copydir function.
2149 : */
2150 0 : if (rmdir(dst_dbpath) != 0)
2151 0 : elog(ERROR, "could not remove directory \"%s\": %m",
2152 : dst_dbpath);
2153 : }
2154 :
2155 : /*
2156 : * Use an ENSURE block to make sure we remove the debris if the copy fails
2157 : * (eg, due to out-of-disk-space). This is not a 100% solution, because
2158 : * of the possibility of failure during transaction commit, but it should
2159 : * handle most scenarios.
2160 : */
2161 16 : fparms.dest_dboid = db_id;
2162 16 : fparms.dest_tsoid = dst_tblspcoid;
2163 16 : PG_ENSURE_ERROR_CLEANUP(movedb_failure_callback,
2164 : PointerGetDatum(&fparms));
2165 : {
2166 16 : Datum new_record[Natts_pg_database] = {0};
2167 16 : bool new_record_nulls[Natts_pg_database] = {0};
2168 16 : bool new_record_repl[Natts_pg_database] = {0};
2169 :
2170 : /*
2171 : * Copy files from the old tablespace to the new one
2172 : */
2173 16 : copydir(src_dbpath, dst_dbpath, false);
2174 :
2175 : /*
2176 : * Record the filesystem change in XLOG
2177 : */
2178 : {
2179 : xl_dbase_create_file_copy_rec xlrec;
2180 :
2181 16 : xlrec.db_id = db_id;
2182 16 : xlrec.tablespace_id = dst_tblspcoid;
2183 16 : xlrec.src_db_id = db_id;
2184 16 : xlrec.src_tablespace_id = src_tblspcoid;
2185 :
2186 16 : XLogBeginInsert();
2187 16 : XLogRegisterData(&xlrec,
2188 : sizeof(xl_dbase_create_file_copy_rec));
2189 :
2190 16 : (void) XLogInsert(RM_DBASE_ID,
2191 : XLOG_DBASE_CREATE_FILE_COPY | XLR_SPECIAL_REL_UPDATE);
2192 : }
2193 :
2194 : /*
2195 : * Update the database's pg_database tuple
2196 : */
2197 16 : ScanKeyInit(&scankey,
2198 : Anum_pg_database_datname,
2199 : BTEqualStrategyNumber, F_NAMEEQ,
2200 : CStringGetDatum(dbname));
2201 16 : sysscan = systable_beginscan(pgdbrel, DatabaseNameIndexId, true,
2202 : NULL, 1, &scankey);
2203 16 : oldtuple = systable_getnext(sysscan);
2204 16 : if (!HeapTupleIsValid(oldtuple)) /* shouldn't happen... */
2205 0 : ereport(ERROR,
2206 : (errcode(ERRCODE_UNDEFINED_DATABASE),
2207 : errmsg("database \"%s\" does not exist", dbname)));
2208 16 : LockTuple(pgdbrel, &oldtuple->t_self, InplaceUpdateTupleLock);
2209 :
2210 16 : new_record[Anum_pg_database_dattablespace - 1] = ObjectIdGetDatum(dst_tblspcoid);
2211 16 : new_record_repl[Anum_pg_database_dattablespace - 1] = true;
2212 :
2213 16 : newtuple = heap_modify_tuple(oldtuple, RelationGetDescr(pgdbrel),
2214 : new_record,
2215 : new_record_nulls, new_record_repl);
2216 16 : CatalogTupleUpdate(pgdbrel, &oldtuple->t_self, newtuple);
2217 16 : UnlockTuple(pgdbrel, &oldtuple->t_self, InplaceUpdateTupleLock);
2218 :
2219 16 : InvokeObjectPostAlterHook(DatabaseRelationId, db_id, 0);
2220 :
2221 16 : systable_endscan(sysscan);
2222 :
2223 : /*
2224 : * Force another checkpoint here. As in CREATE DATABASE, this is to
2225 : * ensure that we don't have to replay a committed
2226 : * XLOG_DBASE_CREATE_FILE_COPY operation, which would cause us to lose
2227 : * any unlogged operations done in the new DB tablespace before the
2228 : * next checkpoint.
2229 : */
2230 16 : RequestCheckpoint(CHECKPOINT_IMMEDIATE | CHECKPOINT_FORCE | CHECKPOINT_WAIT);
2231 :
2232 : /*
2233 : * Force synchronous commit, thus minimizing the window between
2234 : * copying the database files and committal of the transaction. If we
2235 : * crash before committing, we'll leave an orphaned set of files on
2236 : * disk, which is not fatal but not good either.
2237 : */
2238 16 : ForceSyncCommit();
2239 :
2240 : /*
2241 : * Close pg_database, but keep lock till commit.
2242 : */
2243 16 : table_close(pgdbrel, NoLock);
2244 : }
2245 16 : PG_END_ENSURE_ERROR_CLEANUP(movedb_failure_callback,
2246 : PointerGetDatum(&fparms));
2247 :
2248 : /*
2249 : * Commit the transaction so that the pg_database update is committed. If
2250 : * we crash while removing files, the database won't be corrupt, we'll
2251 : * just leave some orphaned files in the old directory.
2252 : *
2253 : * (This is OK because we know we aren't inside a transaction block.)
2254 : *
2255 : * XXX would it be safe/better to do this inside the ensure block? Not
2256 : * convinced it's a good idea; consider elog just after the transaction
2257 : * really commits.
2258 : */
2259 16 : PopActiveSnapshot();
2260 16 : CommitTransactionCommand();
2261 :
2262 : /* Start new transaction for the remaining work; don't need a snapshot */
2263 16 : StartTransactionCommand();
2264 :
2265 : /*
2266 : * Remove files from the old tablespace
2267 : */
2268 16 : if (!rmtree(src_dbpath, true))
2269 0 : ereport(WARNING,
2270 : (errmsg("some useless files may be left behind in old database directory \"%s\"",
2271 : src_dbpath)));
2272 :
2273 : /*
2274 : * Record the filesystem change in XLOG
2275 : */
2276 : {
2277 : xl_dbase_drop_rec xlrec;
2278 :
2279 16 : xlrec.db_id = db_id;
2280 16 : xlrec.ntablespaces = 1;
2281 :
2282 16 : XLogBeginInsert();
2283 16 : XLogRegisterData(&xlrec, sizeof(xl_dbase_drop_rec));
2284 16 : XLogRegisterData(&src_tblspcoid, sizeof(Oid));
2285 :
2286 16 : (void) XLogInsert(RM_DBASE_ID,
2287 : XLOG_DBASE_DROP | XLR_SPECIAL_REL_UPDATE);
2288 : }
2289 :
2290 : /* Now it's safe to release the database lock */
2291 16 : UnlockSharedObjectForSession(DatabaseRelationId, db_id, 0,
2292 : AccessExclusiveLock);
2293 :
2294 16 : pfree(src_dbpath);
2295 16 : pfree(dst_dbpath);
2296 : }
2297 :
2298 : /* Error cleanup callback for movedb */
2299 : static void
2300 0 : movedb_failure_callback(int code, Datum arg)
2301 : {
2302 0 : movedb_failure_params *fparms = (movedb_failure_params *) DatumGetPointer(arg);
2303 : char *dstpath;
2304 :
2305 : /* Get rid of anything we managed to copy to the target directory */
2306 0 : dstpath = GetDatabasePath(fparms->dest_dboid, fparms->dest_tsoid);
2307 :
2308 0 : (void) rmtree(dstpath, true);
2309 :
2310 0 : pfree(dstpath);
2311 0 : }
2312 :
2313 : /*
2314 : * Process options and call dropdb function.
2315 : */
2316 : void
2317 122 : DropDatabase(ParseState *pstate, DropdbStmt *stmt)
2318 : {
2319 122 : bool force = false;
2320 : ListCell *lc;
2321 :
2322 148 : foreach(lc, stmt->options)
2323 : {
2324 26 : DefElem *opt = (DefElem *) lfirst(lc);
2325 :
2326 26 : if (strcmp(opt->defname, "force") == 0)
2327 26 : force = true;
2328 : else
2329 0 : ereport(ERROR,
2330 : (errcode(ERRCODE_SYNTAX_ERROR),
2331 : errmsg("unrecognized DROP DATABASE option \"%s\"", opt->defname),
2332 : parser_errposition(pstate, opt->location)));
2333 : }
2334 :
2335 122 : dropdb(stmt->dbname, stmt->missing_ok, force);
2336 104 : }
2337 :
2338 : /*
2339 : * ALTER DATABASE name ...
2340 : */
2341 : Oid
2342 76 : AlterDatabase(ParseState *pstate, AlterDatabaseStmt *stmt, bool isTopLevel)
2343 : {
2344 : Relation rel;
2345 : Oid dboid;
2346 : HeapTuple tuple,
2347 : newtuple;
2348 : Form_pg_database datform;
2349 : ScanKeyData scankey;
2350 : SysScanDesc scan;
2351 : ListCell *option;
2352 76 : bool dbistemplate = false;
2353 76 : bool dballowconnections = true;
2354 76 : int dbconnlimit = DATCONNLIMIT_UNLIMITED;
2355 76 : DefElem *distemplate = NULL;
2356 76 : DefElem *dallowconnections = NULL;
2357 76 : DefElem *dconnlimit = NULL;
2358 76 : DefElem *dtablespace = NULL;
2359 76 : Datum new_record[Natts_pg_database] = {0};
2360 76 : bool new_record_nulls[Natts_pg_database] = {0};
2361 76 : bool new_record_repl[Natts_pg_database] = {0};
2362 :
2363 : /* Extract options from the statement node tree */
2364 152 : foreach(option, stmt->options)
2365 : {
2366 76 : DefElem *defel = (DefElem *) lfirst(option);
2367 :
2368 76 : if (strcmp(defel->defname, "is_template") == 0)
2369 : {
2370 20 : if (distemplate)
2371 0 : errorConflictingDefElem(defel, pstate);
2372 20 : distemplate = defel;
2373 : }
2374 56 : else if (strcmp(defel->defname, "allow_connections") == 0)
2375 : {
2376 32 : if (dallowconnections)
2377 0 : errorConflictingDefElem(defel, pstate);
2378 32 : dallowconnections = defel;
2379 : }
2380 24 : else if (strcmp(defel->defname, "connection_limit") == 0)
2381 : {
2382 8 : if (dconnlimit)
2383 0 : errorConflictingDefElem(defel, pstate);
2384 8 : dconnlimit = defel;
2385 : }
2386 16 : else if (strcmp(defel->defname, "tablespace") == 0)
2387 : {
2388 16 : if (dtablespace)
2389 0 : errorConflictingDefElem(defel, pstate);
2390 16 : dtablespace = defel;
2391 : }
2392 : else
2393 0 : ereport(ERROR,
2394 : (errcode(ERRCODE_SYNTAX_ERROR),
2395 : errmsg("option \"%s\" not recognized", defel->defname),
2396 : parser_errposition(pstate, defel->location)));
2397 : }
2398 :
2399 76 : if (dtablespace)
2400 : {
2401 : /*
2402 : * While the SET TABLESPACE syntax doesn't allow any other options,
2403 : * somebody could write "WITH TABLESPACE ...". Forbid any other
2404 : * options from being specified in that case.
2405 : */
2406 16 : if (list_length(stmt->options) != 1)
2407 0 : ereport(ERROR,
2408 : (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
2409 : errmsg("option \"%s\" cannot be specified with other options",
2410 : dtablespace->defname),
2411 : parser_errposition(pstate, dtablespace->location)));
2412 : /* this case isn't allowed within a transaction block */
2413 16 : PreventInTransactionBlock(isTopLevel, "ALTER DATABASE SET TABLESPACE");
2414 16 : movedb(stmt->dbname, defGetString(dtablespace));
2415 16 : return InvalidOid;
2416 : }
2417 :
2418 60 : if (distemplate && distemplate->arg)
2419 20 : dbistemplate = defGetBoolean(distemplate);
2420 60 : if (dallowconnections && dallowconnections->arg)
2421 32 : dballowconnections = defGetBoolean(dallowconnections);
2422 60 : if (dconnlimit && dconnlimit->arg)
2423 : {
2424 8 : dbconnlimit = defGetInt32(dconnlimit);
2425 8 : if (dbconnlimit < DATCONNLIMIT_UNLIMITED)
2426 0 : ereport(ERROR,
2427 : (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
2428 : errmsg("invalid connection limit: %d", dbconnlimit)));
2429 : }
2430 :
2431 : /*
2432 : * Get the old tuple. We don't need a lock on the database per se,
2433 : * because we're not going to do anything that would mess up incoming
2434 : * connections.
2435 : */
2436 60 : rel = table_open(DatabaseRelationId, RowExclusiveLock);
2437 60 : ScanKeyInit(&scankey,
2438 : Anum_pg_database_datname,
2439 : BTEqualStrategyNumber, F_NAMEEQ,
2440 60 : CStringGetDatum(stmt->dbname));
2441 60 : scan = systable_beginscan(rel, DatabaseNameIndexId, true,
2442 : NULL, 1, &scankey);
2443 60 : tuple = systable_getnext(scan);
2444 60 : if (!HeapTupleIsValid(tuple))
2445 0 : ereport(ERROR,
2446 : (errcode(ERRCODE_UNDEFINED_DATABASE),
2447 : errmsg("database \"%s\" does not exist", stmt->dbname)));
2448 60 : LockTuple(rel, &tuple->t_self, InplaceUpdateTupleLock);
2449 :
2450 60 : datform = (Form_pg_database) GETSTRUCT(tuple);
2451 60 : dboid = datform->oid;
2452 :
2453 60 : if (database_is_invalid_form(datform))
2454 : {
2455 2 : ereport(FATAL,
2456 : errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
2457 : errmsg("cannot alter invalid database \"%s\"", stmt->dbname),
2458 : errhint("Use DROP DATABASE to drop invalid databases."));
2459 : }
2460 :
2461 58 : if (!object_ownercheck(DatabaseRelationId, dboid, GetUserId()))
2462 0 : aclcheck_error(ACLCHECK_NOT_OWNER, OBJECT_DATABASE,
2463 0 : stmt->dbname);
2464 :
2465 : /*
2466 : * In order to avoid getting locked out and having to go through
2467 : * standalone mode, we refuse to disallow connections to the database
2468 : * we're currently connected to. Lockout can still happen with concurrent
2469 : * sessions but the likeliness of that is not high enough to worry about.
2470 : */
2471 58 : if (!dballowconnections && dboid == MyDatabaseId)
2472 0 : ereport(ERROR,
2473 : (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
2474 : errmsg("cannot disallow connections for current database")));
2475 :
2476 : /*
2477 : * Build an updated tuple, perusing the information just obtained
2478 : */
2479 58 : if (distemplate)
2480 : {
2481 20 : new_record[Anum_pg_database_datistemplate - 1] = BoolGetDatum(dbistemplate);
2482 20 : new_record_repl[Anum_pg_database_datistemplate - 1] = true;
2483 : }
2484 58 : if (dallowconnections)
2485 : {
2486 32 : new_record[Anum_pg_database_datallowconn - 1] = BoolGetDatum(dballowconnections);
2487 32 : new_record_repl[Anum_pg_database_datallowconn - 1] = true;
2488 : }
2489 58 : if (dconnlimit)
2490 : {
2491 6 : new_record[Anum_pg_database_datconnlimit - 1] = Int32GetDatum(dbconnlimit);
2492 6 : new_record_repl[Anum_pg_database_datconnlimit - 1] = true;
2493 : }
2494 :
2495 58 : newtuple = heap_modify_tuple(tuple, RelationGetDescr(rel), new_record,
2496 : new_record_nulls, new_record_repl);
2497 58 : CatalogTupleUpdate(rel, &tuple->t_self, newtuple);
2498 58 : UnlockTuple(rel, &tuple->t_self, InplaceUpdateTupleLock);
2499 :
2500 58 : InvokeObjectPostAlterHook(DatabaseRelationId, dboid, 0);
2501 :
2502 58 : systable_endscan(scan);
2503 :
2504 : /* Close pg_database, but keep lock till commit */
2505 58 : table_close(rel, NoLock);
2506 :
2507 58 : return dboid;
2508 : }
2509 :
2510 :
2511 : /*
2512 : * ALTER DATABASE name REFRESH COLLATION VERSION
2513 : */
2514 : ObjectAddress
2515 6 : AlterDatabaseRefreshColl(AlterDatabaseRefreshCollStmt *stmt)
2516 : {
2517 : Relation rel;
2518 : ScanKeyData scankey;
2519 : SysScanDesc scan;
2520 : Oid db_id;
2521 : HeapTuple tuple;
2522 : Form_pg_database datForm;
2523 : ObjectAddress address;
2524 : Datum datum;
2525 : bool isnull;
2526 : char *oldversion;
2527 : char *newversion;
2528 :
2529 6 : rel = table_open(DatabaseRelationId, RowExclusiveLock);
2530 6 : ScanKeyInit(&scankey,
2531 : Anum_pg_database_datname,
2532 : BTEqualStrategyNumber, F_NAMEEQ,
2533 6 : CStringGetDatum(stmt->dbname));
2534 6 : scan = systable_beginscan(rel, DatabaseNameIndexId, true,
2535 : NULL, 1, &scankey);
2536 6 : tuple = systable_getnext(scan);
2537 6 : if (!HeapTupleIsValid(tuple))
2538 0 : ereport(ERROR,
2539 : (errcode(ERRCODE_UNDEFINED_DATABASE),
2540 : errmsg("database \"%s\" does not exist", stmt->dbname)));
2541 :
2542 6 : datForm = (Form_pg_database) GETSTRUCT(tuple);
2543 6 : db_id = datForm->oid;
2544 :
2545 6 : if (!object_ownercheck(DatabaseRelationId, db_id, GetUserId()))
2546 0 : aclcheck_error(ACLCHECK_NOT_OWNER, OBJECT_DATABASE,
2547 0 : stmt->dbname);
2548 6 : LockTuple(rel, &tuple->t_self, InplaceUpdateTupleLock);
2549 :
2550 6 : datum = heap_getattr(tuple, Anum_pg_database_datcollversion, RelationGetDescr(rel), &isnull);
2551 6 : oldversion = isnull ? NULL : TextDatumGetCString(datum);
2552 :
2553 6 : if (datForm->datlocprovider == COLLPROVIDER_LIBC)
2554 : {
2555 4 : datum = heap_getattr(tuple, Anum_pg_database_datcollate, RelationGetDescr(rel), &isnull);
2556 4 : if (isnull)
2557 0 : elog(ERROR, "unexpected null in pg_database");
2558 : }
2559 : else
2560 : {
2561 2 : datum = heap_getattr(tuple, Anum_pg_database_datlocale, RelationGetDescr(rel), &isnull);
2562 2 : if (isnull)
2563 0 : elog(ERROR, "unexpected null in pg_database");
2564 : }
2565 :
2566 6 : newversion = get_collation_actual_version(datForm->datlocprovider,
2567 6 : TextDatumGetCString(datum));
2568 :
2569 : /* cannot change from NULL to non-NULL or vice versa */
2570 6 : if ((!oldversion && newversion) || (oldversion && !newversion))
2571 0 : elog(ERROR, "invalid collation version change");
2572 6 : else if (oldversion && newversion && strcmp(newversion, oldversion) != 0)
2573 0 : {
2574 0 : bool nulls[Natts_pg_database] = {0};
2575 0 : bool replaces[Natts_pg_database] = {0};
2576 0 : Datum values[Natts_pg_database] = {0};
2577 : HeapTuple newtuple;
2578 :
2579 0 : ereport(NOTICE,
2580 : (errmsg("changing version from %s to %s",
2581 : oldversion, newversion)));
2582 :
2583 0 : values[Anum_pg_database_datcollversion - 1] = CStringGetTextDatum(newversion);
2584 0 : replaces[Anum_pg_database_datcollversion - 1] = true;
2585 :
2586 0 : newtuple = heap_modify_tuple(tuple, RelationGetDescr(rel),
2587 : values, nulls, replaces);
2588 0 : CatalogTupleUpdate(rel, &tuple->t_self, newtuple);
2589 0 : heap_freetuple(newtuple);
2590 : }
2591 : else
2592 6 : ereport(NOTICE,
2593 : (errmsg("version has not changed")));
2594 6 : UnlockTuple(rel, &tuple->t_self, InplaceUpdateTupleLock);
2595 :
2596 6 : InvokeObjectPostAlterHook(DatabaseRelationId, db_id, 0);
2597 :
2598 6 : ObjectAddressSet(address, DatabaseRelationId, db_id);
2599 :
2600 6 : systable_endscan(scan);
2601 :
2602 6 : table_close(rel, NoLock);
2603 :
2604 6 : return address;
2605 : }
2606 :
2607 :
2608 : /*
2609 : * ALTER DATABASE name SET ...
2610 : */
2611 : Oid
2612 1162 : AlterDatabaseSet(AlterDatabaseSetStmt *stmt)
2613 : {
2614 1162 : Oid datid = get_database_oid(stmt->dbname, false);
2615 :
2616 : /*
2617 : * Obtain a lock on the database and make sure it didn't go away in the
2618 : * meantime.
2619 : */
2620 1162 : shdepLockAndCheckObject(DatabaseRelationId, datid);
2621 :
2622 1162 : if (!object_ownercheck(DatabaseRelationId, datid, GetUserId()))
2623 0 : aclcheck_error(ACLCHECK_NOT_OWNER, OBJECT_DATABASE,
2624 0 : stmt->dbname);
2625 :
2626 1162 : AlterSetting(datid, InvalidOid, stmt->setstmt);
2627 :
2628 1162 : UnlockSharedObject(DatabaseRelationId, datid, 0, AccessShareLock);
2629 :
2630 1162 : return datid;
2631 : }
2632 :
2633 :
2634 : /*
2635 : * ALTER DATABASE name OWNER TO newowner
2636 : */
2637 : ObjectAddress
2638 78 : AlterDatabaseOwner(const char *dbname, Oid newOwnerId)
2639 : {
2640 : Oid db_id;
2641 : HeapTuple tuple;
2642 : Relation rel;
2643 : ScanKeyData scankey;
2644 : SysScanDesc scan;
2645 : Form_pg_database datForm;
2646 : ObjectAddress address;
2647 :
2648 : /*
2649 : * Get the old tuple. We don't need a lock on the database per se,
2650 : * because we're not going to do anything that would mess up incoming
2651 : * connections.
2652 : */
2653 78 : rel = table_open(DatabaseRelationId, RowExclusiveLock);
2654 78 : ScanKeyInit(&scankey,
2655 : Anum_pg_database_datname,
2656 : BTEqualStrategyNumber, F_NAMEEQ,
2657 : CStringGetDatum(dbname));
2658 78 : scan = systable_beginscan(rel, DatabaseNameIndexId, true,
2659 : NULL, 1, &scankey);
2660 78 : tuple = systable_getnext(scan);
2661 78 : if (!HeapTupleIsValid(tuple))
2662 0 : ereport(ERROR,
2663 : (errcode(ERRCODE_UNDEFINED_DATABASE),
2664 : errmsg("database \"%s\" does not exist", dbname)));
2665 :
2666 78 : datForm = (Form_pg_database) GETSTRUCT(tuple);
2667 78 : db_id = datForm->oid;
2668 :
2669 : /*
2670 : * If the new owner is the same as the existing owner, consider the
2671 : * command to have succeeded. This is to be consistent with other
2672 : * objects.
2673 : */
2674 78 : if (datForm->datdba != newOwnerId)
2675 : {
2676 : Datum repl_val[Natts_pg_database];
2677 30 : bool repl_null[Natts_pg_database] = {0};
2678 30 : bool repl_repl[Natts_pg_database] = {0};
2679 : Acl *newAcl;
2680 : Datum aclDatum;
2681 : bool isNull;
2682 : HeapTuple newtuple;
2683 :
2684 : /* Otherwise, must be owner of the existing object */
2685 30 : if (!object_ownercheck(DatabaseRelationId, db_id, GetUserId()))
2686 0 : aclcheck_error(ACLCHECK_NOT_OWNER, OBJECT_DATABASE,
2687 : dbname);
2688 :
2689 : /* Must be able to become new owner */
2690 30 : check_can_set_role(GetUserId(), newOwnerId);
2691 :
2692 : /*
2693 : * must have createdb rights
2694 : *
2695 : * NOTE: This is different from other alter-owner checks in that the
2696 : * current user is checked for createdb privileges instead of the
2697 : * destination owner. This is consistent with the CREATE case for
2698 : * databases. Because superusers will always have this right, we need
2699 : * no special case for them.
2700 : */
2701 30 : if (!have_createdb_privilege())
2702 0 : ereport(ERROR,
2703 : (errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
2704 : errmsg("permission denied to change owner of database")));
2705 :
2706 30 : LockTuple(rel, &tuple->t_self, InplaceUpdateTupleLock);
2707 :
2708 30 : repl_repl[Anum_pg_database_datdba - 1] = true;
2709 30 : repl_val[Anum_pg_database_datdba - 1] = ObjectIdGetDatum(newOwnerId);
2710 :
2711 : /*
2712 : * Determine the modified ACL for the new owner. This is only
2713 : * necessary when the ACL is non-null.
2714 : */
2715 30 : aclDatum = heap_getattr(tuple,
2716 : Anum_pg_database_datacl,
2717 : RelationGetDescr(rel),
2718 : &isNull);
2719 30 : if (!isNull)
2720 : {
2721 0 : newAcl = aclnewowner(DatumGetAclP(aclDatum),
2722 : datForm->datdba, newOwnerId);
2723 0 : repl_repl[Anum_pg_database_datacl - 1] = true;
2724 0 : repl_val[Anum_pg_database_datacl - 1] = PointerGetDatum(newAcl);
2725 : }
2726 :
2727 30 : newtuple = heap_modify_tuple(tuple, RelationGetDescr(rel), repl_val, repl_null, repl_repl);
2728 30 : CatalogTupleUpdate(rel, &newtuple->t_self, newtuple);
2729 30 : UnlockTuple(rel, &tuple->t_self, InplaceUpdateTupleLock);
2730 :
2731 30 : heap_freetuple(newtuple);
2732 :
2733 : /* Update owner dependency reference */
2734 30 : changeDependencyOnOwner(DatabaseRelationId, db_id, newOwnerId);
2735 : }
2736 :
2737 78 : InvokeObjectPostAlterHook(DatabaseRelationId, db_id, 0);
2738 :
2739 78 : ObjectAddressSet(address, DatabaseRelationId, db_id);
2740 :
2741 78 : systable_endscan(scan);
2742 :
2743 : /* Close pg_database, but keep lock till commit */
2744 78 : table_close(rel, NoLock);
2745 :
2746 78 : return address;
2747 : }
2748 :
2749 :
2750 : Datum
2751 86 : pg_database_collation_actual_version(PG_FUNCTION_ARGS)
2752 : {
2753 86 : Oid dbid = PG_GETARG_OID(0);
2754 : HeapTuple tp;
2755 : char datlocprovider;
2756 : Datum datum;
2757 : char *version;
2758 :
2759 86 : tp = SearchSysCache1(DATABASEOID, ObjectIdGetDatum(dbid));
2760 86 : if (!HeapTupleIsValid(tp))
2761 0 : ereport(ERROR,
2762 : (errcode(ERRCODE_UNDEFINED_OBJECT),
2763 : errmsg("database with OID %u does not exist", dbid)));
2764 :
2765 86 : datlocprovider = ((Form_pg_database) GETSTRUCT(tp))->datlocprovider;
2766 :
2767 86 : if (datlocprovider == COLLPROVIDER_LIBC)
2768 72 : datum = SysCacheGetAttrNotNull(DATABASEOID, tp, Anum_pg_database_datcollate);
2769 : else
2770 14 : datum = SysCacheGetAttrNotNull(DATABASEOID, tp, Anum_pg_database_datlocale);
2771 :
2772 86 : version = get_collation_actual_version(datlocprovider,
2773 86 : TextDatumGetCString(datum));
2774 :
2775 86 : ReleaseSysCache(tp);
2776 :
2777 86 : if (version)
2778 58 : PG_RETURN_TEXT_P(cstring_to_text(version));
2779 : else
2780 28 : PG_RETURN_NULL();
2781 : }
2782 :
2783 :
2784 : /*
2785 : * Helper functions
2786 : */
2787 :
2788 : /*
2789 : * Look up info about the database named "name". If the database exists,
2790 : * obtain the specified lock type on it, fill in any of the remaining
2791 : * parameters that aren't NULL, and return true. If no such database,
2792 : * return false.
2793 : */
2794 : static bool
2795 872 : get_db_info(const char *name, LOCKMODE lockmode,
2796 : Oid *dbIdP, Oid *ownerIdP,
2797 : int *encodingP, bool *dbIsTemplateP, bool *dbAllowConnP, bool *dbHasLoginEvtP,
2798 : TransactionId *dbFrozenXidP, MultiXactId *dbMinMultiP,
2799 : Oid *dbTablespace, char **dbCollate, char **dbCtype, char **dbLocale,
2800 : char **dbIcurules,
2801 : char *dbLocProvider,
2802 : char **dbCollversion)
2803 : {
2804 872 : bool result = false;
2805 : Relation relation;
2806 :
2807 : Assert(name);
2808 :
2809 : /* Caller may wish to grab a better lock on pg_database beforehand... */
2810 872 : relation = table_open(DatabaseRelationId, AccessShareLock);
2811 :
2812 : /*
2813 : * Loop covers the rare case where the database is renamed before we can
2814 : * lock it. We try again just in case we can find a new one of the same
2815 : * name.
2816 : */
2817 : for (;;)
2818 0 : {
2819 : ScanKeyData scanKey;
2820 : SysScanDesc scan;
2821 : HeapTuple tuple;
2822 : Oid dbOid;
2823 :
2824 : /*
2825 : * there's no syscache for database-indexed-by-name, so must do it the
2826 : * hard way
2827 : */
2828 872 : ScanKeyInit(&scanKey,
2829 : Anum_pg_database_datname,
2830 : BTEqualStrategyNumber, F_NAMEEQ,
2831 : CStringGetDatum(name));
2832 :
2833 872 : scan = systable_beginscan(relation, DatabaseNameIndexId, true,
2834 : NULL, 1, &scanKey);
2835 :
2836 872 : tuple = systable_getnext(scan);
2837 :
2838 872 : if (!HeapTupleIsValid(tuple))
2839 : {
2840 : /* definitely no database of that name */
2841 32 : systable_endscan(scan);
2842 32 : break;
2843 : }
2844 :
2845 840 : dbOid = ((Form_pg_database) GETSTRUCT(tuple))->oid;
2846 :
2847 840 : systable_endscan(scan);
2848 :
2849 : /*
2850 : * Now that we have a database OID, we can try to lock the DB.
2851 : */
2852 840 : if (lockmode != NoLock)
2853 840 : LockSharedObject(DatabaseRelationId, dbOid, 0, lockmode);
2854 :
2855 : /*
2856 : * And now, re-fetch the tuple by OID. If it's still there and still
2857 : * the same name, we win; else, drop the lock and loop back to try
2858 : * again.
2859 : */
2860 840 : tuple = SearchSysCache1(DATABASEOID, ObjectIdGetDatum(dbOid));
2861 840 : if (HeapTupleIsValid(tuple))
2862 : {
2863 840 : Form_pg_database dbform = (Form_pg_database) GETSTRUCT(tuple);
2864 :
2865 840 : if (strcmp(name, NameStr(dbform->datname)) == 0)
2866 : {
2867 : Datum datum;
2868 : bool isnull;
2869 :
2870 : /* oid of the database */
2871 840 : if (dbIdP)
2872 840 : *dbIdP = dbOid;
2873 : /* oid of the owner */
2874 840 : if (ownerIdP)
2875 728 : *ownerIdP = dbform->datdba;
2876 : /* character encoding */
2877 840 : if (encodingP)
2878 728 : *encodingP = dbform->encoding;
2879 : /* allowed as template? */
2880 840 : if (dbIsTemplateP)
2881 818 : *dbIsTemplateP = dbform->datistemplate;
2882 : /* Has on login event trigger? */
2883 840 : if (dbHasLoginEvtP)
2884 728 : *dbHasLoginEvtP = dbform->dathasloginevt;
2885 : /* allowing connections? */
2886 840 : if (dbAllowConnP)
2887 728 : *dbAllowConnP = dbform->datallowconn;
2888 : /* limit of frozen XIDs */
2889 840 : if (dbFrozenXidP)
2890 728 : *dbFrozenXidP = dbform->datfrozenxid;
2891 : /* minimum MultiXactId */
2892 840 : if (dbMinMultiP)
2893 728 : *dbMinMultiP = dbform->datminmxid;
2894 : /* default tablespace for this database */
2895 840 : if (dbTablespace)
2896 744 : *dbTablespace = dbform->dattablespace;
2897 : /* default locale settings for this database */
2898 840 : if (dbLocProvider)
2899 728 : *dbLocProvider = dbform->datlocprovider;
2900 840 : if (dbCollate)
2901 : {
2902 728 : datum = SysCacheGetAttrNotNull(DATABASEOID, tuple, Anum_pg_database_datcollate);
2903 728 : *dbCollate = TextDatumGetCString(datum);
2904 : }
2905 840 : if (dbCtype)
2906 : {
2907 728 : datum = SysCacheGetAttrNotNull(DATABASEOID, tuple, Anum_pg_database_datctype);
2908 728 : *dbCtype = TextDatumGetCString(datum);
2909 : }
2910 840 : if (dbLocale)
2911 : {
2912 728 : datum = SysCacheGetAttr(DATABASEOID, tuple, Anum_pg_database_datlocale, &isnull);
2913 728 : if (isnull)
2914 634 : *dbLocale = NULL;
2915 : else
2916 94 : *dbLocale = TextDatumGetCString(datum);
2917 : }
2918 840 : if (dbIcurules)
2919 : {
2920 728 : datum = SysCacheGetAttr(DATABASEOID, tuple, Anum_pg_database_daticurules, &isnull);
2921 728 : if (isnull)
2922 728 : *dbIcurules = NULL;
2923 : else
2924 0 : *dbIcurules = TextDatumGetCString(datum);
2925 : }
2926 840 : if (dbCollversion)
2927 : {
2928 728 : datum = SysCacheGetAttr(DATABASEOID, tuple, Anum_pg_database_datcollversion, &isnull);
2929 728 : if (isnull)
2930 450 : *dbCollversion = NULL;
2931 : else
2932 278 : *dbCollversion = TextDatumGetCString(datum);
2933 : }
2934 840 : ReleaseSysCache(tuple);
2935 840 : result = true;
2936 840 : break;
2937 : }
2938 : /* can only get here if it was just renamed */
2939 0 : ReleaseSysCache(tuple);
2940 : }
2941 :
2942 0 : if (lockmode != NoLock)
2943 0 : UnlockSharedObject(DatabaseRelationId, dbOid, 0, lockmode);
2944 : }
2945 :
2946 872 : table_close(relation, AccessShareLock);
2947 :
2948 872 : return result;
2949 : }
2950 :
2951 : /* Check if current user has createdb privileges */
2952 : bool
2953 800 : have_createdb_privilege(void)
2954 : {
2955 800 : bool result = false;
2956 : HeapTuple utup;
2957 :
2958 : /* Superusers can always do everything */
2959 800 : if (superuser())
2960 764 : return true;
2961 :
2962 36 : utup = SearchSysCache1(AUTHOID, ObjectIdGetDatum(GetUserId()));
2963 36 : if (HeapTupleIsValid(utup))
2964 : {
2965 36 : result = ((Form_pg_authid) GETSTRUCT(utup))->rolcreatedb;
2966 36 : ReleaseSysCache(utup);
2967 : }
2968 36 : return result;
2969 : }
2970 :
2971 : /*
2972 : * Remove tablespace directories
2973 : *
2974 : * We don't know what tablespaces db_id is using, so iterate through all
2975 : * tablespaces removing <tablespace>/db_id
2976 : */
2977 : static void
2978 88 : remove_dbtablespaces(Oid db_id)
2979 : {
2980 : Relation rel;
2981 : TableScanDesc scan;
2982 : HeapTuple tuple;
2983 88 : List *ltblspc = NIL;
2984 : ListCell *cell;
2985 : int ntblspc;
2986 : int i;
2987 : Oid *tablespace_ids;
2988 :
2989 88 : rel = table_open(TableSpaceRelationId, AccessShareLock);
2990 88 : scan = table_beginscan_catalog(rel, 0, NULL);
2991 314 : while ((tuple = heap_getnext(scan, ForwardScanDirection)) != NULL)
2992 : {
2993 226 : Form_pg_tablespace spcform = (Form_pg_tablespace) GETSTRUCT(tuple);
2994 226 : Oid dsttablespace = spcform->oid;
2995 : char *dstpath;
2996 : struct stat st;
2997 :
2998 : /* Don't mess with the global tablespace */
2999 226 : if (dsttablespace == GLOBALTABLESPACE_OID)
3000 138 : continue;
3001 :
3002 138 : dstpath = GetDatabasePath(db_id, dsttablespace);
3003 :
3004 138 : if (lstat(dstpath, &st) < 0 || !S_ISDIR(st.st_mode))
3005 : {
3006 : /* Assume we can ignore it */
3007 50 : pfree(dstpath);
3008 50 : continue;
3009 : }
3010 :
3011 88 : if (!rmtree(dstpath, true))
3012 0 : ereport(WARNING,
3013 : (errmsg("some useless files may be left behind in old database directory \"%s\"",
3014 : dstpath)));
3015 :
3016 88 : ltblspc = lappend_oid(ltblspc, dsttablespace);
3017 88 : pfree(dstpath);
3018 : }
3019 :
3020 88 : ntblspc = list_length(ltblspc);
3021 88 : if (ntblspc == 0)
3022 : {
3023 0 : table_endscan(scan);
3024 0 : table_close(rel, AccessShareLock);
3025 0 : return;
3026 : }
3027 :
3028 88 : tablespace_ids = (Oid *) palloc(ntblspc * sizeof(Oid));
3029 88 : i = 0;
3030 176 : foreach(cell, ltblspc)
3031 88 : tablespace_ids[i++] = lfirst_oid(cell);
3032 :
3033 : /* Record the filesystem change in XLOG */
3034 : {
3035 : xl_dbase_drop_rec xlrec;
3036 :
3037 88 : xlrec.db_id = db_id;
3038 88 : xlrec.ntablespaces = ntblspc;
3039 :
3040 88 : XLogBeginInsert();
3041 88 : XLogRegisterData(&xlrec, MinSizeOfDbaseDropRec);
3042 88 : XLogRegisterData(tablespace_ids, ntblspc * sizeof(Oid));
3043 :
3044 88 : (void) XLogInsert(RM_DBASE_ID,
3045 : XLOG_DBASE_DROP | XLR_SPECIAL_REL_UPDATE);
3046 : }
3047 :
3048 88 : list_free(ltblspc);
3049 88 : pfree(tablespace_ids);
3050 :
3051 88 : table_endscan(scan);
3052 88 : table_close(rel, AccessShareLock);
3053 : }
3054 :
3055 : /*
3056 : * Check for existing files that conflict with a proposed new DB OID;
3057 : * return true if there are any
3058 : *
3059 : * If there were a subdirectory in any tablespace matching the proposed new
3060 : * OID, we'd get a create failure due to the duplicate name ... and then we'd
3061 : * try to remove that already-existing subdirectory during the cleanup in
3062 : * remove_dbtablespaces. Nuking existing files seems like a bad idea, so
3063 : * instead we make this extra check before settling on the OID of the new
3064 : * database. This exactly parallels what GetNewRelFileNumber() does for table
3065 : * relfilenumber values.
3066 : */
3067 : static bool
3068 702 : check_db_file_conflict(Oid db_id)
3069 : {
3070 702 : bool result = false;
3071 : Relation rel;
3072 : TableScanDesc scan;
3073 : HeapTuple tuple;
3074 :
3075 702 : rel = table_open(TableSpaceRelationId, AccessShareLock);
3076 702 : scan = table_beginscan_catalog(rel, 0, NULL);
3077 2204 : while ((tuple = heap_getnext(scan, ForwardScanDirection)) != NULL)
3078 : {
3079 1502 : Form_pg_tablespace spcform = (Form_pg_tablespace) GETSTRUCT(tuple);
3080 1502 : Oid dsttablespace = spcform->oid;
3081 : char *dstpath;
3082 : struct stat st;
3083 :
3084 : /* Don't mess with the global tablespace */
3085 1502 : if (dsttablespace == GLOBALTABLESPACE_OID)
3086 702 : continue;
3087 :
3088 800 : dstpath = GetDatabasePath(db_id, dsttablespace);
3089 :
3090 800 : if (lstat(dstpath, &st) == 0)
3091 : {
3092 : /* Found a conflicting file (or directory, whatever) */
3093 0 : pfree(dstpath);
3094 0 : result = true;
3095 0 : break;
3096 : }
3097 :
3098 800 : pfree(dstpath);
3099 : }
3100 :
3101 702 : table_endscan(scan);
3102 702 : table_close(rel, AccessShareLock);
3103 :
3104 702 : return result;
3105 : }
3106 :
3107 : /*
3108 : * Issue a suitable errdetail message for a busy database
3109 : */
3110 : static int
3111 0 : errdetail_busy_db(int notherbackends, int npreparedxacts)
3112 : {
3113 0 : if (notherbackends > 0 && npreparedxacts > 0)
3114 :
3115 : /*
3116 : * We don't deal with singular versus plural here, since gettext
3117 : * doesn't support multiple plurals in one string.
3118 : */
3119 0 : errdetail("There are %d other session(s) and %d prepared transaction(s) using the database.",
3120 : notherbackends, npreparedxacts);
3121 0 : else if (notherbackends > 0)
3122 0 : errdetail_plural("There is %d other session using the database.",
3123 : "There are %d other sessions using the database.",
3124 : notherbackends,
3125 : notherbackends);
3126 : else
3127 0 : errdetail_plural("There is %d prepared transaction using the database.",
3128 : "There are %d prepared transactions using the database.",
3129 : npreparedxacts,
3130 : npreparedxacts);
3131 0 : return 0; /* just to keep ereport macro happy */
3132 : }
3133 :
3134 : /*
3135 : * get_database_oid - given a database name, look up the OID
3136 : *
3137 : * If missing_ok is false, throw an error if database name not found. If
3138 : * true, just return InvalidOid.
3139 : */
3140 : Oid
3141 2706 : get_database_oid(const char *dbname, bool missing_ok)
3142 : {
3143 : Relation pg_database;
3144 : ScanKeyData entry[1];
3145 : SysScanDesc scan;
3146 : HeapTuple dbtuple;
3147 : Oid oid;
3148 :
3149 : /*
3150 : * There's no syscache for pg_database indexed by name, so we must look
3151 : * the hard way.
3152 : */
3153 2706 : pg_database = table_open(DatabaseRelationId, AccessShareLock);
3154 2706 : ScanKeyInit(&entry[0],
3155 : Anum_pg_database_datname,
3156 : BTEqualStrategyNumber, F_NAMEEQ,
3157 : CStringGetDatum(dbname));
3158 2706 : scan = systable_beginscan(pg_database, DatabaseNameIndexId, true,
3159 : NULL, 1, entry);
3160 :
3161 2706 : dbtuple = systable_getnext(scan);
3162 :
3163 : /* We assume that there can be at most one matching tuple */
3164 2706 : if (HeapTupleIsValid(dbtuple))
3165 1992 : oid = ((Form_pg_database) GETSTRUCT(dbtuple))->oid;
3166 : else
3167 714 : oid = InvalidOid;
3168 :
3169 2706 : systable_endscan(scan);
3170 2706 : table_close(pg_database, AccessShareLock);
3171 :
3172 2706 : if (!OidIsValid(oid) && !missing_ok)
3173 6 : ereport(ERROR,
3174 : (errcode(ERRCODE_UNDEFINED_DATABASE),
3175 : errmsg("database \"%s\" does not exist",
3176 : dbname)));
3177 :
3178 2700 : return oid;
3179 : }
3180 :
3181 :
3182 : /*
3183 : * get_database_name - given a database OID, look up the name
3184 : *
3185 : * Returns a palloc'd string, or NULL if no such database.
3186 : */
3187 : char *
3188 352152 : get_database_name(Oid dbid)
3189 : {
3190 : HeapTuple dbtuple;
3191 : char *result;
3192 :
3193 352152 : dbtuple = SearchSysCache1(DATABASEOID, ObjectIdGetDatum(dbid));
3194 352152 : if (HeapTupleIsValid(dbtuple))
3195 : {
3196 351904 : result = pstrdup(NameStr(((Form_pg_database) GETSTRUCT(dbtuple))->datname));
3197 351904 : ReleaseSysCache(dbtuple);
3198 : }
3199 : else
3200 248 : result = NULL;
3201 :
3202 352152 : return result;
3203 : }
3204 :
3205 :
3206 : /*
3207 : * While dropping a database the pg_database row is marked invalid, but the
3208 : * catalog contents still exist. Connections to such a database are not
3209 : * allowed.
3210 : */
3211 : bool
3212 46202 : database_is_invalid_form(Form_pg_database datform)
3213 : {
3214 46202 : return datform->datconnlimit == DATCONNLIMIT_INVALID_DB;
3215 : }
3216 :
3217 :
3218 : /*
3219 : * Convenience wrapper around database_is_invalid_form()
3220 : */
3221 : bool
3222 728 : database_is_invalid_oid(Oid dboid)
3223 : {
3224 : HeapTuple dbtup;
3225 : Form_pg_database dbform;
3226 : bool invalid;
3227 :
3228 728 : dbtup = SearchSysCache1(DATABASEOID, ObjectIdGetDatum(dboid));
3229 728 : if (!HeapTupleIsValid(dbtup))
3230 0 : elog(ERROR, "cache lookup failed for database %u", dboid);
3231 728 : dbform = (Form_pg_database) GETSTRUCT(dbtup);
3232 :
3233 728 : invalid = database_is_invalid_form(dbform);
3234 :
3235 728 : ReleaseSysCache(dbtup);
3236 :
3237 728 : return invalid;
3238 : }
3239 :
3240 :
3241 : /*
3242 : * recovery_create_dbdir()
3243 : *
3244 : * During recovery, there's a case where we validly need to recover a missing
3245 : * tablespace directory so that recovery can continue. This happens when
3246 : * recovery wants to create a database but the holding tablespace has been
3247 : * removed before the server stopped. Since we expect that the directory will
3248 : * be gone before reaching recovery consistency, and we have no knowledge about
3249 : * the tablespace other than its OID here, we create a real directory under
3250 : * pg_tblspc here instead of restoring the symlink.
3251 : *
3252 : * If only_tblspc is true, then the requested directory must be in pg_tblspc/
3253 : */
3254 : static void
3255 44 : recovery_create_dbdir(char *path, bool only_tblspc)
3256 : {
3257 : struct stat st;
3258 :
3259 : Assert(RecoveryInProgress());
3260 :
3261 44 : if (stat(path, &st) == 0)
3262 44 : return;
3263 :
3264 0 : if (only_tblspc && strstr(path, PG_TBLSPC_DIR_SLASH) == NULL)
3265 0 : elog(PANIC, "requested to created invalid directory: %s", path);
3266 :
3267 0 : if (reachedConsistency && !allow_in_place_tablespaces)
3268 0 : ereport(PANIC,
3269 : errmsg("missing directory \"%s\"", path));
3270 :
3271 0 : elog(reachedConsistency ? WARNING : DEBUG1,
3272 : "creating missing directory: %s", path);
3273 :
3274 0 : if (pg_mkdir_p(path, pg_dir_create_mode) != 0)
3275 0 : ereport(PANIC,
3276 : errmsg("could not create missing directory \"%s\": %m", path));
3277 : }
3278 :
3279 :
3280 : /*
3281 : * DATABASE resource manager's routines
3282 : */
3283 : void
3284 78 : dbase_redo(XLogReaderState *record)
3285 : {
3286 78 : uint8 info = XLogRecGetInfo(record) & ~XLR_INFO_MASK;
3287 :
3288 : /* Backup blocks are not used in dbase records */
3289 : Assert(!XLogRecHasAnyBlockRefs(record));
3290 :
3291 78 : if (info == XLOG_DBASE_CREATE_FILE_COPY)
3292 : {
3293 8 : xl_dbase_create_file_copy_rec *xlrec =
3294 8 : (xl_dbase_create_file_copy_rec *) XLogRecGetData(record);
3295 : char *src_path;
3296 : char *dst_path;
3297 : char *parent_path;
3298 : struct stat st;
3299 :
3300 8 : src_path = GetDatabasePath(xlrec->src_db_id, xlrec->src_tablespace_id);
3301 8 : dst_path = GetDatabasePath(xlrec->db_id, xlrec->tablespace_id);
3302 :
3303 : /*
3304 : * Our theory for replaying a CREATE is to forcibly drop the target
3305 : * subdirectory if present, then re-copy the source data. This may be
3306 : * more work than needed, but it is simple to implement.
3307 : */
3308 8 : if (stat(dst_path, &st) == 0 && S_ISDIR(st.st_mode))
3309 : {
3310 0 : if (!rmtree(dst_path, true))
3311 : /* If this failed, copydir() below is going to error. */
3312 0 : ereport(WARNING,
3313 : (errmsg("some useless files may be left behind in old database directory \"%s\"",
3314 : dst_path)));
3315 : }
3316 :
3317 : /*
3318 : * If the parent of the target path doesn't exist, create it now. This
3319 : * enables us to create the target underneath later.
3320 : */
3321 8 : parent_path = pstrdup(dst_path);
3322 8 : get_parent_directory(parent_path);
3323 8 : if (stat(parent_path, &st) < 0)
3324 : {
3325 0 : if (errno != ENOENT)
3326 0 : ereport(FATAL,
3327 : errmsg("could not stat directory \"%s\": %m",
3328 : dst_path));
3329 :
3330 : /* create the parent directory if needed and valid */
3331 0 : recovery_create_dbdir(parent_path, true);
3332 : }
3333 8 : pfree(parent_path);
3334 :
3335 : /*
3336 : * There's a case where the copy source directory is missing for the
3337 : * same reason above. Create the empty source directory so that
3338 : * copydir below doesn't fail. The directory will be dropped soon by
3339 : * recovery.
3340 : */
3341 8 : if (stat(src_path, &st) < 0 && errno == ENOENT)
3342 0 : recovery_create_dbdir(src_path, false);
3343 :
3344 : /*
3345 : * Force dirty buffers out to disk, to ensure source database is
3346 : * up-to-date for the copy.
3347 : */
3348 8 : FlushDatabaseBuffers(xlrec->src_db_id);
3349 :
3350 : /* Close all smgr fds in all backends. */
3351 8 : WaitForProcSignalBarrier(EmitProcSignalBarrier(PROCSIGNAL_BARRIER_SMGRRELEASE));
3352 :
3353 : /*
3354 : * Copy this subdirectory to the new location
3355 : *
3356 : * We don't need to copy subdirectories
3357 : */
3358 8 : copydir(src_path, dst_path, false);
3359 :
3360 8 : pfree(src_path);
3361 8 : pfree(dst_path);
3362 : }
3363 70 : else if (info == XLOG_DBASE_CREATE_WAL_LOG)
3364 : {
3365 44 : xl_dbase_create_wal_log_rec *xlrec =
3366 44 : (xl_dbase_create_wal_log_rec *) XLogRecGetData(record);
3367 : char *dbpath;
3368 : char *parent_path;
3369 :
3370 44 : dbpath = GetDatabasePath(xlrec->db_id, xlrec->tablespace_id);
3371 :
3372 : /* create the parent directory if needed and valid */
3373 44 : parent_path = pstrdup(dbpath);
3374 44 : get_parent_directory(parent_path);
3375 44 : recovery_create_dbdir(parent_path, true);
3376 :
3377 : /* Create the database directory with the version file. */
3378 44 : CreateDirAndVersionFile(dbpath, xlrec->db_id, xlrec->tablespace_id,
3379 : true);
3380 44 : pfree(dbpath);
3381 : }
3382 26 : else if (info == XLOG_DBASE_DROP)
3383 : {
3384 26 : xl_dbase_drop_rec *xlrec = (xl_dbase_drop_rec *) XLogRecGetData(record);
3385 : char *dst_path;
3386 : int i;
3387 :
3388 26 : if (InHotStandby)
3389 : {
3390 : /*
3391 : * Lock database while we resolve conflicts to ensure that
3392 : * InitPostgres() cannot fully re-execute concurrently. This
3393 : * avoids backends re-connecting automatically to same database,
3394 : * which can happen in some cases.
3395 : *
3396 : * This will lock out walsenders trying to connect to db-specific
3397 : * slots for logical decoding too, so it's safe for us to drop
3398 : * slots.
3399 : */
3400 26 : LockSharedObjectForSession(DatabaseRelationId, xlrec->db_id, 0, AccessExclusiveLock);
3401 26 : ResolveRecoveryConflictWithDatabase(xlrec->db_id);
3402 : }
3403 :
3404 : /* Drop any database-specific replication slots */
3405 26 : ReplicationSlotsDropDBSlots(xlrec->db_id);
3406 :
3407 : /* Drop pages for this database that are in the shared buffer cache */
3408 26 : DropDatabaseBuffers(xlrec->db_id);
3409 :
3410 : /* Also, clean out any fsync requests that might be pending in md.c */
3411 26 : ForgetDatabaseSyncRequests(xlrec->db_id);
3412 :
3413 : /* Clean out the xlog relcache too */
3414 26 : XLogDropDatabase(xlrec->db_id);
3415 :
3416 : /* Close all smgr fds in all backends. */
3417 26 : WaitForProcSignalBarrier(EmitProcSignalBarrier(PROCSIGNAL_BARRIER_SMGRRELEASE));
3418 :
3419 52 : for (i = 0; i < xlrec->ntablespaces; i++)
3420 : {
3421 26 : dst_path = GetDatabasePath(xlrec->db_id, xlrec->tablespace_ids[i]);
3422 :
3423 : /* And remove the physical files */
3424 26 : if (!rmtree(dst_path, true))
3425 0 : ereport(WARNING,
3426 : (errmsg("some useless files may be left behind in old database directory \"%s\"",
3427 : dst_path)));
3428 26 : pfree(dst_path);
3429 : }
3430 :
3431 26 : if (InHotStandby)
3432 : {
3433 : /*
3434 : * Release locks prior to commit. XXX There is a race condition
3435 : * here that may allow backends to reconnect, but the window for
3436 : * this is small because the gap between here and commit is mostly
3437 : * fairly small and it is unlikely that people will be dropping
3438 : * databases that we are trying to connect to anyway.
3439 : */
3440 26 : UnlockSharedObjectForSession(DatabaseRelationId, xlrec->db_id, 0, AccessExclusiveLock);
3441 : }
3442 : }
3443 : else
3444 0 : elog(PANIC, "dbase_redo: unknown op code %u", info);
3445 78 : }
|