Line data Source code
1 : /*-------------------------------------------------------------------------
2 : *
3 : * dbcommands.c
4 : * Database management commands (create/drop database).
5 : *
6 : * Note: database creation/destruction commands use exclusive locks on
7 : * the database objects (as expressed by LockSharedObject()) to avoid
8 : * stepping on each others' toes. Formerly we used table-level locks
9 : * on pg_database, but that's too coarse-grained.
10 : *
11 : * Portions Copyright (c) 1996-2024, PostgreSQL Global Development Group
12 : * Portions Copyright (c) 1994, Regents of the University of California
13 : *
14 : *
15 : * IDENTIFICATION
16 : * src/backend/commands/dbcommands.c
17 : *
18 : *-------------------------------------------------------------------------
19 : */
20 : #include "postgres.h"
21 :
22 : #include <fcntl.h>
23 : #include <unistd.h>
24 : #include <sys/stat.h>
25 :
26 : #include "access/genam.h"
27 : #include "access/heapam.h"
28 : #include "access/htup_details.h"
29 : #include "access/multixact.h"
30 : #include "access/tableam.h"
31 : #include "access/xact.h"
32 : #include "access/xloginsert.h"
33 : #include "access/xlogrecovery.h"
34 : #include "access/xlogutils.h"
35 : #include "catalog/catalog.h"
36 : #include "catalog/dependency.h"
37 : #include "catalog/indexing.h"
38 : #include "catalog/objectaccess.h"
39 : #include "catalog/pg_authid.h"
40 : #include "catalog/pg_collation.h"
41 : #include "catalog/pg_database.h"
42 : #include "catalog/pg_db_role_setting.h"
43 : #include "catalog/pg_subscription.h"
44 : #include "catalog/pg_tablespace.h"
45 : #include "commands/comment.h"
46 : #include "commands/dbcommands.h"
47 : #include "commands/dbcommands_xlog.h"
48 : #include "commands/defrem.h"
49 : #include "commands/seclabel.h"
50 : #include "commands/tablespace.h"
51 : #include "common/file_perm.h"
52 : #include "mb/pg_wchar.h"
53 : #include "miscadmin.h"
54 : #include "pgstat.h"
55 : #include "postmaster/bgwriter.h"
56 : #include "replication/slot.h"
57 : #include "storage/copydir.h"
58 : #include "storage/fd.h"
59 : #include "storage/ipc.h"
60 : #include "storage/lmgr.h"
61 : #include "storage/md.h"
62 : #include "storage/procarray.h"
63 : #include "storage/smgr.h"
64 : #include "utils/acl.h"
65 : #include "utils/builtins.h"
66 : #include "utils/fmgroids.h"
67 : #include "utils/pg_locale.h"
68 : #include "utils/relmapper.h"
69 : #include "utils/snapmgr.h"
70 : #include "utils/syscache.h"
71 :
72 : /*
73 : * Create database strategy.
74 : *
75 : * CREATEDB_WAL_LOG will copy the database at the block level and WAL log each
76 : * copied block.
77 : *
78 : * CREATEDB_FILE_COPY will simply perform a file system level copy of the
79 : * database and log a single record for each tablespace copied. To make this
80 : * safe, it also triggers checkpoints before and after the operation.
81 : */
typedef enum CreateDBStrategy
{
	CREATEDB_WAL_LOG,			/* block-level copy, WAL-logging each copied
								 * block */
	CREATEDB_FILE_COPY,			/* file-system-level copy, one WAL record per
								 * tablespace, bracketed by checkpoints */
} CreateDBStrategy;
87 :
/*
 * State describing an in-progress CREATE DATABASE.
 *
 * NOTE(review): presumably this is what gets handed to
 * createdb_failure_callback() through its Datum argument -- confirm against
 * the callback registration in createdb().
 */
typedef struct
{
	Oid			src_dboid;		/* source (template) DB */
	Oid			dest_dboid;		/* DB we are trying to create */
	CreateDBStrategy strategy;	/* create db strategy */
} createdb_failure_params;
94 :
/*
 * State describing an in-progress move of a database to another tablespace.
 *
 * NOTE(review): presumably this is what gets handed to
 * movedb_failure_callback() through its Datum argument -- confirm against
 * the callback registration in movedb().
 */
typedef struct
{
	Oid			dest_dboid;		/* DB we are trying to move */
	Oid			dest_tsoid;		/* tablespace we are trying to move to */
} movedb_failure_params;
100 :
/*
 * Information about a relation to be copied when creating a database.
 *
 * One of these is built by ScanSourceDatabasePgClassTuple() for every
 * pg_class entry of the source database that has to be copied, and the
 * resulting list is consumed by CreateDatabaseUsingWalLog().
 */
typedef struct CreateDBRelInfo
{
	RelFileLocator rlocator;	/* physical relation identifier */
	Oid			reloid;			/* relation oid */
	bool		permanent;		/* true if relpersistence is
								 * RELPERSISTENCE_PERMANENT, false otherwise
								 * (temporary relations are never listed) */
} CreateDBRelInfo;
110 :
111 :
/* non-export function prototypes */

/* failure callbacks and ALTER DATABASE ... SET TABLESPACE support */
static void createdb_failure_callback(int code, Datum arg);
static void movedb(const char *dbname, const char *tblspcname);
static void movedb_failure_callback(int code, Datum arg);

/* catalog lookup and on-disk housekeeping helpers */
static bool get_db_info(const char *name, LOCKMODE lockmode,
						Oid *dbIdP, Oid *ownerIdP,
						int *encodingP, bool *dbIsTemplateP, bool *dbAllowConnP, bool *dbHasLoginEvtP,
						TransactionId *dbFrozenXidP, MultiXactId *dbMinMultiP,
						Oid *dbTablespace, char **dbCollate, char **dbCtype, char **dbLocale,
						char **dbIcurules,
						char *dbLocProvider,
						char **dbCollversion);
static void remove_dbtablespaces(Oid db_id);
static bool check_db_file_conflict(Oid db_id);
static int	errdetail_busy_db(int notherbackends, int npreparedxacts);

/* WAL_LOG strategy: block-level copy driven by a scan of the source pg_class */
static void CreateDatabaseUsingWalLog(Oid src_dboid, Oid dst_dboid, Oid src_tsid,
									  Oid dst_tsid);
static List *ScanSourceDatabasePgClass(Oid tbid, Oid dbid, char *srcpath);
static List *ScanSourceDatabasePgClassPage(Page page, Buffer buf, Oid tbid,
										   Oid dbid, char *srcpath,
										   List *rlocatorlist, Snapshot snapshot);
static CreateDBRelInfo *ScanSourceDatabasePgClassTuple(HeapTupleData *tuple,
													   Oid tbid, Oid dbid,
													   char *srcpath);
static void CreateDirAndVersionFile(char *dbpath, Oid dbid, Oid tsid,
									bool isRedo);

/* FILE_COPY strategy: file-system-level copy */
static void CreateDatabaseUsingFileCopy(Oid src_dboid, Oid dst_dboid,
										Oid src_tsid, Oid dst_tsid);
static void recovery_create_dbdir(char *path, bool only_tblspc);
141 :
142 : /*
143 : * Create a new database using the WAL_LOG strategy.
144 : *
145 : * Each copied block is separately written to the write-ahead log.
146 : */
147 : static void
148 432 : CreateDatabaseUsingWalLog(Oid src_dboid, Oid dst_dboid,
149 : Oid src_tsid, Oid dst_tsid)
150 : {
151 : char *srcpath;
152 : char *dstpath;
153 432 : List *rlocatorlist = NULL;
154 : ListCell *cell;
155 : LockRelId srcrelid;
156 : LockRelId dstrelid;
157 : RelFileLocator srcrlocator;
158 : RelFileLocator dstrlocator;
159 : CreateDBRelInfo *relinfo;
160 :
161 : /* Get source and destination database paths. */
162 432 : srcpath = GetDatabasePath(src_dboid, src_tsid);
163 432 : dstpath = GetDatabasePath(dst_dboid, dst_tsid);
164 :
165 : /* Create database directory and write PG_VERSION file. */
166 432 : CreateDirAndVersionFile(dstpath, dst_dboid, dst_tsid, false);
167 :
168 : /* Copy relmap file from source database to the destination database. */
169 432 : RelationMapCopy(dst_dboid, dst_tsid, srcpath, dstpath);
170 :
171 : /* Get list of relfilelocators to copy from the source database. */
172 432 : rlocatorlist = ScanSourceDatabasePgClass(src_tsid, src_dboid, srcpath);
173 : Assert(rlocatorlist != NIL);
174 :
175 : /*
176 : * Database IDs will be the same for all relations so set them before
177 : * entering the loop.
178 : */
179 432 : srcrelid.dbId = src_dboid;
180 432 : dstrelid.dbId = dst_dboid;
181 :
182 : /* Loop over our list of relfilelocators and copy each one. */
183 97232 : foreach(cell, rlocatorlist)
184 : {
185 96800 : relinfo = lfirst(cell);
186 96800 : srcrlocator = relinfo->rlocator;
187 :
188 : /*
189 : * If the relation is from the source db's default tablespace then we
190 : * need to create it in the destination db's default tablespace.
191 : * Otherwise, we need to create in the same tablespace as it is in the
192 : * source database.
193 : */
194 96800 : if (srcrlocator.spcOid == src_tsid)
195 96800 : dstrlocator.spcOid = dst_tsid;
196 : else
197 0 : dstrlocator.spcOid = srcrlocator.spcOid;
198 :
199 96800 : dstrlocator.dbOid = dst_dboid;
200 96800 : dstrlocator.relNumber = srcrlocator.relNumber;
201 :
202 : /*
203 : * Acquire locks on source and target relations before copying.
204 : *
205 : * We typically do not read relation data into shared_buffers without
206 : * holding a relation lock. It's unclear what could go wrong if we
207 : * skipped it in this case, because nobody can be modifying either the
208 : * source or destination database at this point, and we have locks on
209 : * both databases, too, but let's take the conservative route.
210 : */
211 96800 : dstrelid.relId = srcrelid.relId = relinfo->reloid;
212 96800 : LockRelationId(&srcrelid, AccessShareLock);
213 96800 : LockRelationId(&dstrelid, AccessShareLock);
214 :
215 : /* Copy relation storage from source to the destination. */
216 96800 : CreateAndCopyRelationData(srcrlocator, dstrlocator, relinfo->permanent);
217 :
218 : /* Release the relation locks. */
219 96800 : UnlockRelationId(&srcrelid, AccessShareLock);
220 96800 : UnlockRelationId(&dstrelid, AccessShareLock);
221 : }
222 :
223 432 : pfree(srcpath);
224 432 : pfree(dstpath);
225 432 : list_free_deep(rlocatorlist);
226 432 : }
227 :
228 : /*
229 : * Scan the pg_class table in the source database to identify the relations
230 : * that need to be copied to the destination database.
231 : *
232 : * This is an exception to the usual rule that cross-database access is
233 : * not possible. We can make it work here because we know that there are no
234 : * connections to the source database and (since there can't be prepared
235 : * transactions touching that database) no in-doubt tuples either. This
236 : * means that we don't need to worry about pruning removing anything from
237 : * under us, and we don't need to be too picky about our snapshot either.
238 : * As long as it sees all previously-committed XIDs as committed and all
239 : * aborted XIDs as aborted, we should be fine: nothing else is possible
240 : * here.
241 : *
242 : * We can't rely on the relcache for anything here, because that only knows
243 : * about the database to which we are connected, and can't handle access to
244 : * other databases. That also means we can't rely on the heap scan
245 : * infrastructure, which would be a bad idea anyway since it might try
246 : * to do things like HOT pruning which we definitely can't do safely in
247 : * a database to which we're not even connected.
248 : */
249 : static List *
250 432 : ScanSourceDatabasePgClass(Oid tbid, Oid dbid, char *srcpath)
251 : {
252 : RelFileLocator rlocator;
253 : BlockNumber nblocks;
254 : BlockNumber blkno;
255 : Buffer buf;
256 : RelFileNumber relfilenumber;
257 : Page page;
258 432 : List *rlocatorlist = NIL;
259 : LockRelId relid;
260 : Snapshot snapshot;
261 : SMgrRelation smgr;
262 : BufferAccessStrategy bstrategy;
263 :
264 : /* Get pg_class relfilenumber. */
265 432 : relfilenumber = RelationMapOidToFilenumberForDatabase(srcpath,
266 : RelationRelationId);
267 :
268 : /* Don't read data into shared_buffers without holding a relation lock. */
269 432 : relid.dbId = dbid;
270 432 : relid.relId = RelationRelationId;
271 432 : LockRelationId(&relid, AccessShareLock);
272 :
273 : /* Prepare a RelFileLocator for the pg_class relation. */
274 432 : rlocator.spcOid = tbid;
275 432 : rlocator.dbOid = dbid;
276 432 : rlocator.relNumber = relfilenumber;
277 :
278 432 : smgr = smgropen(rlocator, INVALID_PROC_NUMBER);
279 432 : nblocks = smgrnblocks(smgr, MAIN_FORKNUM);
280 432 : smgrclose(smgr);
281 :
282 : /* Use a buffer access strategy since this is a bulk read operation. */
283 432 : bstrategy = GetAccessStrategy(BAS_BULKREAD);
284 :
285 : /*
286 : * As explained in the function header comments, we need a snapshot that
287 : * will see all committed transactions as committed, and our transaction
288 : * snapshot - or the active snapshot - might not be new enough for that,
289 : * but the return value of GetLatestSnapshot() should work fine.
290 : */
291 432 : snapshot = GetLatestSnapshot();
292 :
293 : /* Process the relation block by block. */
294 6480 : for (blkno = 0; blkno < nblocks; blkno++)
295 : {
296 6048 : CHECK_FOR_INTERRUPTS();
297 :
298 6048 : buf = ReadBufferWithoutRelcache(rlocator, MAIN_FORKNUM, blkno,
299 : RBM_NORMAL, bstrategy, true);
300 :
301 6048 : LockBuffer(buf, BUFFER_LOCK_SHARE);
302 6048 : page = BufferGetPage(buf);
303 6048 : if (PageIsNew(page) || PageIsEmpty(page))
304 : {
305 0 : UnlockReleaseBuffer(buf);
306 0 : continue;
307 : }
308 :
309 : /* Append relevant pg_class tuples for current page to rlocatorlist. */
310 6048 : rlocatorlist = ScanSourceDatabasePgClassPage(page, buf, tbid, dbid,
311 : srcpath, rlocatorlist,
312 : snapshot);
313 :
314 6048 : UnlockReleaseBuffer(buf);
315 : }
316 :
317 : /* Release relation lock. */
318 432 : UnlockRelationId(&relid, AccessShareLock);
319 :
320 432 : return rlocatorlist;
321 : }
322 :
323 : /*
324 : * Scan one page of the source database's pg_class relation and add relevant
325 : * entries to rlocatorlist. The return value is the updated list.
326 : */
327 : static List *
328 6048 : ScanSourceDatabasePgClassPage(Page page, Buffer buf, Oid tbid, Oid dbid,
329 : char *srcpath, List *rlocatorlist,
330 : Snapshot snapshot)
331 : {
332 6048 : BlockNumber blkno = BufferGetBlockNumber(buf);
333 : OffsetNumber offnum;
334 : OffsetNumber maxoff;
335 : HeapTupleData tuple;
336 :
337 6048 : maxoff = PageGetMaxOffsetNumber(page);
338 :
339 : /* Loop over offsets. */
340 309068 : for (offnum = FirstOffsetNumber;
341 : offnum <= maxoff;
342 303020 : offnum = OffsetNumberNext(offnum))
343 : {
344 : ItemId itemid;
345 :
346 303020 : itemid = PageGetItemId(page, offnum);
347 :
348 : /* Nothing to do if slot is empty or already dead. */
349 303020 : if (!ItemIdIsUsed(itemid) || ItemIdIsDead(itemid) ||
350 216494 : ItemIdIsRedirected(itemid))
351 123678 : continue;
352 :
353 : Assert(ItemIdIsNormal(itemid));
354 179342 : ItemPointerSet(&(tuple.t_self), blkno, offnum);
355 :
356 : /* Initialize a HeapTupleData structure. */
357 179342 : tuple.t_data = (HeapTupleHeader) PageGetItem(page, itemid);
358 179342 : tuple.t_len = ItemIdGetLength(itemid);
359 179342 : tuple.t_tableOid = RelationRelationId;
360 :
361 : /* Skip tuples that are not visible to this snapshot. */
362 179342 : if (HeapTupleSatisfiesVisibility(&tuple, snapshot, buf))
363 : {
364 : CreateDBRelInfo *relinfo;
365 :
366 : /*
367 : * ScanSourceDatabasePgClassTuple is in charge of constructing a
368 : * CreateDBRelInfo object for this tuple, but can also decide that
369 : * this tuple isn't something we need to copy. If we do need to
370 : * copy the relation, add it to the list.
371 : */
372 179312 : relinfo = ScanSourceDatabasePgClassTuple(&tuple, tbid, dbid,
373 : srcpath);
374 179312 : if (relinfo != NULL)
375 96800 : rlocatorlist = lappend(rlocatorlist, relinfo);
376 : }
377 : }
378 :
379 6048 : return rlocatorlist;
380 : }
381 :
382 : /*
383 : * Decide whether a certain pg_class tuple represents something that
384 : * needs to be copied from the source database to the destination database,
385 : * and if so, construct a CreateDBRelInfo for it.
386 : *
387 : * Visibility checks are handled by the caller, so our job here is just
388 : * to assess the data stored in the tuple.
389 : */
390 : CreateDBRelInfo *
391 179312 : ScanSourceDatabasePgClassTuple(HeapTupleData *tuple, Oid tbid, Oid dbid,
392 : char *srcpath)
393 : {
394 : CreateDBRelInfo *relinfo;
395 : Form_pg_class classForm;
396 179312 : RelFileNumber relfilenumber = InvalidRelFileNumber;
397 :
398 179312 : classForm = (Form_pg_class) GETSTRUCT(tuple);
399 :
400 : /*
401 : * Return NULL if this object does not need to be copied.
402 : *
403 : * Shared objects don't need to be copied, because they are shared.
404 : * Objects without storage can't be copied, because there's nothing to
405 : * copy. Temporary relations don't need to be copied either, because they
406 : * are inaccessible outside of the session that created them, which must
407 : * be gone already, and couldn't connect to a different database if it
408 : * still existed. autovacuum will eventually remove the pg_class entries
409 : * as well.
410 : */
411 179312 : if (classForm->reltablespace == GLOBALTABLESPACE_OID ||
412 158576 : !RELKIND_HAS_STORAGE(classForm->relkind) ||
413 96800 : classForm->relpersistence == RELPERSISTENCE_TEMP)
414 82512 : return NULL;
415 :
416 : /*
417 : * If relfilenumber is valid then directly use it. Otherwise, consult the
418 : * relmap.
419 : */
420 96800 : if (RelFileNumberIsValid(classForm->relfilenode))
421 89456 : relfilenumber = classForm->relfilenode;
422 : else
423 7344 : relfilenumber = RelationMapOidToFilenumberForDatabase(srcpath,
424 : classForm->oid);
425 :
426 : /* We must have a valid relfilenumber. */
427 96800 : if (!RelFileNumberIsValid(relfilenumber))
428 0 : elog(ERROR, "relation with OID %u does not have a valid relfilenumber",
429 : classForm->oid);
430 :
431 : /* Prepare a rel info element and add it to the list. */
432 96800 : relinfo = (CreateDBRelInfo *) palloc(sizeof(CreateDBRelInfo));
433 96800 : if (OidIsValid(classForm->reltablespace))
434 0 : relinfo->rlocator.spcOid = classForm->reltablespace;
435 : else
436 96800 : relinfo->rlocator.spcOid = tbid;
437 :
438 96800 : relinfo->rlocator.dbOid = dbid;
439 96800 : relinfo->rlocator.relNumber = relfilenumber;
440 96800 : relinfo->reloid = classForm->oid;
441 :
442 : /* Temporary relations were rejected above. */
443 : Assert(classForm->relpersistence != RELPERSISTENCE_TEMP);
444 96800 : relinfo->permanent =
445 96800 : (classForm->relpersistence == RELPERSISTENCE_PERMANENT) ? true : false;
446 :
447 96800 : return relinfo;
448 : }
449 :
450 : /*
451 : * Create database directory and write out the PG_VERSION file in the database
452 : * path. If isRedo is true, it's okay for the database directory to exist
453 : * already.
454 : */
455 : static void
456 476 : CreateDirAndVersionFile(char *dbpath, Oid dbid, Oid tsid, bool isRedo)
457 : {
458 : int fd;
459 : int nbytes;
460 : char versionfile[MAXPGPATH];
461 : char buf[16];
462 :
463 : /*
464 : * Note that we don't have to copy version data from the source database;
465 : * there's only one legal value.
466 : */
467 476 : sprintf(buf, "%s\n", PG_MAJORVERSION);
468 476 : nbytes = strlen(PG_MAJORVERSION) + 1;
469 :
470 : /* Create database directory. */
471 476 : if (MakePGDirectory(dbpath) < 0)
472 : {
473 : /* Failure other than already exists or not in WAL replay? */
474 16 : if (errno != EEXIST || !isRedo)
475 0 : ereport(ERROR,
476 : (errcode_for_file_access(),
477 : errmsg("could not create directory \"%s\": %m", dbpath)));
478 : }
479 :
480 : /*
481 : * Create PG_VERSION file in the database path. If the file already
482 : * exists and we are in WAL replay then try again to open it in write
483 : * mode.
484 : */
485 476 : snprintf(versionfile, sizeof(versionfile), "%s/%s", dbpath, "PG_VERSION");
486 :
487 476 : fd = OpenTransientFile(versionfile, O_WRONLY | O_CREAT | O_EXCL | PG_BINARY);
488 476 : if (fd < 0 && errno == EEXIST && isRedo)
489 16 : fd = OpenTransientFile(versionfile, O_WRONLY | O_TRUNC | PG_BINARY);
490 :
491 476 : if (fd < 0)
492 0 : ereport(ERROR,
493 : (errcode_for_file_access(),
494 : errmsg("could not create file \"%s\": %m", versionfile)));
495 :
496 : /* Write PG_MAJORVERSION in the PG_VERSION file. */
497 476 : pgstat_report_wait_start(WAIT_EVENT_VERSION_FILE_WRITE);
498 476 : errno = 0;
499 476 : if ((int) write(fd, buf, nbytes) != nbytes)
500 : {
501 : /* If write didn't set errno, assume problem is no disk space. */
502 0 : if (errno == 0)
503 0 : errno = ENOSPC;
504 0 : ereport(ERROR,
505 : (errcode_for_file_access(),
506 : errmsg("could not write to file \"%s\": %m", versionfile)));
507 : }
508 476 : pgstat_report_wait_end();
509 :
510 476 : pgstat_report_wait_start(WAIT_EVENT_VERSION_FILE_SYNC);
511 476 : if (pg_fsync(fd) != 0)
512 0 : ereport(data_sync_elevel(ERROR),
513 : (errcode_for_file_access(),
514 : errmsg("could not fsync file \"%s\": %m", versionfile)));
515 476 : fsync_fname(dbpath, true);
516 476 : pgstat_report_wait_end();
517 :
518 : /* Close the version file. */
519 476 : CloseTransientFile(fd);
520 :
521 : /* If we are not in WAL replay then write the WAL. */
522 476 : if (!isRedo)
523 : {
524 : xl_dbase_create_wal_log_rec xlrec;
525 :
526 432 : START_CRIT_SECTION();
527 :
528 432 : xlrec.db_id = dbid;
529 432 : xlrec.tablespace_id = tsid;
530 :
531 432 : XLogBeginInsert();
532 432 : XLogRegisterData((char *) (&xlrec),
533 : sizeof(xl_dbase_create_wal_log_rec));
534 :
535 432 : (void) XLogInsert(RM_DBASE_ID, XLOG_DBASE_CREATE_WAL_LOG);
536 :
537 432 : END_CRIT_SECTION();
538 : }
539 476 : }
540 :
541 : /*
542 : * Create a new database using the FILE_COPY strategy.
543 : *
544 : * Copy each tablespace at the filesystem level, and log a single WAL record
545 : * for each tablespace copied. This requires a checkpoint before and after the
546 : * copy, which may be expensive, but it does greatly reduce WAL generation
547 : * if the copied database is large.
548 : */
549 : static void
550 212 : CreateDatabaseUsingFileCopy(Oid src_dboid, Oid dst_dboid, Oid src_tsid,
551 : Oid dst_tsid)
552 : {
553 : TableScanDesc scan;
554 : Relation rel;
555 : HeapTuple tuple;
556 :
557 : /*
558 : * Force a checkpoint before starting the copy. This will force all dirty
559 : * buffers, including those of unlogged tables, out to disk, to ensure
560 : * source database is up-to-date on disk for the copy.
561 : * FlushDatabaseBuffers() would suffice for that, but we also want to
562 : * process any pending unlink requests. Otherwise, if a checkpoint
563 : * happened while we're copying files, a file might be deleted just when
564 : * we're about to copy it, causing the lstat() call in copydir() to fail
565 : * with ENOENT.
566 : *
567 : * In binary upgrade mode, we can skip this checkpoint because pg_upgrade
568 : * is careful to ensure that template0 is fully written to disk prior to
569 : * any CREATE DATABASE commands.
570 : */
571 212 : if (!IsBinaryUpgrade)
572 192 : RequestCheckpoint(CHECKPOINT_IMMEDIATE | CHECKPOINT_FORCE |
573 : CHECKPOINT_WAIT | CHECKPOINT_FLUSH_ALL);
574 :
575 : /*
576 : * Iterate through all tablespaces of the template database, and copy each
577 : * one to the new database.
578 : */
579 212 : rel = table_open(TableSpaceRelationId, AccessShareLock);
580 212 : scan = table_beginscan_catalog(rel, 0, NULL);
581 672 : while ((tuple = heap_getnext(scan, ForwardScanDirection)) != NULL)
582 : {
583 460 : Form_pg_tablespace spaceform = (Form_pg_tablespace) GETSTRUCT(tuple);
584 460 : Oid srctablespace = spaceform->oid;
585 : Oid dsttablespace;
586 : char *srcpath;
587 : char *dstpath;
588 : struct stat st;
589 :
590 : /* No need to copy global tablespace */
591 460 : if (srctablespace == GLOBALTABLESPACE_OID)
592 248 : continue;
593 :
594 248 : srcpath = GetDatabasePath(src_dboid, srctablespace);
595 :
596 460 : if (stat(srcpath, &st) < 0 || !S_ISDIR(st.st_mode) ||
597 212 : directory_is_empty(srcpath))
598 : {
599 : /* Assume we can ignore it */
600 36 : pfree(srcpath);
601 36 : continue;
602 : }
603 :
604 212 : if (srctablespace == src_tsid)
605 212 : dsttablespace = dst_tsid;
606 : else
607 0 : dsttablespace = srctablespace;
608 :
609 212 : dstpath = GetDatabasePath(dst_dboid, dsttablespace);
610 :
611 : /*
612 : * Copy this subdirectory to the new location
613 : *
614 : * We don't need to copy subdirectories
615 : */
616 212 : copydir(srcpath, dstpath, false);
617 :
618 : /* Record the filesystem change in XLOG */
619 : {
620 : xl_dbase_create_file_copy_rec xlrec;
621 :
622 212 : xlrec.db_id = dst_dboid;
623 212 : xlrec.tablespace_id = dsttablespace;
624 212 : xlrec.src_db_id = src_dboid;
625 212 : xlrec.src_tablespace_id = srctablespace;
626 :
627 212 : XLogBeginInsert();
628 212 : XLogRegisterData((char *) &xlrec,
629 : sizeof(xl_dbase_create_file_copy_rec));
630 :
631 212 : (void) XLogInsert(RM_DBASE_ID,
632 : XLOG_DBASE_CREATE_FILE_COPY | XLR_SPECIAL_REL_UPDATE);
633 : }
634 212 : pfree(srcpath);
635 212 : pfree(dstpath);
636 : }
637 212 : table_endscan(scan);
638 212 : table_close(rel, AccessShareLock);
639 :
640 : /*
641 : * We force a checkpoint before committing. This effectively means that
642 : * committed XLOG_DBASE_CREATE_FILE_COPY operations will never need to be
643 : * replayed (at least not in ordinary crash recovery; we still have to
644 : * make the XLOG entry for the benefit of PITR operations). This avoids
645 : * two nasty scenarios:
646 : *
647 : * #1: At wal_level=minimal, we don't XLOG the contents of newly created
648 : * relfilenodes; therefore the drop-and-recreate-whole-directory behavior
649 : * of DBASE_CREATE replay would lose such files created in the new
650 : * database between our commit and the next checkpoint.
651 : *
652 : * #2: Since we have to recopy the source database during DBASE_CREATE
653 : * replay, we run the risk of copying changes in it that were committed
654 : * after the original CREATE DATABASE command but before the system crash
655 : * that led to the replay. This is at least unexpected and at worst could
656 : * lead to inconsistencies, eg duplicate table names.
657 : *
658 : * (Both of these were real bugs in releases 8.0 through 8.0.3.)
659 : *
660 : * In PITR replay, the first of these isn't an issue, and the second is
661 : * only a risk if the CREATE DATABASE and subsequent template database
662 : * change both occur while a base backup is being taken. There doesn't
663 : * seem to be much we can do about that except document it as a
664 : * limitation.
665 : *
666 : * In binary upgrade mode, we can skip this checkpoint because neither of
667 : * these problems applies: we don't ever replay the WAL generated during
668 : * pg_upgrade, and we don't support taking base backups during pg_upgrade
669 : * (not to mention that we don't concurrently modify template0, either).
670 : *
671 : * See CreateDatabaseUsingWalLog() for a less cheesy CREATE DATABASE
672 : * strategy that avoids these problems.
673 : */
674 212 : if (!IsBinaryUpgrade)
675 192 : RequestCheckpoint(CHECKPOINT_IMMEDIATE | CHECKPOINT_FORCE |
676 : CHECKPOINT_WAIT);
677 212 : }
678 :
679 : /*
680 : * CREATE DATABASE
681 : */
682 : Oid
683 678 : createdb(ParseState *pstate, const CreatedbStmt *stmt)
684 : {
685 : Oid src_dboid;
686 : Oid src_owner;
687 678 : int src_encoding = -1;
688 678 : char *src_collate = NULL;
689 678 : char *src_ctype = NULL;
690 678 : char *src_locale = NULL;
691 678 : char *src_icurules = NULL;
692 678 : char src_locprovider = '\0';
693 678 : char *src_collversion = NULL;
694 : bool src_istemplate;
695 678 : bool src_hasloginevt = false;
696 : bool src_allowconn;
697 678 : TransactionId src_frozenxid = InvalidTransactionId;
698 678 : MultiXactId src_minmxid = InvalidMultiXactId;
699 : Oid src_deftablespace;
700 : volatile Oid dst_deftablespace;
701 : Relation pg_database_rel;
702 : HeapTuple tuple;
703 678 : Datum new_record[Natts_pg_database] = {0};
704 678 : bool new_record_nulls[Natts_pg_database] = {0};
705 678 : Oid dboid = InvalidOid;
706 : Oid datdba;
707 : ListCell *option;
708 678 : DefElem *tablespacenameEl = NULL;
709 678 : DefElem *ownerEl = NULL;
710 678 : DefElem *templateEl = NULL;
711 678 : DefElem *encodingEl = NULL;
712 678 : DefElem *localeEl = NULL;
713 678 : DefElem *builtinlocaleEl = NULL;
714 678 : DefElem *collateEl = NULL;
715 678 : DefElem *ctypeEl = NULL;
716 678 : DefElem *iculocaleEl = NULL;
717 678 : DefElem *icurulesEl = NULL;
718 678 : DefElem *locproviderEl = NULL;
719 678 : DefElem *istemplateEl = NULL;
720 678 : DefElem *allowconnectionsEl = NULL;
721 678 : DefElem *connlimitEl = NULL;
722 678 : DefElem *collversionEl = NULL;
723 678 : DefElem *strategyEl = NULL;
724 678 : char *dbname = stmt->dbname;
725 678 : char *dbowner = NULL;
726 678 : const char *dbtemplate = NULL;
727 678 : char *dbcollate = NULL;
728 678 : char *dbctype = NULL;
729 678 : const char *dblocale = NULL;
730 678 : char *dbicurules = NULL;
731 678 : char dblocprovider = '\0';
732 : char *canonname;
733 678 : int encoding = -1;
734 678 : bool dbistemplate = false;
735 678 : bool dballowconnections = true;
736 678 : int dbconnlimit = DATCONNLIMIT_UNLIMITED;
737 678 : char *dbcollversion = NULL;
738 : int notherbackends;
739 : int npreparedxacts;
740 678 : CreateDBStrategy dbstrategy = CREATEDB_WAL_LOG;
741 : createdb_failure_params fparms;
742 :
743 : /* Extract options from the statement node tree */
744 1896 : foreach(option, stmt->options)
745 : {
746 1218 : DefElem *defel = (DefElem *) lfirst(option);
747 :
748 1218 : if (strcmp(defel->defname, "tablespace") == 0)
749 : {
750 16 : if (tablespacenameEl)
751 0 : errorConflictingDefElem(defel, pstate);
752 16 : tablespacenameEl = defel;
753 : }
754 1202 : else if (strcmp(defel->defname, "owner") == 0)
755 : {
756 2 : if (ownerEl)
757 0 : errorConflictingDefElem(defel, pstate);
758 2 : ownerEl = defel;
759 : }
760 1200 : else if (strcmp(defel->defname, "template") == 0)
761 : {
762 296 : if (templateEl)
763 0 : errorConflictingDefElem(defel, pstate);
764 296 : templateEl = defel;
765 : }
766 904 : else if (strcmp(defel->defname, "encoding") == 0)
767 : {
768 66 : if (encodingEl)
769 0 : errorConflictingDefElem(defel, pstate);
770 66 : encodingEl = defel;
771 : }
772 838 : else if (strcmp(defel->defname, "locale") == 0)
773 : {
774 70 : if (localeEl)
775 0 : errorConflictingDefElem(defel, pstate);
776 70 : localeEl = defel;
777 : }
778 768 : else if (strcmp(defel->defname, "builtin_locale") == 0)
779 : {
780 16 : if (builtinlocaleEl)
781 0 : errorConflictingDefElem(defel, pstate);
782 16 : builtinlocaleEl = defel;
783 : }
784 752 : else if (strcmp(defel->defname, "lc_collate") == 0)
785 : {
786 18 : if (collateEl)
787 0 : errorConflictingDefElem(defel, pstate);
788 18 : collateEl = defel;
789 : }
790 734 : else if (strcmp(defel->defname, "lc_ctype") == 0)
791 : {
792 18 : if (ctypeEl)
793 0 : errorConflictingDefElem(defel, pstate);
794 18 : ctypeEl = defel;
795 : }
796 716 : else if (strcmp(defel->defname, "icu_locale") == 0)
797 : {
798 10 : if (iculocaleEl)
799 0 : errorConflictingDefElem(defel, pstate);
800 10 : iculocaleEl = defel;
801 : }
802 706 : else if (strcmp(defel->defname, "icu_rules") == 0)
803 : {
804 2 : if (icurulesEl)
805 0 : errorConflictingDefElem(defel, pstate);
806 2 : icurulesEl = defel;
807 : }
808 704 : else if (strcmp(defel->defname, "locale_provider") == 0)
809 : {
810 76 : if (locproviderEl)
811 0 : errorConflictingDefElem(defel, pstate);
812 76 : locproviderEl = defel;
813 : }
814 628 : else if (strcmp(defel->defname, "is_template") == 0)
815 : {
816 90 : if (istemplateEl)
817 0 : errorConflictingDefElem(defel, pstate);
818 90 : istemplateEl = defel;
819 : }
820 538 : else if (strcmp(defel->defname, "allow_connections") == 0)
821 : {
822 88 : if (allowconnectionsEl)
823 0 : errorConflictingDefElem(defel, pstate);
824 88 : allowconnectionsEl = defel;
825 : }
826 450 : else if (strcmp(defel->defname, "connection_limit") == 0)
827 : {
828 0 : if (connlimitEl)
829 0 : errorConflictingDefElem(defel, pstate);
830 0 : connlimitEl = defel;
831 : }
832 450 : else if (strcmp(defel->defname, "collation_version") == 0)
833 : {
834 20 : if (collversionEl)
835 0 : errorConflictingDefElem(defel, pstate);
836 20 : collversionEl = defel;
837 : }
838 430 : else if (strcmp(defel->defname, "location") == 0)
839 : {
840 0 : ereport(WARNING,
841 : (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
842 : errmsg("LOCATION is not supported anymore"),
843 : errhint("Consider using tablespaces instead."),
844 : parser_errposition(pstate, defel->location)));
845 : }
846 430 : else if (strcmp(defel->defname, "oid") == 0)
847 : {
848 202 : dboid = defGetObjectId(defel);
849 :
850 : /*
851 : * We don't normally permit new databases to be created with
852 : * system-assigned OIDs. pg_upgrade tries to preserve database
853 : * OIDs, so we can't allow any database to be created with an OID
854 : * that might be in use in a freshly-initialized cluster created
855 : * by some future version. We assume all such OIDs will be from
856 : * the system-managed OID range.
857 : *
858 : * As an exception, however, we permit any OID to be assigned when
859 : * allow_system_table_mods=on (so that initdb can assign system
860 : * OIDs to template0 and postgres) or when performing a binary
861 : * upgrade (so that pg_upgrade can preserve whatever OIDs it finds
862 : * in the source cluster).
863 : */
864 202 : if (dboid < FirstNormalObjectId &&
865 184 : !allowSystemTableMods && !IsBinaryUpgrade)
866 0 : ereport(ERROR,
867 : (errcode(ERRCODE_INVALID_PARAMETER_VALUE)),
868 : errmsg("OIDs less than %u are reserved for system objects", FirstNormalObjectId));
869 : }
870 228 : else if (strcmp(defel->defname, "strategy") == 0)
871 : {
872 228 : if (strategyEl)
873 0 : errorConflictingDefElem(defel, pstate);
874 228 : strategyEl = defel;
875 : }
876 : else
877 0 : ereport(ERROR,
878 : (errcode(ERRCODE_SYNTAX_ERROR),
879 : errmsg("option \"%s\" not recognized", defel->defname),
880 : parser_errposition(pstate, defel->location)));
881 : }
882 :
883 678 : if (ownerEl && ownerEl->arg)
884 2 : dbowner = defGetString(ownerEl);
885 678 : if (templateEl && templateEl->arg)
886 296 : dbtemplate = defGetString(templateEl);
887 678 : if (encodingEl && encodingEl->arg)
888 : {
889 : const char *encoding_name;
890 :
891 66 : if (IsA(encodingEl->arg, Integer))
892 : {
893 0 : encoding = defGetInt32(encodingEl);
894 0 : encoding_name = pg_encoding_to_char(encoding);
895 0 : if (strcmp(encoding_name, "") == 0 ||
896 0 : pg_valid_server_encoding(encoding_name) < 0)
897 0 : ereport(ERROR,
898 : (errcode(ERRCODE_UNDEFINED_OBJECT),
899 : errmsg("%d is not a valid encoding code",
900 : encoding),
901 : parser_errposition(pstate, encodingEl->location)));
902 : }
903 : else
904 : {
905 66 : encoding_name = defGetString(encodingEl);
906 66 : encoding = pg_valid_server_encoding(encoding_name);
907 66 : if (encoding < 0)
908 0 : ereport(ERROR,
909 : (errcode(ERRCODE_UNDEFINED_OBJECT),
910 : errmsg("%s is not a valid encoding name",
911 : encoding_name),
912 : parser_errposition(pstate, encodingEl->location)));
913 : }
914 : }
915 678 : if (localeEl && localeEl->arg)
916 : {
917 70 : dbcollate = defGetString(localeEl);
918 70 : dbctype = defGetString(localeEl);
919 70 : dblocale = defGetString(localeEl);
920 : }
921 678 : if (builtinlocaleEl && builtinlocaleEl->arg)
922 16 : dblocale = defGetString(builtinlocaleEl);
923 678 : if (collateEl && collateEl->arg)
924 18 : dbcollate = defGetString(collateEl);
925 678 : if (ctypeEl && ctypeEl->arg)
926 18 : dbctype = defGetString(ctypeEl);
927 678 : if (iculocaleEl && iculocaleEl->arg)
928 10 : dblocale = defGetString(iculocaleEl);
929 678 : if (icurulesEl && icurulesEl->arg)
930 2 : dbicurules = defGetString(icurulesEl);
931 678 : if (locproviderEl && locproviderEl->arg)
932 : {
933 76 : char *locproviderstr = defGetString(locproviderEl);
934 :
935 76 : if (pg_strcasecmp(locproviderstr, "builtin") == 0)
936 30 : dblocprovider = COLLPROVIDER_BUILTIN;
937 46 : else if (pg_strcasecmp(locproviderstr, "icu") == 0)
938 16 : dblocprovider = COLLPROVIDER_ICU;
939 30 : else if (pg_strcasecmp(locproviderstr, "libc") == 0)
940 28 : dblocprovider = COLLPROVIDER_LIBC;
941 : else
942 2 : ereport(ERROR,
943 : (errcode(ERRCODE_INVALID_OBJECT_DEFINITION),
944 : errmsg("unrecognized locale provider: %s",
945 : locproviderstr)));
946 : }
947 676 : if (istemplateEl && istemplateEl->arg)
948 90 : dbistemplate = defGetBoolean(istemplateEl);
949 676 : if (allowconnectionsEl && allowconnectionsEl->arg)
950 88 : dballowconnections = defGetBoolean(allowconnectionsEl);
951 676 : if (connlimitEl && connlimitEl->arg)
952 : {
953 0 : dbconnlimit = defGetInt32(connlimitEl);
954 0 : if (dbconnlimit < DATCONNLIMIT_UNLIMITED)
955 0 : ereport(ERROR,
956 : (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
957 : errmsg("invalid connection limit: %d", dbconnlimit)));
958 : }
959 676 : if (collversionEl)
960 20 : dbcollversion = defGetString(collversionEl);
961 :
962 : /* obtain OID of proposed owner */
963 676 : if (dbowner)
964 2 : datdba = get_role_oid(dbowner, false);
965 : else
966 674 : datdba = GetUserId();
967 :
968 : /*
969 : * To create a database, must have createdb privilege and must be able to
970 : * become the target role (this does not imply that the target role itself
971 : * must have createdb privilege). The latter provision guards against
972 : * "giveaway" attacks. Note that a superuser will always have both of
973 : * these privileges a fortiori.
974 : */
975 676 : if (!have_createdb_privilege())
976 6 : ereport(ERROR,
977 : (errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
978 : errmsg("permission denied to create database")));
979 :
980 670 : check_can_set_role(GetUserId(), datdba);
981 :
982 : /*
983 : * Lookup database (template) to be cloned, and obtain share lock on it.
984 : * ShareLock allows two CREATE DATABASEs to work from the same template
985 : * concurrently, while ensuring no one is busy dropping it in parallel
986 : * (which would be Very Bad since we'd likely get an incomplete copy
987 : * without knowing it). This also prevents any new connections from being
988 : * made to the source until we finish copying it, so we can be sure it
989 : * won't change underneath us.
990 : */
991 670 : if (!dbtemplate)
992 376 : dbtemplate = "template1"; /* Default template database name */
993 :
994 670 : if (!get_db_info(dbtemplate, ShareLock,
995 : &src_dboid, &src_owner, &src_encoding,
996 : &src_istemplate, &src_allowconn, &src_hasloginevt,
997 : &src_frozenxid, &src_minmxid, &src_deftablespace,
998 : &src_collate, &src_ctype, &src_locale, &src_icurules, &src_locprovider,
999 : &src_collversion))
1000 0 : ereport(ERROR,
1001 : (errcode(ERRCODE_UNDEFINED_DATABASE),
1002 : errmsg("template database \"%s\" does not exist",
1003 : dbtemplate)));
1004 :
1005 : /*
1006 : * If the source database was in the process of being dropped, we can't
1007 : * use it as a template.
1008 : */
1009 670 : if (database_is_invalid_oid(src_dboid))
1010 2 : ereport(ERROR,
1011 : errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
1012 : errmsg("cannot use invalid database \"%s\" as template", dbtemplate),
1013 : errhint("Use DROP DATABASE to drop invalid databases."));
1014 :
1015 : /*
1016 : * Permission check: to copy a DB that's not marked datistemplate, you
1017 : * must be superuser or the owner thereof.
1018 : */
1019 668 : if (!src_istemplate)
1020 : {
1021 16 : if (!object_ownercheck(DatabaseRelationId, src_dboid, GetUserId()))
1022 0 : ereport(ERROR,
1023 : (errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
1024 : errmsg("permission denied to copy database \"%s\"",
1025 : dbtemplate)));
1026 : }
1027 :
1028 : /* Validate the database creation strategy. */
1029 668 : if (strategyEl && strategyEl->arg)
1030 : {
1031 : char *strategy;
1032 :
1033 228 : strategy = defGetString(strategyEl);
1034 228 : if (pg_strcasecmp(strategy, "wal_log") == 0)
1035 14 : dbstrategy = CREATEDB_WAL_LOG;
1036 214 : else if (pg_strcasecmp(strategy, "file_copy") == 0)
1037 212 : dbstrategy = CREATEDB_FILE_COPY;
1038 : else
1039 2 : ereport(ERROR,
1040 : (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
1041 : errmsg("invalid create database strategy \"%s\"", strategy),
1042 : errhint("Valid strategies are \"wal_log\" and \"file_copy\".")));
1043 : }
1044 :
1045 : /* If encoding or locales are defaulted, use source's setting */
1046 666 : if (encoding < 0)
1047 600 : encoding = src_encoding;
1048 666 : if (dbcollate == NULL)
1049 584 : dbcollate = src_collate;
1050 666 : if (dbctype == NULL)
1051 584 : dbctype = src_ctype;
1052 666 : if (dblocprovider == '\0')
1053 592 : dblocprovider = src_locprovider;
1054 666 : if (dblocale == NULL)
1055 588 : dblocale = src_locale;
1056 666 : if (dbicurules == NULL)
1057 664 : dbicurules = src_icurules;
1058 :
1059 : /* Some encodings are client only */
1060 666 : if (!PG_VALID_BE_ENCODING(encoding))
1061 0 : ereport(ERROR,
1062 : (errcode(ERRCODE_WRONG_OBJECT_TYPE),
1063 : errmsg("invalid server encoding %d", encoding)));
1064 :
1065 : /* Check that the chosen locales are valid, and get canonical spellings */
1066 666 : if (!check_locale(LC_COLLATE, dbcollate, &canonname))
1067 2 : ereport(ERROR,
1068 : (errcode(ERRCODE_WRONG_OBJECT_TYPE),
1069 : errmsg("invalid LC_COLLATE locale name: \"%s\"", dbcollate),
1070 : errhint("If the locale name is specific to ICU, use ICU_LOCALE.")));
1071 664 : dbcollate = canonname;
1072 664 : if (!check_locale(LC_CTYPE, dbctype, &canonname))
1073 2 : ereport(ERROR,
1074 : (errcode(ERRCODE_WRONG_OBJECT_TYPE),
1075 : errmsg("invalid LC_CTYPE locale name: \"%s\"", dbctype),
1076 : errhint("If the locale name is specific to ICU, use ICU_LOCALE.")));
1077 662 : dbctype = canonname;
1078 :
1079 662 : check_encoding_locale_matches(encoding, dbcollate, dbctype);
1080 :
1081 : /* validate provider-specific parameters */
1082 662 : if (dblocprovider != COLLPROVIDER_BUILTIN)
1083 : {
1084 604 : if (builtinlocaleEl)
1085 0 : ereport(ERROR,
1086 : (errcode(ERRCODE_INVALID_OBJECT_DEFINITION),
1087 : errmsg("BUILTIN_LOCALE cannot be specified unless locale provider is builtin")));
1088 : }
1089 :
1090 662 : if (dblocprovider != COLLPROVIDER_ICU)
1091 : {
1092 632 : if (iculocaleEl)
1093 2 : ereport(ERROR,
1094 : (errcode(ERRCODE_INVALID_OBJECT_DEFINITION),
1095 : errmsg("ICU locale cannot be specified unless locale provider is ICU")));
1096 :
1097 630 : if (dbicurules)
1098 2 : ereport(ERROR,
1099 : (errcode(ERRCODE_INVALID_OBJECT_DEFINITION),
1100 : errmsg("ICU rules cannot be specified unless locale provider is ICU")));
1101 : }
1102 :
1103 : /* validate and canonicalize locale for the provider */
1104 658 : if (dblocprovider == COLLPROVIDER_BUILTIN)
1105 : {
1106 : /*
1107 : * This would happen if template0 uses the libc provider but the new
1108 : * database uses builtin.
1109 : */
1110 54 : if (!dblocale)
1111 2 : ereport(ERROR,
1112 : (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
1113 : errmsg("LOCALE or BUILTIN_LOCALE must be specified")));
1114 :
1115 52 : dblocale = builtin_validate_locale(encoding, dblocale);
1116 : }
1117 604 : else if (dblocprovider == COLLPROVIDER_ICU)
1118 : {
1119 30 : if (!(is_encoding_supported_by_icu(encoding)))
1120 2 : ereport(ERROR,
1121 : (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
1122 : errmsg("encoding \"%s\" is not supported with ICU provider",
1123 : pg_encoding_to_char(encoding))));
1124 :
1125 : /*
1126 : * This would happen if template0 uses the libc provider but the new
1127 : * database uses icu.
1128 : */
1129 28 : if (!dblocale)
1130 2 : ereport(ERROR,
1131 : (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
1132 : errmsg("LOCALE or ICU_LOCALE must be specified")));
1133 :
1134 : /*
1135 : * During binary upgrade, or when the locale came from the template
1136 : * database, preserve locale string. Otherwise, canonicalize to a
1137 : * language tag.
1138 : */
1139 26 : if (!IsBinaryUpgrade && dblocale != src_locale)
1140 : {
1141 14 : char *langtag = icu_language_tag(dblocale,
1142 : icu_validation_level);
1143 :
1144 14 : if (langtag && strcmp(dblocale, langtag) != 0)
1145 : {
1146 6 : ereport(NOTICE,
1147 : (errmsg("using standard form \"%s\" for ICU locale \"%s\"",
1148 : langtag, dblocale)));
1149 :
1150 6 : dblocale = langtag;
1151 : }
1152 : }
1153 :
1154 26 : icu_validate_locale(dblocale);
1155 : }
1156 :
1157 : /* for libc, locale comes from datcollate and datctype */
1158 648 : if (dblocprovider == COLLPROVIDER_LIBC)
1159 574 : dblocale = NULL;
1160 :
1161 : /*
1162 : * Check that the new encoding and locale settings match the source
1163 : * database. We insist on this because we simply copy the source data ---
1164 : * any non-ASCII data would be wrongly encoded, and any indexes sorted
1165 : * according to the source locale would be wrong.
1166 : *
1167 : * However, we assume that template0 doesn't contain any non-ASCII data
1168 : * nor any indexes that depend on collation or ctype, so template0 can be
1169 : * used as template for creating a database with any encoding or locale.
1170 : */
1171 648 : if (strcmp(dbtemplate, "template0") != 0)
1172 : {
1173 394 : if (encoding != src_encoding)
1174 0 : ereport(ERROR,
1175 : (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
1176 : errmsg("new encoding (%s) is incompatible with the encoding of the template database (%s)",
1177 : pg_encoding_to_char(encoding),
1178 : pg_encoding_to_char(src_encoding)),
1179 : errhint("Use the same encoding as in the template database, or use template0 as template.")));
1180 :
1181 394 : if (strcmp(dbcollate, src_collate) != 0)
1182 2 : ereport(ERROR,
1183 : (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
1184 : errmsg("new collation (%s) is incompatible with the collation of the template database (%s)",
1185 : dbcollate, src_collate),
1186 : errhint("Use the same collation as in the template database, or use template0 as template.")));
1187 :
1188 392 : if (strcmp(dbctype, src_ctype) != 0)
1189 0 : ereport(ERROR,
1190 : (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
1191 : errmsg("new LC_CTYPE (%s) is incompatible with the LC_CTYPE of the template database (%s)",
1192 : dbctype, src_ctype),
1193 : errhint("Use the same LC_CTYPE as in the template database, or use template0 as template.")));
1194 :
1195 392 : if (dblocprovider != src_locprovider)
1196 0 : ereport(ERROR,
1197 : (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
1198 : errmsg("new locale provider (%s) does not match locale provider of the template database (%s)",
1199 : collprovider_name(dblocprovider), collprovider_name(src_locprovider)),
1200 : errhint("Use the same locale provider as in the template database, or use template0 as template.")));
1201 :
1202 392 : if (dblocprovider == COLLPROVIDER_ICU)
1203 : {
1204 : char *val1;
1205 : char *val2;
1206 :
1207 : Assert(dblocale);
1208 : Assert(src_locale);
1209 12 : if (strcmp(dblocale, src_locale) != 0)
1210 0 : ereport(ERROR,
1211 : (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
1212 : errmsg("new ICU locale (%s) is incompatible with the ICU locale of the template database (%s)",
1213 : dblocale, src_locale),
1214 : errhint("Use the same ICU locale as in the template database, or use template0 as template.")));
1215 :
1216 12 : val1 = dbicurules;
1217 12 : if (!val1)
1218 12 : val1 = "";
1219 12 : val2 = src_icurules;
1220 12 : if (!val2)
1221 12 : val2 = "";
1222 12 : if (strcmp(val1, val2) != 0)
1223 0 : ereport(ERROR,
1224 : (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
1225 : errmsg("new ICU collation rules (%s) are incompatible with the ICU collation rules of the template database (%s)",
1226 : val1, val2),
1227 : errhint("Use the same ICU collation rules as in the template database, or use template0 as template.")));
1228 : }
1229 : }
1230 :
1231 : /*
1232 : * If we got a collation version for the template database, check that it
1233 : * matches the actual OS collation version. Otherwise error; the user
1234 : * needs to fix the template database first. Don't complain if a
1235 : * collation version was specified explicitly as a statement option; that
1236 : * is used by pg_upgrade to reproduce the old state exactly.
1237 : *
1238 : * (If the template database has no collation version, then either the
1239 : * platform/provider does not support collation versioning, or it's
1240 : * template0, for which we stipulate that it does not contain
1241 : * collation-using objects.)
1242 : */
1243 646 : if (src_collversion && !collversionEl)
1244 : {
1245 : char *actual_versionstr;
1246 : const char *locale;
1247 :
1248 244 : if (dblocprovider == COLLPROVIDER_LIBC)
1249 222 : locale = dbcollate;
1250 : else
1251 22 : locale = dblocale;
1252 :
1253 244 : actual_versionstr = get_collation_actual_version(dblocprovider, locale);
1254 244 : if (!actual_versionstr)
1255 0 : ereport(ERROR,
1256 : (errmsg("template database \"%s\" has a collation version, but no actual collation version could be determined",
1257 : dbtemplate)));
1258 :
1259 244 : if (strcmp(actual_versionstr, src_collversion) != 0)
1260 0 : ereport(ERROR,
1261 : (errmsg("template database \"%s\" has a collation version mismatch",
1262 : dbtemplate),
1263 : errdetail("The template database was created using collation version %s, "
1264 : "but the operating system provides version %s.",
1265 : src_collversion, actual_versionstr),
1266 : errhint("Rebuild all objects in the template database that use the default collation and run "
1267 : "ALTER DATABASE %s REFRESH COLLATION VERSION, "
1268 : "or build PostgreSQL with the right library version.",
1269 : quote_identifier(dbtemplate))));
1270 : }
1271 :
1272 646 : if (dbcollversion == NULL)
1273 626 : dbcollversion = src_collversion;
1274 :
1275 : /*
1276 : * Normally, we copy the collation version from the template database.
1277 : * This last resort only applies if the template database does not have a
1278 : * collation version, which is normally only the case for template0.
1279 : */
1280 646 : if (dbcollversion == NULL)
1281 : {
1282 : const char *locale;
1283 :
1284 382 : if (dblocprovider == COLLPROVIDER_LIBC)
1285 344 : locale = dbcollate;
1286 : else
1287 38 : locale = dblocale;
1288 :
1289 382 : dbcollversion = get_collation_actual_version(dblocprovider, locale);
1290 : }
1291 :
1292 : /* Resolve default tablespace for new database */
1293 646 : if (tablespacenameEl && tablespacenameEl->arg)
1294 16 : {
1295 : char *tablespacename;
1296 : AclResult aclresult;
1297 :
1298 16 : tablespacename = defGetString(tablespacenameEl);
1299 16 : dst_deftablespace = get_tablespace_oid(tablespacename, false);
1300 : /* check permissions */
1301 16 : aclresult = object_aclcheck(TableSpaceRelationId, dst_deftablespace, GetUserId(),
1302 : ACL_CREATE);
1303 16 : if (aclresult != ACLCHECK_OK)
1304 0 : aclcheck_error(aclresult, OBJECT_TABLESPACE,
1305 : tablespacename);
1306 :
1307 : /* pg_global must never be the default tablespace */
1308 16 : if (dst_deftablespace == GLOBALTABLESPACE_OID)
1309 0 : ereport(ERROR,
1310 : (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
1311 : errmsg("pg_global cannot be used as default tablespace")));
1312 :
1313 : /*
1314 : * If we are trying to change the default tablespace of the template,
1315 : * we require that the template not have any files in the new default
1316 : * tablespace. This is necessary because otherwise the copied
1317 : * database would contain pg_class rows that refer to its default
1318 : * tablespace both explicitly (by OID) and implicitly (as zero), which
1319 : * would cause problems. For example another CREATE DATABASE using
1320 : * the copied database as template, and trying to change its default
1321 : * tablespace again, would yield outright incorrect results (it would
1322 : * improperly move tables to the new default tablespace that should
1323 : * stay in the same tablespace).
1324 : */
1325 16 : if (dst_deftablespace != src_deftablespace)
1326 : {
1327 : char *srcpath;
1328 : struct stat st;
1329 :
1330 16 : srcpath = GetDatabasePath(src_dboid, dst_deftablespace);
1331 :
1332 16 : if (stat(srcpath, &st) == 0 &&
1333 0 : S_ISDIR(st.st_mode) &&
1334 0 : !directory_is_empty(srcpath))
1335 0 : ereport(ERROR,
1336 : (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
1337 : errmsg("cannot assign new default tablespace \"%s\"",
1338 : tablespacename),
1339 : errdetail("There is a conflict because database \"%s\" already has some tables in this tablespace.",
1340 : dbtemplate)));
1341 16 : pfree(srcpath);
1342 : }
1343 : }
1344 : else
1345 : {
1346 : /* Use template database's default tablespace */
1347 630 : dst_deftablespace = src_deftablespace;
1348 : /* Note there is no additional permission check in this path */
1349 : }
1350 :
1351 : /*
1352 : * If built with appropriate switch, whine when regression-testing
1353 : * conventions for database names are violated. But don't complain during
1354 : * initdb.
1355 : */
1356 : #ifdef ENFORCE_REGRESSION_TEST_NAME_RESTRICTIONS
1357 : if (IsUnderPostmaster && strstr(dbname, "regression") == NULL)
1358 : elog(WARNING, "databases created by regression test cases should have names including \"regression\"");
1359 : #endif
1360 :
1361 : /*
1362 : * Check for db name conflict. This is just to give a more friendly error
1363 : * message than "unique index violation". There's a race condition but
1364 : * we're willing to accept the less friendly message in that case.
1365 : */
1366 646 : if (OidIsValid(get_database_oid(dbname, true)))
1367 2 : ereport(ERROR,
1368 : (errcode(ERRCODE_DUPLICATE_DATABASE),
1369 : errmsg("database \"%s\" already exists", dbname)));
1370 :
1371 : /*
1372 : * The source DB can't have any active backends, except this one
1373 : * (exception is to allow CREATE DB while connected to template1).
1374 : * Otherwise we might copy inconsistent data.
1375 : *
1376 : * This should be last among the basic error checks, because it involves
1377 : * potential waiting; we may as well throw an error first if we're gonna
1378 : * throw one.
1379 : */
1380 644 : if (CountOtherDBBackends(src_dboid, ¬herbackends, &npreparedxacts))
1381 0 : ereport(ERROR,
1382 : (errcode(ERRCODE_OBJECT_IN_USE),
1383 : errmsg("source database \"%s\" is being accessed by other users",
1384 : dbtemplate),
1385 : errdetail_busy_db(notherbackends, npreparedxacts)));
1386 :
1387 : /*
1388 : * Select an OID for the new database, checking that it doesn't have a
1389 : * filename conflict with anything already existing in the tablespace
1390 : * directories.
1391 : */
1392 644 : pg_database_rel = table_open(DatabaseRelationId, RowExclusiveLock);
1393 :
1394 : /*
1395 : * If database OID is configured, check if the OID is already in use or
1396 : * data directory already exists.
1397 : */
1398 644 : if (OidIsValid(dboid))
1399 : {
1400 202 : char *existing_dbname = get_database_name(dboid);
1401 :
1402 202 : if (existing_dbname != NULL)
1403 0 : ereport(ERROR,
1404 : (errcode(ERRCODE_INVALID_PARAMETER_VALUE)),
1405 : errmsg("database OID %u is already in use by database \"%s\"",
1406 : dboid, existing_dbname));
1407 :
1408 202 : if (check_db_file_conflict(dboid))
1409 0 : ereport(ERROR,
1410 : (errcode(ERRCODE_INVALID_PARAMETER_VALUE)),
1411 : errmsg("data directory with the specified OID %u already exists", dboid));
1412 : }
1413 : else
1414 : {
1415 : /* Select an OID for the new database if it is not explicitly configured. */
1416 : do
1417 : {
1418 442 : dboid = GetNewOidWithIndex(pg_database_rel, DatabaseOidIndexId,
1419 : Anum_pg_database_oid);
1420 442 : } while (check_db_file_conflict(dboid));
1421 : }
1422 :
1423 : /*
1424 : * Insert a new tuple into pg_database. This establishes our ownership of
1425 : * the new database name (anyone else trying to insert the same name will
1426 : * block on the unique index, and fail after we commit).
1427 : */
1428 :
1429 : Assert((dblocprovider != COLLPROVIDER_LIBC && dblocale) ||
1430 : (dblocprovider == COLLPROVIDER_LIBC && !dblocale));
1431 :
1432 : /* Form tuple */
1433 644 : new_record[Anum_pg_database_oid - 1] = ObjectIdGetDatum(dboid);
1434 644 : new_record[Anum_pg_database_datname - 1] =
1435 644 : DirectFunctionCall1(namein, CStringGetDatum(dbname));
1436 644 : new_record[Anum_pg_database_datdba - 1] = ObjectIdGetDatum(datdba);
1437 644 : new_record[Anum_pg_database_encoding - 1] = Int32GetDatum(encoding);
1438 644 : new_record[Anum_pg_database_datlocprovider - 1] = CharGetDatum(dblocprovider);
1439 644 : new_record[Anum_pg_database_datistemplate - 1] = BoolGetDatum(dbistemplate);
1440 644 : new_record[Anum_pg_database_datallowconn - 1] = BoolGetDatum(dballowconnections);
1441 644 : new_record[Anum_pg_database_dathasloginevt - 1] = BoolGetDatum(src_hasloginevt);
1442 644 : new_record[Anum_pg_database_datconnlimit - 1] = Int32GetDatum(dbconnlimit);
1443 644 : new_record[Anum_pg_database_datfrozenxid - 1] = TransactionIdGetDatum(src_frozenxid);
1444 644 : new_record[Anum_pg_database_datminmxid - 1] = TransactionIdGetDatum(src_minmxid);
1445 644 : new_record[Anum_pg_database_dattablespace - 1] = ObjectIdGetDatum(dst_deftablespace);
1446 644 : new_record[Anum_pg_database_datcollate - 1] = CStringGetTextDatum(dbcollate);
1447 644 : new_record[Anum_pg_database_datctype - 1] = CStringGetTextDatum(dbctype);
1448 644 : if (dblocale)
1449 72 : new_record[Anum_pg_database_datlocale - 1] = CStringGetTextDatum(dblocale);
1450 : else
1451 572 : new_record_nulls[Anum_pg_database_datlocale - 1] = true;
1452 644 : if (dbicurules)
1453 0 : new_record[Anum_pg_database_daticurules - 1] = CStringGetTextDatum(dbicurules);
1454 : else
1455 644 : new_record_nulls[Anum_pg_database_daticurules - 1] = true;
1456 644 : if (dbcollversion)
1457 524 : new_record[Anum_pg_database_datcollversion - 1] = CStringGetTextDatum(dbcollversion);
1458 : else
1459 120 : new_record_nulls[Anum_pg_database_datcollversion - 1] = true;
1460 :
1461 : /*
1462 : * We deliberately set datacl to default (NULL), rather than copying it
1463 : * from the template database. Copying it would be a bad idea when the
1464 : * owner is not the same as the template's owner.
1465 : */
1466 644 : new_record_nulls[Anum_pg_database_datacl - 1] = true;
1467 :
1468 644 : tuple = heap_form_tuple(RelationGetDescr(pg_database_rel),
1469 : new_record, new_record_nulls);
1470 :
1471 644 : CatalogTupleInsert(pg_database_rel, tuple);
1472 :
1473 : /*
1474 : * Now generate additional catalog entries associated with the new DB
1475 : */
1476 :
1477 : /* Register owner dependency */
1478 644 : recordDependencyOnOwner(DatabaseRelationId, dboid, datdba);
1479 :
1480 : /* Create pg_shdepend entries for objects within database */
1481 644 : copyTemplateDependencies(src_dboid, dboid);
1482 :
1483 : /* Post creation hook for new database */
1484 644 : InvokeObjectPostCreateHook(DatabaseRelationId, dboid, 0);
1485 :
1486 : /*
1487 : * If we're going to be reading data for the to-be-created database into
1488 : * shared_buffers, take a lock on it. Nobody should know that this
1489 : * database exists yet, but it's good to maintain the invariant that an
1490 : * AccessExclusiveLock on the database is sufficient to drop all of its
1491 : * buffers without worrying about more being read later.
1492 : *
1493 : * Note that we need to do this before entering the
1494 : * PG_ENSURE_ERROR_CLEANUP block below, because createdb_failure_callback
1495 : * expects this lock to be held already.
1496 : */
1497 644 : if (dbstrategy == CREATEDB_WAL_LOG)
1498 432 : LockSharedObject(DatabaseRelationId, dboid, 0, AccessShareLock);
1499 :
1500 : /*
1501 : * Once we start copying subdirectories, we need to be able to clean 'em
1502 : * up if we fail. Use an ENSURE block to make sure this happens. (This
1503 : * is not a 100% solution, because of the possibility of failure during
1504 : * transaction commit after we leave this routine, but it should handle
1505 : * most scenarios.)
1506 : */
1507 644 : fparms.src_dboid = src_dboid;
1508 644 : fparms.dest_dboid = dboid;
1509 644 : fparms.strategy = dbstrategy;
1510 :
1511 644 : PG_ENSURE_ERROR_CLEANUP(createdb_failure_callback,
1512 : PointerGetDatum(&fparms));
1513 : {
1514 : /*
1515 : * If the user has asked to create a database with WAL_LOG strategy
1516 : * then call CreateDatabaseUsingWalLog, which will copy the database
1517 : * at the block level and it will WAL log each copied block.
1518 : * Otherwise, call CreateDatabaseUsingFileCopy that will copy the
1519 : * database file by file.
1520 : */
1521 644 : if (dbstrategy == CREATEDB_WAL_LOG)
1522 432 : CreateDatabaseUsingWalLog(src_dboid, dboid, src_deftablespace,
1523 : dst_deftablespace);
1524 : else
1525 212 : CreateDatabaseUsingFileCopy(src_dboid, dboid, src_deftablespace,
1526 : dst_deftablespace);
1527 :
1528 : /*
1529 : * Close pg_database, but keep lock till commit.
1530 : */
1531 644 : table_close(pg_database_rel, NoLock);
1532 :
1533 : /*
1534 : * Force synchronous commit, thus minimizing the window between
1535 : * creation of the database files and committal of the transaction. If
1536 : * we crash before committing, we'll have a DB that's taking up disk
1537 : * space but is not in pg_database, which is not good.
1538 : */
1539 644 : ForceSyncCommit();
1540 : }
1541 644 : PG_END_ENSURE_ERROR_CLEANUP(createdb_failure_callback,
1542 : PointerGetDatum(&fparms));
1543 :
1544 644 : return dboid;
1545 : }
1546 :
1547 : /*
1548 : * Check whether chosen encoding matches chosen locale settings. This
1549 : * restriction is necessary because libc's locale-specific code usually
1550 : * fails when presented with data in an encoding it's not expecting. We
1551 : * allow mismatch in four cases:
1552 : *
1553 : * 1. locale encoding = SQL_ASCII, which means that the locale is C/POSIX
1554 : * which works with any encoding.
1555 : *
1556 : * 2. locale encoding = -1, which means that we couldn't determine the
1557 : * locale's encoding and have to trust the user to get it right.
1558 : *
1559 : * 3. selected encoding is UTF8 and platform is win32. This is because
1560 : * UTF8 is a pseudo codepage that is supported in all locales since it's
1561 : * converted to UTF16 before being used.
1562 : *
1563 : * 4. selected encoding is SQL_ASCII, but only if you're a superuser. This
1564 : * is risky but we have historically allowed it --- notably, the
1565 : * regression tests require it.
1566 : *
1567 : * Note: if you change this policy, fix initdb to match.
1568 : */
1569 : void
1570 690 : check_encoding_locale_matches(int encoding, const char *collate, const char *ctype)
1571 : {
1572 690 : int ctype_encoding = pg_get_encoding_from_locale(ctype, true);
1573 690 : int collate_encoding = pg_get_encoding_from_locale(collate, true);
1574 :
1575 696 : if (!(ctype_encoding == encoding ||
1576 6 : ctype_encoding == PG_SQL_ASCII ||
1577 : ctype_encoding == -1 ||
1578 : #ifdef WIN32
1579 : encoding == PG_UTF8 ||
1580 : #endif
1581 6 : (encoding == PG_SQL_ASCII && superuser())))
1582 0 : ereport(ERROR,
1583 : (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
1584 : errmsg("encoding \"%s\" does not match locale \"%s\"",
1585 : pg_encoding_to_char(encoding),
1586 : ctype),
1587 : errdetail("The chosen LC_CTYPE setting requires encoding \"%s\".",
1588 : pg_encoding_to_char(ctype_encoding))));
1589 :
1590 696 : if (!(collate_encoding == encoding ||
1591 6 : collate_encoding == PG_SQL_ASCII ||
1592 : collate_encoding == -1 ||
1593 : #ifdef WIN32
1594 : encoding == PG_UTF8 ||
1595 : #endif
1596 6 : (encoding == PG_SQL_ASCII && superuser())))
1597 0 : ereport(ERROR,
1598 : (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
1599 : errmsg("encoding \"%s\" does not match locale \"%s\"",
1600 : pg_encoding_to_char(encoding),
1601 : collate),
1602 : errdetail("The chosen LC_COLLATE setting requires encoding \"%s\".",
1603 : pg_encoding_to_char(collate_encoding))));
1604 690 : }
1605 :
1606 : /* Error cleanup callback for createdb */
1607 : static void
1608 0 : createdb_failure_callback(int code, Datum arg)
1609 : {
1610 0 : createdb_failure_params *fparms = (createdb_failure_params *) DatumGetPointer(arg);
1611 :
1612 : /*
1613 : * If we were copying database at block levels then drop pages for the
1614 : * destination database that are in the shared buffer cache. And tell
1615 : * checkpointer to forget any pending fsync and unlink requests for files
1616 : * in the database. The reasoning behind doing this is same as explained
1617 : * in dropdb function. But unlike dropdb we don't need to call
1618 : * pgstat_drop_database because this database is still not created so
1619 : * there should not be any stat for this.
1620 : */
1621 0 : if (fparms->strategy == CREATEDB_WAL_LOG)
1622 : {
1623 0 : DropDatabaseBuffers(fparms->dest_dboid);
1624 0 : ForgetDatabaseSyncRequests(fparms->dest_dboid);
1625 :
1626 : /* Release lock on the target database. */
1627 0 : UnlockSharedObject(DatabaseRelationId, fparms->dest_dboid, 0,
1628 : AccessShareLock);
1629 : }
1630 :
1631 : /*
1632 : * Release lock on source database before doing recursive remove. This is
1633 : * not essential but it seems desirable to release the lock as soon as
1634 : * possible.
1635 : */
1636 0 : UnlockSharedObject(DatabaseRelationId, fparms->src_dboid, 0, ShareLock);
1637 :
1638 : /* Throw away any successfully copied subdirectories */
1639 0 : remove_dbtablespaces(fparms->dest_dboid);
1640 0 : }
1641 :
1642 :
1643 : /*
1644 : * DROP DATABASE
1645 : */
1646 : void
1647 102 : dropdb(const char *dbname, bool missing_ok, bool force)
1648 : {
1649 : Oid db_id;
1650 : bool db_istemplate;
1651 : Relation pgdbrel;
1652 : HeapTuple tup;
1653 : ScanKeyData scankey;
1654 : void *inplace_state;
1655 : Form_pg_database datform;
1656 : int notherbackends;
1657 : int npreparedxacts;
1658 : int nslots,
1659 : nslots_active;
1660 : int nsubscriptions;
1661 :
1662 : /*
1663 : * Look up the target database's OID, and get exclusive lock on it. We
1664 : * need this to ensure that no new backend starts up in the target
1665 : * database while we are deleting it (see postinit.c), and that no one is
1666 : * using it as a CREATE DATABASE template or trying to delete it for
1667 : * themselves.
1668 : */
1669 102 : pgdbrel = table_open(DatabaseRelationId, RowExclusiveLock);
1670 :
1671 102 : if (!get_db_info(dbname, AccessExclusiveLock, &db_id, NULL, NULL,
1672 : &db_istemplate, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL))
1673 : {
1674 32 : if (!missing_ok)
1675 : {
1676 16 : ereport(ERROR,
1677 : (errcode(ERRCODE_UNDEFINED_DATABASE),
1678 : errmsg("database \"%s\" does not exist", dbname)));
1679 : }
1680 : else
1681 : {
1682 : /* Close pg_database, release the lock, since we changed nothing */
1683 16 : table_close(pgdbrel, RowExclusiveLock);
1684 16 : ereport(NOTICE,
1685 : (errmsg("database \"%s\" does not exist, skipping",
1686 : dbname)));
1687 16 : return;
1688 : }
1689 : }
1690 :
1691 : /*
1692 : * Permission checks
1693 : */
1694 70 : if (!object_ownercheck(DatabaseRelationId, db_id, GetUserId()))
1695 0 : aclcheck_error(ACLCHECK_NOT_OWNER, OBJECT_DATABASE,
1696 : dbname);
1697 :
1698 : /* DROP hook for the database being removed */
1699 70 : InvokeObjectDropHook(DatabaseRelationId, db_id, 0);
1700 :
1701 : /*
1702 : * Disallow dropping a DB that is marked istemplate. This is just to
1703 : * prevent people from accidentally dropping template0 or template1; they
1704 : * can do so if they're really determined ...
1705 : */
1706 70 : if (db_istemplate)
1707 0 : ereport(ERROR,
1708 : (errcode(ERRCODE_WRONG_OBJECT_TYPE),
1709 : errmsg("cannot drop a template database")));
1710 :
1711 : /* Obviously can't drop my own database */
1712 70 : if (db_id == MyDatabaseId)
1713 0 : ereport(ERROR,
1714 : (errcode(ERRCODE_OBJECT_IN_USE),
1715 : errmsg("cannot drop the currently open database")));
1716 :
1717 : /*
1718 : * Check whether there are active logical slots that refer to the
1719 : * to-be-dropped database. The database lock we are holding prevents the
1720 : * creation of new slots using the database or existing slots becoming
1721 : * active.
1722 : */
1723 70 : (void) ReplicationSlotsCountDBSlots(db_id, &nslots, &nslots_active);
1724 70 : if (nslots_active)
1725 : {
1726 2 : ereport(ERROR,
1727 : (errcode(ERRCODE_OBJECT_IN_USE),
1728 : errmsg("database \"%s\" is used by an active logical replication slot",
1729 : dbname),
1730 : errdetail_plural("There is %d active slot.",
1731 : "There are %d active slots.",
1732 : nslots_active, nslots_active)));
1733 : }
1734 :
1735 : /*
1736 : * Check if there are subscriptions defined in the target database.
1737 : *
1738 : * We can't drop them automatically because they might be holding
1739 : * resources in other databases/instances.
1740 : */
1741 68 : if ((nsubscriptions = CountDBSubscriptions(db_id)) > 0)
1742 0 : ereport(ERROR,
1743 : (errcode(ERRCODE_OBJECT_IN_USE),
1744 : errmsg("database \"%s\" is being used by logical replication subscription",
1745 : dbname),
1746 : errdetail_plural("There is %d subscription.",
1747 : "There are %d subscriptions.",
1748 : nsubscriptions, nsubscriptions)));
1749 :
1750 :
1751 : /*
1752 : * Attempt to terminate all existing connections to the target database if
1753 : * the user has requested to do so.
1754 : */
1755 68 : if (force)
1756 2 : TerminateOtherDBBackends(db_id);
1757 :
1758 : /*
1759 : * Check for other backends in the target database. (Because we hold the
1760 : * database lock, no new ones can start after this.)
1761 : *
1762 : * As in CREATE DATABASE, check this after other error conditions.
1763 : */
1764 68 : if (CountOtherDBBackends(db_id, ¬herbackends, &npreparedxacts))
1765 0 : ereport(ERROR,
1766 : (errcode(ERRCODE_OBJECT_IN_USE),
1767 : errmsg("database \"%s\" is being accessed by other users",
1768 : dbname),
1769 : errdetail_busy_db(notherbackends, npreparedxacts)));
1770 :
1771 : /*
1772 : * Delete any comments or security labels associated with the database.
1773 : */
1774 68 : DeleteSharedComments(db_id, DatabaseRelationId);
1775 68 : DeleteSharedSecurityLabel(db_id, DatabaseRelationId);
1776 :
1777 : /*
1778 : * Remove settings associated with this database
1779 : */
1780 68 : DropSetting(db_id, InvalidOid);
1781 :
1782 : /*
1783 : * Remove shared dependency references for the database.
1784 : */
1785 68 : dropDatabaseDependencies(db_id);
1786 :
1787 : /*
1788 : * Tell the cumulative stats system to forget it immediately, too.
1789 : */
1790 68 : pgstat_drop_database(db_id);
1791 :
1792 : /*
1793 : * Except for the deletion of the catalog row, subsequent actions are not
1794 : * transactional (consider DropDatabaseBuffers() discarding modified
1795 : * buffers). But we might crash or get interrupted below. To prevent
1796 : * accesses to a database with invalid contents, mark the database as
1797 : * invalid using an in-place update.
1798 : *
1799 : * We need to flush the WAL before continuing, to guarantee the
1800 : * modification is durable before performing irreversible filesystem
1801 : * operations.
1802 : */
1803 68 : ScanKeyInit(&scankey,
1804 : Anum_pg_database_datname,
1805 : BTEqualStrategyNumber, F_NAMEEQ,
1806 : CStringGetDatum(dbname));
1807 68 : systable_inplace_update_begin(pgdbrel, DatabaseNameIndexId, true,
1808 : NULL, 1, &scankey, &tup, &inplace_state);
1809 68 : if (!HeapTupleIsValid(tup))
1810 0 : elog(ERROR, "cache lookup failed for database %u", db_id);
1811 68 : datform = (Form_pg_database) GETSTRUCT(tup);
1812 68 : datform->datconnlimit = DATCONNLIMIT_INVALID_DB;
1813 68 : systable_inplace_update_finish(inplace_state, tup);
1814 68 : XLogFlush(XactLastRecEnd);
1815 :
1816 : /*
1817 : * Also delete the tuple - transactionally. If this transaction commits,
1818 : * the row will be gone, but if we fail, dropdb() can be invoked again.
1819 : */
1820 68 : CatalogTupleDelete(pgdbrel, &tup->t_self);
1821 68 : heap_freetuple(tup);
1822 :
1823 : /*
1824 : * Drop db-specific replication slots.
1825 : */
1826 68 : ReplicationSlotsDropDBSlots(db_id);
1827 :
1828 : /*
1829 : * Drop pages for this database that are in the shared buffer cache. This
1830 : * is important to ensure that no remaining backend tries to write out a
1831 : * dirty buffer to the dead database later...
1832 : */
1833 68 : DropDatabaseBuffers(db_id);
1834 :
1835 : /*
1836 : * Tell checkpointer to forget any pending fsync and unlink requests for
1837 : * files in the database; else the fsyncs will fail at next checkpoint, or
1838 : * worse, it will delete files that belong to a newly created database
1839 : * with the same OID.
1840 : */
1841 68 : ForgetDatabaseSyncRequests(db_id);
1842 :
1843 : /*
1844 : * Force a checkpoint to make sure the checkpointer has received the
1845 : * message sent by ForgetDatabaseSyncRequests.
1846 : */
1847 68 : RequestCheckpoint(CHECKPOINT_IMMEDIATE | CHECKPOINT_FORCE | CHECKPOINT_WAIT);
1848 :
1849 : /* Close all smgr fds in all backends. */
1850 68 : WaitForProcSignalBarrier(EmitProcSignalBarrier(PROCSIGNAL_BARRIER_SMGRRELEASE));
1851 :
1852 : /*
1853 : * Remove all tablespace subdirs belonging to the database.
1854 : */
1855 68 : remove_dbtablespaces(db_id);
1856 :
1857 : /*
1858 : * Close pg_database, but keep lock till commit.
1859 : */
1860 68 : table_close(pgdbrel, NoLock);
1861 :
1862 : /*
1863 : * Force synchronous commit, thus minimizing the window between removal of
1864 : * the database files and committal of the transaction. If we crash before
1865 : * committing, we'll have a DB that's gone on disk but still there
1866 : * according to pg_database, which is not good.
1867 : */
1868 68 : ForceSyncCommit();
1869 : }
1870 :
1871 :
1872 : /*
1873 : * Rename database
1874 : */
1875 : ObjectAddress
1876 6 : RenameDatabase(const char *oldname, const char *newname)
1877 : {
1878 : Oid db_id;
1879 : HeapTuple newtup;
1880 : ItemPointerData otid;
1881 : Relation rel;
1882 : int notherbackends;
1883 : int npreparedxacts;
1884 : ObjectAddress address;
1885 :
1886 : /*
1887 : * Look up the target database's OID, and get exclusive lock on it. We
1888 : * need this for the same reasons as DROP DATABASE.
1889 : */
1890 6 : rel = table_open(DatabaseRelationId, RowExclusiveLock);
1891 :
1892 6 : if (!get_db_info(oldname, AccessExclusiveLock, &db_id, NULL, NULL, NULL,
1893 : NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL))
1894 0 : ereport(ERROR,
1895 : (errcode(ERRCODE_UNDEFINED_DATABASE),
1896 : errmsg("database \"%s\" does not exist", oldname)));
1897 :
1898 : /* must be owner */
1899 6 : if (!object_ownercheck(DatabaseRelationId, db_id, GetUserId()))
1900 0 : aclcheck_error(ACLCHECK_NOT_OWNER, OBJECT_DATABASE,
1901 : oldname);
1902 :
1903 : /* must have createdb rights */
1904 6 : if (!have_createdb_privilege())
1905 0 : ereport(ERROR,
1906 : (errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
1907 : errmsg("permission denied to rename database")));
1908 :
1909 : /*
1910 : * If built with appropriate switch, whine when regression-testing
1911 : * conventions for database names are violated.
1912 : */
1913 : #ifdef ENFORCE_REGRESSION_TEST_NAME_RESTRICTIONS
1914 : if (strstr(newname, "regression") == NULL)
1915 : elog(WARNING, "databases created by regression test cases should have names including \"regression\"");
1916 : #endif
1917 :
1918 : /*
1919 : * Make sure the new name doesn't exist. See notes for same error in
1920 : * CREATE DATABASE.
1921 : */
1922 6 : if (OidIsValid(get_database_oid(newname, true)))
1923 0 : ereport(ERROR,
1924 : (errcode(ERRCODE_DUPLICATE_DATABASE),
1925 : errmsg("database \"%s\" already exists", newname)));
1926 :
1927 : /*
1928 : * XXX Client applications probably store the current database somewhere,
1929 : * so renaming it could cause confusion. On the other hand, there may not
1930 : * be an actual problem besides a little confusion, so think about this
1931 : * and decide.
1932 : */
1933 6 : if (db_id == MyDatabaseId)
1934 0 : ereport(ERROR,
1935 : (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
1936 : errmsg("current database cannot be renamed")));
1937 :
1938 : /*
1939 : * Make sure the database does not have active sessions. This is the same
1940 : * concern as above, but applied to other sessions.
1941 : *
1942 : * As in CREATE DATABASE, check this after other error conditions.
1943 : */
1944 6 : if (CountOtherDBBackends(db_id, ¬herbackends, &npreparedxacts))
1945 0 : ereport(ERROR,
1946 : (errcode(ERRCODE_OBJECT_IN_USE),
1947 : errmsg("database \"%s\" is being accessed by other users",
1948 : oldname),
1949 : errdetail_busy_db(notherbackends, npreparedxacts)));
1950 :
1951 : /* rename */
1952 6 : newtup = SearchSysCacheLockedCopy1(DATABASEOID, ObjectIdGetDatum(db_id));
1953 6 : if (!HeapTupleIsValid(newtup))
1954 0 : elog(ERROR, "cache lookup failed for database %u", db_id);
1955 6 : otid = newtup->t_self;
1956 6 : namestrcpy(&(((Form_pg_database) GETSTRUCT(newtup))->datname), newname);
1957 6 : CatalogTupleUpdate(rel, &otid, newtup);
1958 6 : UnlockTuple(rel, &otid, InplaceUpdateTupleLock);
1959 :
1960 6 : InvokeObjectPostAlterHook(DatabaseRelationId, db_id, 0);
1961 :
1962 6 : ObjectAddressSet(address, DatabaseRelationId, db_id);
1963 :
1964 : /*
1965 : * Close pg_database, but keep lock till commit.
1966 : */
1967 6 : table_close(rel, NoLock);
1968 :
1969 6 : return address;
1970 : }
1971 :
1972 :
1973 : /*
1974 : * ALTER DATABASE SET TABLESPACE
1975 : */
1976 : static void
1977 16 : movedb(const char *dbname, const char *tblspcname)
1978 : {
1979 : Oid db_id;
1980 : Relation pgdbrel;
1981 : int notherbackends;
1982 : int npreparedxacts;
1983 : HeapTuple oldtuple,
1984 : newtuple;
1985 : Oid src_tblspcoid,
1986 : dst_tblspcoid;
1987 : ScanKeyData scankey;
1988 : SysScanDesc sysscan;
1989 : AclResult aclresult;
1990 : char *src_dbpath;
1991 : char *dst_dbpath;
1992 : DIR *dstdir;
1993 : struct dirent *xlde;
1994 : movedb_failure_params fparms;
1995 :
1996 : /*
1997 : * Look up the target database's OID, and get exclusive lock on it. We
1998 : * need this to ensure that no new backend starts up in the database while
1999 : * we are moving it, and that no one is using it as a CREATE DATABASE
2000 : * template or trying to delete it.
2001 : */
2002 16 : pgdbrel = table_open(DatabaseRelationId, RowExclusiveLock);
2003 :
2004 16 : if (!get_db_info(dbname, AccessExclusiveLock, &db_id, NULL, NULL, NULL,
2005 : NULL, NULL, NULL, NULL, &src_tblspcoid, NULL, NULL, NULL, NULL, NULL, NULL))
2006 0 : ereport(ERROR,
2007 : (errcode(ERRCODE_UNDEFINED_DATABASE),
2008 : errmsg("database \"%s\" does not exist", dbname)));
2009 :
2010 : /*
2011 : * We actually need a session lock, so that the lock will persist across
2012 : * the commit/restart below. (We could almost get away with letting the
2013 : * lock be released at commit, except that someone could try to move
2014 : * relations of the DB back into the old directory while we rmtree() it.)
2015 : */
2016 16 : LockSharedObjectForSession(DatabaseRelationId, db_id, 0,
2017 : AccessExclusiveLock);
2018 :
2019 : /*
2020 : * Permission checks
2021 : */
2022 16 : if (!object_ownercheck(DatabaseRelationId, db_id, GetUserId()))
2023 0 : aclcheck_error(ACLCHECK_NOT_OWNER, OBJECT_DATABASE,
2024 : dbname);
2025 :
2026 : /*
2027 : * Obviously can't move the tables of my own database
2028 : */
2029 16 : if (db_id == MyDatabaseId)
2030 0 : ereport(ERROR,
2031 : (errcode(ERRCODE_OBJECT_IN_USE),
2032 : errmsg("cannot change the tablespace of the currently open database")));
2033 :
2034 : /*
2035 : * Get tablespace's oid
2036 : */
2037 16 : dst_tblspcoid = get_tablespace_oid(tblspcname, false);
2038 :
2039 : /*
2040 : * Permission checks
2041 : */
2042 16 : aclresult = object_aclcheck(TableSpaceRelationId, dst_tblspcoid, GetUserId(),
2043 : ACL_CREATE);
2044 16 : if (aclresult != ACLCHECK_OK)
2045 0 : aclcheck_error(aclresult, OBJECT_TABLESPACE,
2046 : tblspcname);
2047 :
2048 : /*
2049 : * pg_global must never be the default tablespace
2050 : */
2051 16 : if (dst_tblspcoid == GLOBALTABLESPACE_OID)
2052 0 : ereport(ERROR,
2053 : (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
2054 : errmsg("pg_global cannot be used as default tablespace")));
2055 :
2056 : /*
2057 : * No-op if same tablespace
2058 : */
2059 16 : if (src_tblspcoid == dst_tblspcoid)
2060 : {
2061 0 : table_close(pgdbrel, NoLock);
2062 0 : UnlockSharedObjectForSession(DatabaseRelationId, db_id, 0,
2063 : AccessExclusiveLock);
2064 0 : return;
2065 : }
2066 :
2067 : /*
2068 : * Check for other backends in the target database. (Because we hold the
2069 : * database lock, no new ones can start after this.)
2070 : *
2071 : * As in CREATE DATABASE, check this after other error conditions.
2072 : */
2073 16 : if (CountOtherDBBackends(db_id, ¬herbackends, &npreparedxacts))
2074 0 : ereport(ERROR,
2075 : (errcode(ERRCODE_OBJECT_IN_USE),
2076 : errmsg("database \"%s\" is being accessed by other users",
2077 : dbname),
2078 : errdetail_busy_db(notherbackends, npreparedxacts)));
2079 :
2080 : /*
2081 : * Get old and new database paths
2082 : */
2083 16 : src_dbpath = GetDatabasePath(db_id, src_tblspcoid);
2084 16 : dst_dbpath = GetDatabasePath(db_id, dst_tblspcoid);
2085 :
2086 : /*
2087 : * Force a checkpoint before proceeding. This will force all dirty
2088 : * buffers, including those of unlogged tables, out to disk, to ensure
2089 : * source database is up-to-date on disk for the copy.
2090 : * FlushDatabaseBuffers() would suffice for that, but we also want to
2091 : * process any pending unlink requests. Otherwise, the check for existing
2092 : * files in the target directory might fail unnecessarily, not to mention
2093 : * that the copy might fail due to source files getting deleted under it.
2094 : * On Windows, this also ensures that background procs don't hold any open
2095 : * files, which would cause rmdir() to fail.
2096 : */
2097 16 : RequestCheckpoint(CHECKPOINT_IMMEDIATE | CHECKPOINT_FORCE | CHECKPOINT_WAIT
2098 : | CHECKPOINT_FLUSH_ALL);
2099 :
2100 : /* Close all smgr fds in all backends. */
2101 16 : WaitForProcSignalBarrier(EmitProcSignalBarrier(PROCSIGNAL_BARRIER_SMGRRELEASE));
2102 :
2103 : /*
2104 : * Now drop all buffers holding data of the target database; they should
2105 : * no longer be dirty so DropDatabaseBuffers is safe.
2106 : *
2107 : * It might seem that we could just let these buffers age out of shared
2108 : * buffers naturally, since they should not get referenced anymore. The
2109 : * problem with that is that if the user later moves the database back to
2110 : * its original tablespace, any still-surviving buffers would appear to
2111 : * contain valid data again --- but they'd be missing any changes made in
2112 : * the database while it was in the new tablespace. In any case, freeing
2113 : * buffers that should never be used again seems worth the cycles.
2114 : *
2115 : * Note: it'd be sufficient to get rid of buffers matching db_id and
2116 : * src_tblspcoid, but bufmgr.c presently provides no API for that.
2117 : */
2118 16 : DropDatabaseBuffers(db_id);
2119 :
2120 : /*
2121 : * Check for existence of files in the target directory, i.e., objects of
2122 : * this database that are already in the target tablespace. We can't
2123 : * allow the move in such a case, because we would need to change those
2124 : * relations' pg_class.reltablespace entries to zero, and we don't have
2125 : * access to the DB's pg_class to do so.
2126 : */
2127 16 : dstdir = AllocateDir(dst_dbpath);
2128 16 : if (dstdir != NULL)
2129 : {
2130 0 : while ((xlde = ReadDir(dstdir, dst_dbpath)) != NULL)
2131 : {
2132 0 : if (strcmp(xlde->d_name, ".") == 0 ||
2133 0 : strcmp(xlde->d_name, "..") == 0)
2134 0 : continue;
2135 :
2136 0 : ereport(ERROR,
2137 : (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
2138 : errmsg("some relations of database \"%s\" are already in tablespace \"%s\"",
2139 : dbname, tblspcname),
2140 : errhint("You must move them back to the database's default tablespace before using this command.")));
2141 : }
2142 :
2143 0 : FreeDir(dstdir);
2144 :
2145 : /*
2146 : * The directory exists but is empty. We must remove it before using
2147 : * the copydir function.
2148 : */
2149 0 : if (rmdir(dst_dbpath) != 0)
2150 0 : elog(ERROR, "could not remove directory \"%s\": %m",
2151 : dst_dbpath);
2152 : }
2153 :
2154 : /*
2155 : * Use an ENSURE block to make sure we remove the debris if the copy fails
2156 : * (eg, due to out-of-disk-space). This is not a 100% solution, because
2157 : * of the possibility of failure during transaction commit, but it should
2158 : * handle most scenarios.
2159 : */
2160 16 : fparms.dest_dboid = db_id;
2161 16 : fparms.dest_tsoid = dst_tblspcoid;
2162 16 : PG_ENSURE_ERROR_CLEANUP(movedb_failure_callback,
2163 : PointerGetDatum(&fparms));
2164 : {
2165 16 : Datum new_record[Natts_pg_database] = {0};
2166 16 : bool new_record_nulls[Natts_pg_database] = {0};
2167 16 : bool new_record_repl[Natts_pg_database] = {0};
2168 :
2169 : /*
2170 : * Copy files from the old tablespace to the new one
2171 : */
2172 16 : copydir(src_dbpath, dst_dbpath, false);
2173 :
2174 : /*
2175 : * Record the filesystem change in XLOG
2176 : */
2177 : {
2178 : xl_dbase_create_file_copy_rec xlrec;
2179 :
2180 16 : xlrec.db_id = db_id;
2181 16 : xlrec.tablespace_id = dst_tblspcoid;
2182 16 : xlrec.src_db_id = db_id;
2183 16 : xlrec.src_tablespace_id = src_tblspcoid;
2184 :
2185 16 : XLogBeginInsert();
2186 16 : XLogRegisterData((char *) &xlrec,
2187 : sizeof(xl_dbase_create_file_copy_rec));
2188 :
2189 16 : (void) XLogInsert(RM_DBASE_ID,
2190 : XLOG_DBASE_CREATE_FILE_COPY | XLR_SPECIAL_REL_UPDATE);
2191 : }
2192 :
2193 : /*
2194 : * Update the database's pg_database tuple
2195 : */
2196 16 : ScanKeyInit(&scankey,
2197 : Anum_pg_database_datname,
2198 : BTEqualStrategyNumber, F_NAMEEQ,
2199 : CStringGetDatum(dbname));
2200 16 : sysscan = systable_beginscan(pgdbrel, DatabaseNameIndexId, true,
2201 : NULL, 1, &scankey);
2202 16 : oldtuple = systable_getnext(sysscan);
2203 16 : if (!HeapTupleIsValid(oldtuple)) /* shouldn't happen... */
2204 0 : ereport(ERROR,
2205 : (errcode(ERRCODE_UNDEFINED_DATABASE),
2206 : errmsg("database \"%s\" does not exist", dbname)));
2207 16 : LockTuple(pgdbrel, &oldtuple->t_self, InplaceUpdateTupleLock);
2208 :
2209 16 : new_record[Anum_pg_database_dattablespace - 1] = ObjectIdGetDatum(dst_tblspcoid);
2210 16 : new_record_repl[Anum_pg_database_dattablespace - 1] = true;
2211 :
2212 16 : newtuple = heap_modify_tuple(oldtuple, RelationGetDescr(pgdbrel),
2213 : new_record,
2214 : new_record_nulls, new_record_repl);
2215 16 : CatalogTupleUpdate(pgdbrel, &oldtuple->t_self, newtuple);
2216 16 : UnlockTuple(pgdbrel, &oldtuple->t_self, InplaceUpdateTupleLock);
2217 :
2218 16 : InvokeObjectPostAlterHook(DatabaseRelationId, db_id, 0);
2219 :
2220 16 : systable_endscan(sysscan);
2221 :
2222 : /*
2223 : * Force another checkpoint here. As in CREATE DATABASE, this is to
2224 : * ensure that we don't have to replay a committed
2225 : * XLOG_DBASE_CREATE_FILE_COPY operation, which would cause us to lose
2226 : * any unlogged operations done in the new DB tablespace before the
2227 : * next checkpoint.
2228 : */
2229 16 : RequestCheckpoint(CHECKPOINT_IMMEDIATE | CHECKPOINT_FORCE | CHECKPOINT_WAIT);
2230 :
2231 : /*
2232 : * Force synchronous commit, thus minimizing the window between
2233 : * copying the database files and committal of the transaction. If we
2234 : * crash before committing, we'll leave an orphaned set of files on
2235 : * disk, which is not fatal but not good either.
2236 : */
2237 16 : ForceSyncCommit();
2238 :
2239 : /*
2240 : * Close pg_database, but keep lock till commit.
2241 : */
2242 16 : table_close(pgdbrel, NoLock);
2243 : }
2244 16 : PG_END_ENSURE_ERROR_CLEANUP(movedb_failure_callback,
2245 : PointerGetDatum(&fparms));
2246 :
2247 : /*
2248 : * Commit the transaction so that the pg_database update is committed. If
2249 : * we crash while removing files, the database won't be corrupt, we'll
2250 : * just leave some orphaned files in the old directory.
2251 : *
2252 : * (This is OK because we know we aren't inside a transaction block.)
2253 : *
2254 : * XXX would it be safe/better to do this inside the ensure block? Not
2255 : * convinced it's a good idea; consider elog just after the transaction
2256 : * really commits.
2257 : */
2258 16 : PopActiveSnapshot();
2259 16 : CommitTransactionCommand();
2260 :
2261 : /* Start new transaction for the remaining work; don't need a snapshot */
2262 16 : StartTransactionCommand();
2263 :
2264 : /*
2265 : * Remove files from the old tablespace
2266 : */
2267 16 : if (!rmtree(src_dbpath, true))
2268 0 : ereport(WARNING,
2269 : (errmsg("some useless files may be left behind in old database directory \"%s\"",
2270 : src_dbpath)));
2271 :
2272 : /*
2273 : * Record the filesystem change in XLOG
2274 : */
2275 : {
2276 : xl_dbase_drop_rec xlrec;
2277 :
2278 16 : xlrec.db_id = db_id;
2279 16 : xlrec.ntablespaces = 1;
2280 :
2281 16 : XLogBeginInsert();
2282 16 : XLogRegisterData((char *) &xlrec, sizeof(xl_dbase_drop_rec));
2283 16 : XLogRegisterData((char *) &src_tblspcoid, sizeof(Oid));
2284 :
2285 16 : (void) XLogInsert(RM_DBASE_ID,
2286 : XLOG_DBASE_DROP | XLR_SPECIAL_REL_UPDATE);
2287 : }
2288 :
2289 : /* Now it's safe to release the database lock */
2290 16 : UnlockSharedObjectForSession(DatabaseRelationId, db_id, 0,
2291 : AccessExclusiveLock);
2292 :
2293 16 : pfree(src_dbpath);
2294 16 : pfree(dst_dbpath);
2295 : }
2296 :
2297 : /* Error cleanup callback for movedb */
2298 : static void
2299 0 : movedb_failure_callback(int code, Datum arg)
2300 : {
2301 0 : movedb_failure_params *fparms = (movedb_failure_params *) DatumGetPointer(arg);
2302 : char *dstpath;
2303 :
2304 : /* Get rid of anything we managed to copy to the target directory */
2305 0 : dstpath = GetDatabasePath(fparms->dest_dboid, fparms->dest_tsoid);
2306 :
2307 0 : (void) rmtree(dstpath, true);
2308 :
2309 0 : pfree(dstpath);
2310 0 : }
2311 :
2312 : /*
2313 : * Process options and call dropdb function.
2314 : */
2315 : void
2316 102 : DropDatabase(ParseState *pstate, DropdbStmt *stmt)
2317 : {
2318 102 : bool force = false;
2319 : ListCell *lc;
2320 :
2321 128 : foreach(lc, stmt->options)
2322 : {
2323 26 : DefElem *opt = (DefElem *) lfirst(lc);
2324 :
2325 26 : if (strcmp(opt->defname, "force") == 0)
2326 26 : force = true;
2327 : else
2328 0 : ereport(ERROR,
2329 : (errcode(ERRCODE_SYNTAX_ERROR),
2330 : errmsg("unrecognized DROP DATABASE option \"%s\"", opt->defname),
2331 : parser_errposition(pstate, opt->location)));
2332 : }
2333 :
2334 102 : dropdb(stmt->dbname, stmt->missing_ok, force);
2335 84 : }
2336 :
2337 : /*
2338 : * ALTER DATABASE name ...
2339 : */
2340 : Oid
2341 46 : AlterDatabase(ParseState *pstate, AlterDatabaseStmt *stmt, bool isTopLevel)
2342 : {
2343 : Relation rel;
2344 : Oid dboid;
2345 : HeapTuple tuple,
2346 : newtuple;
2347 : Form_pg_database datform;
2348 : ScanKeyData scankey;
2349 : SysScanDesc scan;
2350 : ListCell *option;
2351 46 : bool dbistemplate = false;
2352 46 : bool dballowconnections = true;
2353 46 : int dbconnlimit = DATCONNLIMIT_UNLIMITED;
2354 46 : DefElem *distemplate = NULL;
2355 46 : DefElem *dallowconnections = NULL;
2356 46 : DefElem *dconnlimit = NULL;
2357 46 : DefElem *dtablespace = NULL;
2358 46 : Datum new_record[Natts_pg_database] = {0};
2359 46 : bool new_record_nulls[Natts_pg_database] = {0};
2360 46 : bool new_record_repl[Natts_pg_database] = {0};
2361 :
2362 : /* Extract options from the statement node tree */
2363 92 : foreach(option, stmt->options)
2364 : {
2365 46 : DefElem *defel = (DefElem *) lfirst(option);
2366 :
2367 46 : if (strcmp(defel->defname, "is_template") == 0)
2368 : {
2369 10 : if (distemplate)
2370 0 : errorConflictingDefElem(defel, pstate);
2371 10 : distemplate = defel;
2372 : }
2373 36 : else if (strcmp(defel->defname, "allow_connections") == 0)
2374 : {
2375 12 : if (dallowconnections)
2376 0 : errorConflictingDefElem(defel, pstate);
2377 12 : dallowconnections = defel;
2378 : }
2379 24 : else if (strcmp(defel->defname, "connection_limit") == 0)
2380 : {
2381 8 : if (dconnlimit)
2382 0 : errorConflictingDefElem(defel, pstate);
2383 8 : dconnlimit = defel;
2384 : }
2385 16 : else if (strcmp(defel->defname, "tablespace") == 0)
2386 : {
2387 16 : if (dtablespace)
2388 0 : errorConflictingDefElem(defel, pstate);
2389 16 : dtablespace = defel;
2390 : }
2391 : else
2392 0 : ereport(ERROR,
2393 : (errcode(ERRCODE_SYNTAX_ERROR),
2394 : errmsg("option \"%s\" not recognized", defel->defname),
2395 : parser_errposition(pstate, defel->location)));
2396 : }
2397 :
2398 46 : if (dtablespace)
2399 : {
2400 : /*
2401 : * While the SET TABLESPACE syntax doesn't allow any other options,
2402 : * somebody could write "WITH TABLESPACE ...". Forbid any other
2403 : * options from being specified in that case.
2404 : */
2405 16 : if (list_length(stmt->options) != 1)
2406 0 : ereport(ERROR,
2407 : (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
2408 : errmsg("option \"%s\" cannot be specified with other options",
2409 : dtablespace->defname),
2410 : parser_errposition(pstate, dtablespace->location)));
2411 : /* this case isn't allowed within a transaction block */
2412 16 : PreventInTransactionBlock(isTopLevel, "ALTER DATABASE SET TABLESPACE");
2413 16 : movedb(stmt->dbname, defGetString(dtablespace));
2414 16 : return InvalidOid;
2415 : }
2416 :
2417 30 : if (distemplate && distemplate->arg)
2418 10 : dbistemplate = defGetBoolean(distemplate);
2419 30 : if (dallowconnections && dallowconnections->arg)
2420 12 : dballowconnections = defGetBoolean(dallowconnections);
2421 30 : if (dconnlimit && dconnlimit->arg)
2422 : {
2423 8 : dbconnlimit = defGetInt32(dconnlimit);
2424 8 : if (dbconnlimit < DATCONNLIMIT_UNLIMITED)
2425 0 : ereport(ERROR,
2426 : (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
2427 : errmsg("invalid connection limit: %d", dbconnlimit)));
2428 : }
2429 :
2430 : /*
2431 : * Get the old tuple. We don't need a lock on the database per se,
2432 : * because we're not going to do anything that would mess up incoming
2433 : * connections.
2434 : */
2435 30 : rel = table_open(DatabaseRelationId, RowExclusiveLock);
2436 30 : ScanKeyInit(&scankey,
2437 : Anum_pg_database_datname,
2438 : BTEqualStrategyNumber, F_NAMEEQ,
2439 30 : CStringGetDatum(stmt->dbname));
2440 30 : scan = systable_beginscan(rel, DatabaseNameIndexId, true,
2441 : NULL, 1, &scankey);
2442 30 : tuple = systable_getnext(scan);
2443 30 : if (!HeapTupleIsValid(tuple))
2444 0 : ereport(ERROR,
2445 : (errcode(ERRCODE_UNDEFINED_DATABASE),
2446 : errmsg("database \"%s\" does not exist", stmt->dbname)));
2447 30 : LockTuple(rel, &tuple->t_self, InplaceUpdateTupleLock);
2448 :
2449 30 : datform = (Form_pg_database) GETSTRUCT(tuple);
2450 30 : dboid = datform->oid;
2451 :
2452 30 : if (database_is_invalid_form(datform))
2453 : {
2454 2 : ereport(FATAL,
2455 : errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
2456 : errmsg("cannot alter invalid database \"%s\"", stmt->dbname),
2457 : errhint("Use DROP DATABASE to drop invalid databases."));
2458 : }
2459 :
2460 28 : if (!object_ownercheck(DatabaseRelationId, dboid, GetUserId()))
2461 0 : aclcheck_error(ACLCHECK_NOT_OWNER, OBJECT_DATABASE,
2462 0 : stmt->dbname);
2463 :
2464 : /*
2465 : * In order to avoid getting locked out and having to go through
2466 : * standalone mode, we refuse to disallow connections to the database
2467 : * we're currently connected to. Lockout can still happen with concurrent
2468 : * sessions but the likeliness of that is not high enough to worry about.
2469 : */
2470 28 : if (!dballowconnections && dboid == MyDatabaseId)
2471 0 : ereport(ERROR,
2472 : (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
2473 : errmsg("cannot disallow connections for current database")));
2474 :
2475 : /*
2476 : * Build an updated tuple, perusing the information just obtained
2477 : */
2478 28 : if (distemplate)
2479 : {
2480 10 : new_record[Anum_pg_database_datistemplate - 1] = BoolGetDatum(dbistemplate);
2481 10 : new_record_repl[Anum_pg_database_datistemplate - 1] = true;
2482 : }
2483 28 : if (dallowconnections)
2484 : {
2485 12 : new_record[Anum_pg_database_datallowconn - 1] = BoolGetDatum(dballowconnections);
2486 12 : new_record_repl[Anum_pg_database_datallowconn - 1] = true;
2487 : }
2488 28 : if (dconnlimit)
2489 : {
2490 6 : new_record[Anum_pg_database_datconnlimit - 1] = Int32GetDatum(dbconnlimit);
2491 6 : new_record_repl[Anum_pg_database_datconnlimit - 1] = true;
2492 : }
2493 :
2494 28 : newtuple = heap_modify_tuple(tuple, RelationGetDescr(rel), new_record,
2495 : new_record_nulls, new_record_repl);
2496 28 : CatalogTupleUpdate(rel, &tuple->t_self, newtuple);
2497 28 : UnlockTuple(rel, &tuple->t_self, InplaceUpdateTupleLock);
2498 :
2499 28 : InvokeObjectPostAlterHook(DatabaseRelationId, dboid, 0);
2500 :
2501 28 : systable_endscan(scan);
2502 :
2503 : /* Close pg_database, but keep lock till commit */
2504 28 : table_close(rel, NoLock);
2505 :
2506 28 : return dboid;
2507 : }
2508 :
2509 :
2510 : /*
2511 : * ALTER DATABASE name REFRESH COLLATION VERSION
2512 : */
2513 : ObjectAddress
2514 6 : AlterDatabaseRefreshColl(AlterDatabaseRefreshCollStmt *stmt)
2515 : {
2516 : Relation rel;
2517 : ScanKeyData scankey;
2518 : SysScanDesc scan;
2519 : Oid db_id;
2520 : HeapTuple tuple;
2521 : Form_pg_database datForm;
2522 : ObjectAddress address;
2523 : Datum datum;
2524 : bool isnull;
2525 : char *oldversion;
2526 : char *newversion;
2527 :
2528 6 : rel = table_open(DatabaseRelationId, RowExclusiveLock);
2529 6 : ScanKeyInit(&scankey,
2530 : Anum_pg_database_datname,
2531 : BTEqualStrategyNumber, F_NAMEEQ,
2532 6 : CStringGetDatum(stmt->dbname));
2533 6 : scan = systable_beginscan(rel, DatabaseNameIndexId, true,
2534 : NULL, 1, &scankey);
2535 6 : tuple = systable_getnext(scan);
2536 6 : if (!HeapTupleIsValid(tuple))
2537 0 : ereport(ERROR,
2538 : (errcode(ERRCODE_UNDEFINED_DATABASE),
2539 : errmsg("database \"%s\" does not exist", stmt->dbname)));
2540 :
2541 6 : datForm = (Form_pg_database) GETSTRUCT(tuple);
2542 6 : db_id = datForm->oid;
2543 :
2544 6 : if (!object_ownercheck(DatabaseRelationId, db_id, GetUserId()))
2545 0 : aclcheck_error(ACLCHECK_NOT_OWNER, OBJECT_DATABASE,
2546 0 : stmt->dbname);
2547 6 : LockTuple(rel, &tuple->t_self, InplaceUpdateTupleLock);
2548 :
2549 6 : datum = heap_getattr(tuple, Anum_pg_database_datcollversion, RelationGetDescr(rel), &isnull);
2550 6 : oldversion = isnull ? NULL : TextDatumGetCString(datum);
2551 :
2552 6 : if (datForm->datlocprovider == COLLPROVIDER_LIBC)
2553 : {
2554 4 : datum = heap_getattr(tuple, Anum_pg_database_datcollate, RelationGetDescr(rel), &isnull);
2555 4 : if (isnull)
2556 0 : elog(ERROR, "unexpected null in pg_database");
2557 : }
2558 : else
2559 : {
2560 2 : datum = heap_getattr(tuple, Anum_pg_database_datlocale, RelationGetDescr(rel), &isnull);
2561 2 : if (isnull)
2562 0 : elog(ERROR, "unexpected null in pg_database");
2563 : }
2564 :
2565 6 : newversion = get_collation_actual_version(datForm->datlocprovider,
2566 6 : TextDatumGetCString(datum));
2567 :
2568 : /* cannot change from NULL to non-NULL or vice versa */
2569 6 : if ((!oldversion && newversion) || (oldversion && !newversion))
2570 0 : elog(ERROR, "invalid collation version change");
2571 6 : else if (oldversion && newversion && strcmp(newversion, oldversion) != 0)
2572 0 : {
2573 0 : bool nulls[Natts_pg_database] = {0};
2574 0 : bool replaces[Natts_pg_database] = {0};
2575 0 : Datum values[Natts_pg_database] = {0};
2576 : HeapTuple newtuple;
2577 :
2578 0 : ereport(NOTICE,
2579 : (errmsg("changing version from %s to %s",
2580 : oldversion, newversion)));
2581 :
2582 0 : values[Anum_pg_database_datcollversion - 1] = CStringGetTextDatum(newversion);
2583 0 : replaces[Anum_pg_database_datcollversion - 1] = true;
2584 :
2585 0 : newtuple = heap_modify_tuple(tuple, RelationGetDescr(rel),
2586 : values, nulls, replaces);
2587 0 : CatalogTupleUpdate(rel, &tuple->t_self, newtuple);
2588 0 : heap_freetuple(newtuple);
2589 : }
2590 : else
2591 6 : ereport(NOTICE,
2592 : (errmsg("version has not changed")));
2593 6 : UnlockTuple(rel, &tuple->t_self, InplaceUpdateTupleLock);
2594 :
2595 6 : InvokeObjectPostAlterHook(DatabaseRelationId, db_id, 0);
2596 :
2597 6 : ObjectAddressSet(address, DatabaseRelationId, db_id);
2598 :
2599 6 : systable_endscan(scan);
2600 :
2601 6 : table_close(rel, NoLock);
2602 :
2603 6 : return address;
2604 : }
2605 :
2606 :
2607 : /*
2608 : * ALTER DATABASE name SET ...
2609 : */
2610 : Oid
2611 1138 : AlterDatabaseSet(AlterDatabaseSetStmt *stmt)
2612 : {
2613 1138 : Oid datid = get_database_oid(stmt->dbname, false);
2614 :
2615 : /*
2616 : * Obtain a lock on the database and make sure it didn't go away in the
2617 : * meantime.
2618 : */
2619 1138 : shdepLockAndCheckObject(DatabaseRelationId, datid);
2620 :
2621 1138 : if (!object_ownercheck(DatabaseRelationId, datid, GetUserId()))
2622 0 : aclcheck_error(ACLCHECK_NOT_OWNER, OBJECT_DATABASE,
2623 0 : stmt->dbname);
2624 :
2625 1138 : AlterSetting(datid, InvalidOid, stmt->setstmt);
2626 :
2627 1138 : UnlockSharedObject(DatabaseRelationId, datid, 0, AccessShareLock);
2628 :
2629 1138 : return datid;
2630 : }
2631 :
2632 :
2633 : /*
2634 : * ALTER DATABASE name OWNER TO newowner
2635 : */
2636 : ObjectAddress
2637 44 : AlterDatabaseOwner(const char *dbname, Oid newOwnerId)
2638 : {
2639 : Oid db_id;
2640 : HeapTuple tuple;
2641 : Relation rel;
2642 : ScanKeyData scankey;
2643 : SysScanDesc scan;
2644 : Form_pg_database datForm;
2645 : ObjectAddress address;
2646 :
2647 : /*
2648 : * Get the old tuple. We don't need a lock on the database per se,
2649 : * because we're not going to do anything that would mess up incoming
2650 : * connections.
2651 : */
2652 44 : rel = table_open(DatabaseRelationId, RowExclusiveLock);
2653 44 : ScanKeyInit(&scankey,
2654 : Anum_pg_database_datname,
2655 : BTEqualStrategyNumber, F_NAMEEQ,
2656 : CStringGetDatum(dbname));
2657 44 : scan = systable_beginscan(rel, DatabaseNameIndexId, true,
2658 : NULL, 1, &scankey);
2659 44 : tuple = systable_getnext(scan);
2660 44 : if (!HeapTupleIsValid(tuple))
2661 0 : ereport(ERROR,
2662 : (errcode(ERRCODE_UNDEFINED_DATABASE),
2663 : errmsg("database \"%s\" does not exist", dbname)));
2664 :
2665 44 : datForm = (Form_pg_database) GETSTRUCT(tuple);
2666 44 : db_id = datForm->oid;
2667 :
2668 : /*
2669 : * If the new owner is the same as the existing owner, consider the
2670 : * command to have succeeded. This is to be consistent with other
2671 : * objects.
2672 : */
2673 44 : if (datForm->datdba != newOwnerId)
2674 : {
2675 : Datum repl_val[Natts_pg_database];
2676 24 : bool repl_null[Natts_pg_database] = {0};
2677 24 : bool repl_repl[Natts_pg_database] = {0};
2678 : Acl *newAcl;
2679 : Datum aclDatum;
2680 : bool isNull;
2681 : HeapTuple newtuple;
2682 :
2683 : /* Otherwise, must be owner of the existing object */
2684 24 : if (!object_ownercheck(DatabaseRelationId, db_id, GetUserId()))
2685 0 : aclcheck_error(ACLCHECK_NOT_OWNER, OBJECT_DATABASE,
2686 : dbname);
2687 :
2688 : /* Must be able to become new owner */
2689 24 : check_can_set_role(GetUserId(), newOwnerId);
2690 :
2691 : /*
2692 : * must have createdb rights
2693 : *
2694 : * NOTE: This is different from other alter-owner checks in that the
2695 : * current user is checked for createdb privileges instead of the
2696 : * destination owner. This is consistent with the CREATE case for
2697 : * databases. Because superusers will always have this right, we need
2698 : * no special case for them.
2699 : */
2700 24 : if (!have_createdb_privilege())
2701 0 : ereport(ERROR,
2702 : (errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
2703 : errmsg("permission denied to change owner of database")));
2704 :
2705 24 : LockTuple(rel, &tuple->t_self, InplaceUpdateTupleLock);
2706 :
2707 24 : repl_repl[Anum_pg_database_datdba - 1] = true;
2708 24 : repl_val[Anum_pg_database_datdba - 1] = ObjectIdGetDatum(newOwnerId);
2709 :
2710 : /*
2711 : * Determine the modified ACL for the new owner. This is only
2712 : * necessary when the ACL is non-null.
2713 : */
2714 24 : aclDatum = heap_getattr(tuple,
2715 : Anum_pg_database_datacl,
2716 : RelationGetDescr(rel),
2717 : &isNull);
2718 24 : if (!isNull)
2719 : {
2720 0 : newAcl = aclnewowner(DatumGetAclP(aclDatum),
2721 : datForm->datdba, newOwnerId);
2722 0 : repl_repl[Anum_pg_database_datacl - 1] = true;
2723 0 : repl_val[Anum_pg_database_datacl - 1] = PointerGetDatum(newAcl);
2724 : }
2725 :
2726 24 : newtuple = heap_modify_tuple(tuple, RelationGetDescr(rel), repl_val, repl_null, repl_repl);
2727 24 : CatalogTupleUpdate(rel, &newtuple->t_self, newtuple);
2728 24 : UnlockTuple(rel, &tuple->t_self, InplaceUpdateTupleLock);
2729 :
2730 24 : heap_freetuple(newtuple);
2731 :
2732 : /* Update owner dependency reference */
2733 24 : changeDependencyOnOwner(DatabaseRelationId, db_id, newOwnerId);
2734 : }
2735 :
2736 44 : InvokeObjectPostAlterHook(DatabaseRelationId, db_id, 0);
2737 :
2738 44 : ObjectAddressSet(address, DatabaseRelationId, db_id);
2739 :
2740 44 : systable_endscan(scan);
2741 :
2742 : /* Close pg_database, but keep lock till commit */
2743 44 : table_close(rel, NoLock);
2744 :
2745 44 : return address;
2746 : }
2747 :
2748 :
2749 : Datum
2750 86 : pg_database_collation_actual_version(PG_FUNCTION_ARGS)
2751 : {
2752 86 : Oid dbid = PG_GETARG_OID(0);
2753 : HeapTuple tp;
2754 : char datlocprovider;
2755 : Datum datum;
2756 : char *version;
2757 :
2758 86 : tp = SearchSysCache1(DATABASEOID, ObjectIdGetDatum(dbid));
2759 86 : if (!HeapTupleIsValid(tp))
2760 0 : ereport(ERROR,
2761 : (errcode(ERRCODE_UNDEFINED_OBJECT),
2762 : errmsg("database with OID %u does not exist", dbid)));
2763 :
2764 86 : datlocprovider = ((Form_pg_database) GETSTRUCT(tp))->datlocprovider;
2765 :
2766 86 : if (datlocprovider == COLLPROVIDER_LIBC)
2767 72 : datum = SysCacheGetAttrNotNull(DATABASEOID, tp, Anum_pg_database_datcollate);
2768 : else
2769 14 : datum = SysCacheGetAttrNotNull(DATABASEOID, tp, Anum_pg_database_datlocale);
2770 :
2771 86 : version = get_collation_actual_version(datlocprovider,
2772 86 : TextDatumGetCString(datum));
2773 :
2774 86 : ReleaseSysCache(tp);
2775 :
2776 86 : if (version)
2777 58 : PG_RETURN_TEXT_P(cstring_to_text(version));
2778 : else
2779 28 : PG_RETURN_NULL();
2780 : }
2781 :
2782 :
2783 : /*
2784 : * Helper functions
2785 : */
2786 :
2787 : /*
2788 : * Look up info about the database named "name". If the database exists,
2789 : * obtain the specified lock type on it, fill in any of the remaining
2790 : * parameters that aren't NULL, and return true. If no such database,
2791 : * return false.
2792 : */
2793 : static bool
2794 794 : get_db_info(const char *name, LOCKMODE lockmode,
2795 : Oid *dbIdP, Oid *ownerIdP,
2796 : int *encodingP, bool *dbIsTemplateP, bool *dbAllowConnP, bool *dbHasLoginEvtP,
2797 : TransactionId *dbFrozenXidP, MultiXactId *dbMinMultiP,
2798 : Oid *dbTablespace, char **dbCollate, char **dbCtype, char **dbLocale,
2799 : char **dbIcurules,
2800 : char *dbLocProvider,
2801 : char **dbCollversion)
2802 : {
2803 794 : bool result = false;
2804 : Relation relation;
2805 :
2806 : Assert(name);
2807 :
2808 : /* Caller may wish to grab a better lock on pg_database beforehand... */
2809 794 : relation = table_open(DatabaseRelationId, AccessShareLock);
2810 :
2811 : /*
2812 : * Loop covers the rare case where the database is renamed before we can
2813 : * lock it. We try again just in case we can find a new one of the same
2814 : * name.
2815 : */
2816 : for (;;)
2817 0 : {
2818 : ScanKeyData scanKey;
2819 : SysScanDesc scan;
2820 : HeapTuple tuple;
2821 : Oid dbOid;
2822 :
2823 : /*
2824 : * there's no syscache for database-indexed-by-name, so must do it the
2825 : * hard way
2826 : */
2827 794 : ScanKeyInit(&scanKey,
2828 : Anum_pg_database_datname,
2829 : BTEqualStrategyNumber, F_NAMEEQ,
2830 : CStringGetDatum(name));
2831 :
2832 794 : scan = systable_beginscan(relation, DatabaseNameIndexId, true,
2833 : NULL, 1, &scanKey);
2834 :
2835 794 : tuple = systable_getnext(scan);
2836 :
2837 794 : if (!HeapTupleIsValid(tuple))
2838 : {
2839 : /* definitely no database of that name */
2840 32 : systable_endscan(scan);
2841 32 : break;
2842 : }
2843 :
2844 762 : dbOid = ((Form_pg_database) GETSTRUCT(tuple))->oid;
2845 :
2846 762 : systable_endscan(scan);
2847 :
2848 : /*
2849 : * Now that we have a database OID, we can try to lock the DB.
2850 : */
2851 762 : if (lockmode != NoLock)
2852 762 : LockSharedObject(DatabaseRelationId, dbOid, 0, lockmode);
2853 :
2854 : /*
2855 : * And now, re-fetch the tuple by OID. If it's still there and still
2856 : * the same name, we win; else, drop the lock and loop back to try
2857 : * again.
2858 : */
2859 762 : tuple = SearchSysCache1(DATABASEOID, ObjectIdGetDatum(dbOid));
2860 762 : if (HeapTupleIsValid(tuple))
2861 : {
2862 762 : Form_pg_database dbform = (Form_pg_database) GETSTRUCT(tuple);
2863 :
2864 762 : if (strcmp(name, NameStr(dbform->datname)) == 0)
2865 : {
2866 : Datum datum;
2867 : bool isnull;
2868 :
2869 : /* oid of the database */
2870 762 : if (dbIdP)
2871 762 : *dbIdP = dbOid;
2872 : /* oid of the owner */
2873 762 : if (ownerIdP)
2874 670 : *ownerIdP = dbform->datdba;
2875 : /* character encoding */
2876 762 : if (encodingP)
2877 670 : *encodingP = dbform->encoding;
2878 : /* allowed as template? */
2879 762 : if (dbIsTemplateP)
2880 740 : *dbIsTemplateP = dbform->datistemplate;
2881 : /* Has on login event trigger? */
2882 762 : if (dbHasLoginEvtP)
2883 670 : *dbHasLoginEvtP = dbform->dathasloginevt;
2884 : /* allowing connections? */
2885 762 : if (dbAllowConnP)
2886 670 : *dbAllowConnP = dbform->datallowconn;
2887 : /* limit of frozen XIDs */
2888 762 : if (dbFrozenXidP)
2889 670 : *dbFrozenXidP = dbform->datfrozenxid;
2890 : /* minimum MultiXactId */
2891 762 : if (dbMinMultiP)
2892 670 : *dbMinMultiP = dbform->datminmxid;
2893 : /* default tablespace for this database */
2894 762 : if (dbTablespace)
2895 686 : *dbTablespace = dbform->dattablespace;
2896 : /* default locale settings for this database */
2897 762 : if (dbLocProvider)
2898 670 : *dbLocProvider = dbform->datlocprovider;
2899 762 : if (dbCollate)
2900 : {
2901 670 : datum = SysCacheGetAttrNotNull(DATABASEOID, tuple, Anum_pg_database_datcollate);
2902 670 : *dbCollate = TextDatumGetCString(datum);
2903 : }
2904 762 : if (dbCtype)
2905 : {
2906 670 : datum = SysCacheGetAttrNotNull(DATABASEOID, tuple, Anum_pg_database_datctype);
2907 670 : *dbCtype = TextDatumGetCString(datum);
2908 : }
2909 762 : if (dbLocale)
2910 : {
2911 670 : datum = SysCacheGetAttr(DATABASEOID, tuple, Anum_pg_database_datlocale, &isnull);
2912 670 : if (isnull)
2913 612 : *dbLocale = NULL;
2914 : else
2915 58 : *dbLocale = TextDatumGetCString(datum);
2916 : }
2917 762 : if (dbIcurules)
2918 : {
2919 670 : datum = SysCacheGetAttr(DATABASEOID, tuple, Anum_pg_database_daticurules, &isnull);
2920 670 : if (isnull)
2921 670 : *dbIcurules = NULL;
2922 : else
2923 0 : *dbIcurules = TextDatumGetCString(datum);
2924 : }
2925 762 : if (dbCollversion)
2926 : {
2927 670 : datum = SysCacheGetAttr(DATABASEOID, tuple, Anum_pg_database_datcollversion, &isnull);
2928 670 : if (isnull)
2929 416 : *dbCollversion = NULL;
2930 : else
2931 254 : *dbCollversion = TextDatumGetCString(datum);
2932 : }
2933 762 : ReleaseSysCache(tuple);
2934 762 : result = true;
2935 762 : break;
2936 : }
2937 : /* can only get here if it was just renamed */
2938 0 : ReleaseSysCache(tuple);
2939 : }
2940 :
2941 0 : if (lockmode != NoLock)
2942 0 : UnlockSharedObject(DatabaseRelationId, dbOid, 0, lockmode);
2943 : }
2944 :
2945 794 : table_close(relation, AccessShareLock);
2946 :
2947 794 : return result;
2948 : }
2949 :
2950 : /* Check if current user has createdb privileges */
2951 : bool
2952 736 : have_createdb_privilege(void)
2953 : {
2954 736 : bool result = false;
2955 : HeapTuple utup;
2956 :
2957 : /* Superusers can always do everything */
2958 736 : if (superuser())
2959 700 : return true;
2960 :
2961 36 : utup = SearchSysCache1(AUTHOID, ObjectIdGetDatum(GetUserId()));
2962 36 : if (HeapTupleIsValid(utup))
2963 : {
2964 36 : result = ((Form_pg_authid) GETSTRUCT(utup))->rolcreatedb;
2965 36 : ReleaseSysCache(utup);
2966 : }
2967 36 : return result;
2968 : }
2969 :
2970 : /*
2971 : * Remove tablespace directories
2972 : *
2973 : * We don't know what tablespaces db_id is using, so iterate through all
2974 : * tablespaces removing <tablespace>/db_id
2975 : */
2976 : static void
2977 68 : remove_dbtablespaces(Oid db_id)
2978 : {
2979 : Relation rel;
2980 : TableScanDesc scan;
2981 : HeapTuple tuple;
2982 68 : List *ltblspc = NIL;
2983 : ListCell *cell;
2984 : int ntblspc;
2985 : int i;
2986 : Oid *tablespace_ids;
2987 :
2988 68 : rel = table_open(TableSpaceRelationId, AccessShareLock);
2989 68 : scan = table_beginscan_catalog(rel, 0, NULL);
2990 254 : while ((tuple = heap_getnext(scan, ForwardScanDirection)) != NULL)
2991 : {
2992 186 : Form_pg_tablespace spcform = (Form_pg_tablespace) GETSTRUCT(tuple);
2993 186 : Oid dsttablespace = spcform->oid;
2994 : char *dstpath;
2995 : struct stat st;
2996 :
2997 : /* Don't mess with the global tablespace */
2998 186 : if (dsttablespace == GLOBALTABLESPACE_OID)
2999 118 : continue;
3000 :
3001 118 : dstpath = GetDatabasePath(db_id, dsttablespace);
3002 :
3003 118 : if (lstat(dstpath, &st) < 0 || !S_ISDIR(st.st_mode))
3004 : {
3005 : /* Assume we can ignore it */
3006 50 : pfree(dstpath);
3007 50 : continue;
3008 : }
3009 :
3010 68 : if (!rmtree(dstpath, true))
3011 0 : ereport(WARNING,
3012 : (errmsg("some useless files may be left behind in old database directory \"%s\"",
3013 : dstpath)));
3014 :
3015 68 : ltblspc = lappend_oid(ltblspc, dsttablespace);
3016 68 : pfree(dstpath);
3017 : }
3018 :
3019 68 : ntblspc = list_length(ltblspc);
3020 68 : if (ntblspc == 0)
3021 : {
3022 0 : table_endscan(scan);
3023 0 : table_close(rel, AccessShareLock);
3024 0 : return;
3025 : }
3026 :
3027 68 : tablespace_ids = (Oid *) palloc(ntblspc * sizeof(Oid));
3028 68 : i = 0;
3029 136 : foreach(cell, ltblspc)
3030 68 : tablespace_ids[i++] = lfirst_oid(cell);
3031 :
3032 : /* Record the filesystem change in XLOG */
3033 : {
3034 : xl_dbase_drop_rec xlrec;
3035 :
3036 68 : xlrec.db_id = db_id;
3037 68 : xlrec.ntablespaces = ntblspc;
3038 :
3039 68 : XLogBeginInsert();
3040 68 : XLogRegisterData((char *) &xlrec, MinSizeOfDbaseDropRec);
3041 68 : XLogRegisterData((char *) tablespace_ids, ntblspc * sizeof(Oid));
3042 :
3043 68 : (void) XLogInsert(RM_DBASE_ID,
3044 : XLOG_DBASE_DROP | XLR_SPECIAL_REL_UPDATE);
3045 : }
3046 :
3047 68 : list_free(ltblspc);
3048 68 : pfree(tablespace_ids);
3049 :
3050 68 : table_endscan(scan);
3051 68 : table_close(rel, AccessShareLock);
3052 : }
3053 :
3054 : /*
3055 : * Check for existing files that conflict with a proposed new DB OID;
3056 : * return true if there are any
3057 : *
3058 : * If there were a subdirectory in any tablespace matching the proposed new
3059 : * OID, we'd get a create failure due to the duplicate name ... and then we'd
3060 : * try to remove that already-existing subdirectory during the cleanup in
3061 : * remove_dbtablespaces. Nuking existing files seems like a bad idea, so
3062 : * instead we make this extra check before settling on the OID of the new
3063 : * database. This exactly parallels what GetNewRelFileNumber() does for table
3064 : * relfilenumber values.
3065 : */
3066 : static bool
3067 644 : check_db_file_conflict(Oid db_id)
3068 : {
3069 644 : bool result = false;
3070 : Relation rel;
3071 : TableScanDesc scan;
3072 : HeapTuple tuple;
3073 :
3074 644 : rel = table_open(TableSpaceRelationId, AccessShareLock);
3075 644 : scan = table_beginscan_catalog(rel, 0, NULL);
3076 2030 : while ((tuple = heap_getnext(scan, ForwardScanDirection)) != NULL)
3077 : {
3078 1386 : Form_pg_tablespace spcform = (Form_pg_tablespace) GETSTRUCT(tuple);
3079 1386 : Oid dsttablespace = spcform->oid;
3080 : char *dstpath;
3081 : struct stat st;
3082 :
3083 : /* Don't mess with the global tablespace */
3084 1386 : if (dsttablespace == GLOBALTABLESPACE_OID)
3085 644 : continue;
3086 :
3087 742 : dstpath = GetDatabasePath(db_id, dsttablespace);
3088 :
3089 742 : if (lstat(dstpath, &st) == 0)
3090 : {
3091 : /* Found a conflicting file (or directory, whatever) */
3092 0 : pfree(dstpath);
3093 0 : result = true;
3094 0 : break;
3095 : }
3096 :
3097 742 : pfree(dstpath);
3098 : }
3099 :
3100 644 : table_endscan(scan);
3101 644 : table_close(rel, AccessShareLock);
3102 :
3103 644 : return result;
3104 : }
3105 :
/*
 * Issue a suitable errdetail message for a busy database
 *
 * notherbackends and npreparedxacts are the numbers of other sessions and
 * prepared transactions using the database.  The detail text mentions
 * whichever of them is positive; if notherbackends is not positive, the
 * prepared-transaction wording is used.  Always returns 0.
 */
static int
errdetail_busy_db(int notherbackends, int npreparedxacts)
{
	if (notherbackends <= 0)
	{
		/* No other sessions: report prepared transactions */
		errdetail_plural("There is %d prepared transaction using the database.",
						 "There are %d prepared transactions using the database.",
						 npreparedxacts,
						 npreparedxacts);
	}
	else if (npreparedxacts <= 0)
	{
		/* Only other sessions are in the way */
		errdetail_plural("There is %d other session using the database.",
						 "There are %d other sessions using the database.",
						 notherbackends,
						 notherbackends);
	}
	else
	{
		/*
		 * Both kinds are present.  Singular versus plural is not handled
		 * here, since gettext doesn't support multiple plurals in one
		 * string.
		 */
		errdetail("There are %d other session(s) and %d prepared transaction(s) using the database.",
				  notherbackends, npreparedxacts);
	}

	return 0;					/* just to keep ereport macro happy */
}
3132 :
3133 : /*
3134 : * get_database_oid - given a database name, look up the OID
3135 : *
3136 : * If missing_ok is false, throw an error if database name not found. If
3137 : * true, just return InvalidOid.
3138 : */
3139 : Oid
3140 2560 : get_database_oid(const char *dbname, bool missing_ok)
3141 : {
3142 : Relation pg_database;
3143 : ScanKeyData entry[1];
3144 : SysScanDesc scan;
3145 : HeapTuple dbtuple;
3146 : Oid oid;
3147 :
3148 : /*
3149 : * There's no syscache for pg_database indexed by name, so we must look
3150 : * the hard way.
3151 : */
3152 2560 : pg_database = table_open(DatabaseRelationId, AccessShareLock);
3153 2560 : ScanKeyInit(&entry[0],
3154 : Anum_pg_database_datname,
3155 : BTEqualStrategyNumber, F_NAMEEQ,
3156 : CStringGetDatum(dbname));
3157 2560 : scan = systable_beginscan(pg_database, DatabaseNameIndexId, true,
3158 : NULL, 1, entry);
3159 :
3160 2560 : dbtuple = systable_getnext(scan);
3161 :
3162 : /* We assume that there can be at most one matching tuple */
3163 2560 : if (HeapTupleIsValid(dbtuple))
3164 1904 : oid = ((Form_pg_database) GETSTRUCT(dbtuple))->oid;
3165 : else
3166 656 : oid = InvalidOid;
3167 :
3168 2560 : systable_endscan(scan);
3169 2560 : table_close(pg_database, AccessShareLock);
3170 :
3171 2560 : if (!OidIsValid(oid) && !missing_ok)
3172 6 : ereport(ERROR,
3173 : (errcode(ERRCODE_UNDEFINED_DATABASE),
3174 : errmsg("database \"%s\" does not exist",
3175 : dbname)));
3176 :
3177 2554 : return oid;
3178 : }
3179 :
3180 :
3181 : /*
3182 : * get_database_name - given a database OID, look up the name
3183 : *
3184 : * Returns a palloc'd string, or NULL if no such database.
3185 : */
3186 : char *
3187 320142 : get_database_name(Oid dbid)
3188 : {
3189 : HeapTuple dbtuple;
3190 : char *result;
3191 :
3192 320142 : dbtuple = SearchSysCache1(DATABASEOID, ObjectIdGetDatum(dbid));
3193 320142 : if (HeapTupleIsValid(dbtuple))
3194 : {
3195 319922 : result = pstrdup(NameStr(((Form_pg_database) GETSTRUCT(dbtuple))->datname));
3196 319922 : ReleaseSysCache(dbtuple);
3197 : }
3198 : else
3199 220 : result = NULL;
3200 :
3201 320142 : return result;
3202 : }
3203 :
3204 :
3205 : /*
3206 : * While dropping a database the pg_database row is marked invalid, but the
3207 : * catalog contents still exist. Connections to such a database are not
3208 : * allowed.
3209 : */
3210 : bool
3211 35808 : database_is_invalid_form(Form_pg_database datform)
3212 : {
3213 35808 : return datform->datconnlimit == DATCONNLIMIT_INVALID_DB;
3214 : }
3215 :
3216 :
3217 : /*
3218 : * Convenience wrapper around database_is_invalid_form()
3219 : */
3220 : bool
3221 670 : database_is_invalid_oid(Oid dboid)
3222 : {
3223 : HeapTuple dbtup;
3224 : Form_pg_database dbform;
3225 : bool invalid;
3226 :
3227 670 : dbtup = SearchSysCache1(DATABASEOID, ObjectIdGetDatum(dboid));
3228 670 : if (!HeapTupleIsValid(dbtup))
3229 0 : elog(ERROR, "cache lookup failed for database %u", dboid);
3230 670 : dbform = (Form_pg_database) GETSTRUCT(dbtup);
3231 :
3232 670 : invalid = database_is_invalid_form(dbform);
3233 :
3234 670 : ReleaseSysCache(dbtup);
3235 :
3236 670 : return invalid;
3237 : }
3238 :
3239 :
3240 : /*
3241 : * recovery_create_dbdir()
3242 : *
3243 : * During recovery, there's a case where we validly need to recover a missing
3244 : * tablespace directory so that recovery can continue. This happens when
3245 : * recovery wants to create a database but the holding tablespace has been
3246 : * removed before the server stopped. Since we expect that the directory will
3247 : * be gone before reaching recovery consistency, and we have no knowledge about
3248 : * the tablespace other than its OID here, we create a real directory under
3249 : * pg_tblspc here instead of restoring the symlink.
3250 : *
3251 : * If only_tblspc is true, then the requested directory must be in pg_tblspc/
3252 : */
3253 : static void
3254 44 : recovery_create_dbdir(char *path, bool only_tblspc)
3255 : {
3256 : struct stat st;
3257 :
3258 : Assert(RecoveryInProgress());
3259 :
3260 44 : if (stat(path, &st) == 0)
3261 44 : return;
3262 :
3263 0 : if (only_tblspc && strstr(path, PG_TBLSPC_DIR_SLASH) == NULL)
3264 0 : elog(PANIC, "requested to created invalid directory: %s", path);
3265 :
3266 0 : if (reachedConsistency && !allow_in_place_tablespaces)
3267 0 : ereport(PANIC,
3268 : errmsg("missing directory \"%s\"", path));
3269 :
3270 0 : elog(reachedConsistency ? WARNING : DEBUG1,
3271 : "creating missing directory: %s", path);
3272 :
3273 0 : if (pg_mkdir_p(path, pg_dir_create_mode) != 0)
3274 0 : ereport(PANIC,
3275 : errmsg("could not create missing directory \"%s\": %m", path));
3276 : }
3277 :
3278 :
3279 : /*
3280 : * DATABASE resource manager's routines
3281 : */
3282 : void
3283 78 : dbase_redo(XLogReaderState *record)
3284 : {
3285 78 : uint8 info = XLogRecGetInfo(record) & ~XLR_INFO_MASK;
3286 :
3287 : /* Backup blocks are not used in dbase records */
3288 : Assert(!XLogRecHasAnyBlockRefs(record));
3289 :
3290 78 : if (info == XLOG_DBASE_CREATE_FILE_COPY)
3291 : {
3292 8 : xl_dbase_create_file_copy_rec *xlrec =
3293 8 : (xl_dbase_create_file_copy_rec *) XLogRecGetData(record);
3294 : char *src_path;
3295 : char *dst_path;
3296 : char *parent_path;
3297 : struct stat st;
3298 :
3299 8 : src_path = GetDatabasePath(xlrec->src_db_id, xlrec->src_tablespace_id);
3300 8 : dst_path = GetDatabasePath(xlrec->db_id, xlrec->tablespace_id);
3301 :
3302 : /*
3303 : * Our theory for replaying a CREATE is to forcibly drop the target
3304 : * subdirectory if present, then re-copy the source data. This may be
3305 : * more work than needed, but it is simple to implement.
3306 : */
3307 8 : if (stat(dst_path, &st) == 0 && S_ISDIR(st.st_mode))
3308 : {
3309 0 : if (!rmtree(dst_path, true))
3310 : /* If this failed, copydir() below is going to error. */
3311 0 : ereport(WARNING,
3312 : (errmsg("some useless files may be left behind in old database directory \"%s\"",
3313 : dst_path)));
3314 : }
3315 :
3316 : /*
3317 : * If the parent of the target path doesn't exist, create it now. This
3318 : * enables us to create the target underneath later.
3319 : */
3320 8 : parent_path = pstrdup(dst_path);
3321 8 : get_parent_directory(parent_path);
3322 8 : if (stat(parent_path, &st) < 0)
3323 : {
3324 0 : if (errno != ENOENT)
3325 0 : ereport(FATAL,
3326 : errmsg("could not stat directory \"%s\": %m",
3327 : dst_path));
3328 :
3329 : /* create the parent directory if needed and valid */
3330 0 : recovery_create_dbdir(parent_path, true);
3331 : }
3332 8 : pfree(parent_path);
3333 :
3334 : /*
3335 : * There's a case where the copy source directory is missing for the
3336 : * same reason above. Create the empty source directory so that
3337 : * copydir below doesn't fail. The directory will be dropped soon by
3338 : * recovery.
3339 : */
3340 8 : if (stat(src_path, &st) < 0 && errno == ENOENT)
3341 0 : recovery_create_dbdir(src_path, false);
3342 :
3343 : /*
3344 : * Force dirty buffers out to disk, to ensure source database is
3345 : * up-to-date for the copy.
3346 : */
3347 8 : FlushDatabaseBuffers(xlrec->src_db_id);
3348 :
3349 : /* Close all smgr fds in all backends. */
3350 8 : WaitForProcSignalBarrier(EmitProcSignalBarrier(PROCSIGNAL_BARRIER_SMGRRELEASE));
3351 :
3352 : /*
3353 : * Copy this subdirectory to the new location
3354 : *
3355 : * We don't need to copy subdirectories
3356 : */
3357 8 : copydir(src_path, dst_path, false);
3358 :
3359 8 : pfree(src_path);
3360 8 : pfree(dst_path);
3361 : }
3362 70 : else if (info == XLOG_DBASE_CREATE_WAL_LOG)
3363 : {
3364 44 : xl_dbase_create_wal_log_rec *xlrec =
3365 44 : (xl_dbase_create_wal_log_rec *) XLogRecGetData(record);
3366 : char *dbpath;
3367 : char *parent_path;
3368 :
3369 44 : dbpath = GetDatabasePath(xlrec->db_id, xlrec->tablespace_id);
3370 :
3371 : /* create the parent directory if needed and valid */
3372 44 : parent_path = pstrdup(dbpath);
3373 44 : get_parent_directory(parent_path);
3374 44 : recovery_create_dbdir(parent_path, true);
3375 :
3376 : /* Create the database directory with the version file. */
3377 44 : CreateDirAndVersionFile(dbpath, xlrec->db_id, xlrec->tablespace_id,
3378 : true);
3379 44 : pfree(dbpath);
3380 : }
3381 26 : else if (info == XLOG_DBASE_DROP)
3382 : {
3383 26 : xl_dbase_drop_rec *xlrec = (xl_dbase_drop_rec *) XLogRecGetData(record);
3384 : char *dst_path;
3385 : int i;
3386 :
3387 26 : if (InHotStandby)
3388 : {
3389 : /*
3390 : * Lock database while we resolve conflicts to ensure that
3391 : * InitPostgres() cannot fully re-execute concurrently. This
3392 : * avoids backends re-connecting automatically to same database,
3393 : * which can happen in some cases.
3394 : *
3395 : * This will lock out walsenders trying to connect to db-specific
3396 : * slots for logical decoding too, so it's safe for us to drop
3397 : * slots.
3398 : */
3399 26 : LockSharedObjectForSession(DatabaseRelationId, xlrec->db_id, 0, AccessExclusiveLock);
3400 26 : ResolveRecoveryConflictWithDatabase(xlrec->db_id);
3401 : }
3402 :
3403 : /* Drop any database-specific replication slots */
3404 26 : ReplicationSlotsDropDBSlots(xlrec->db_id);
3405 :
3406 : /* Drop pages for this database that are in the shared buffer cache */
3407 26 : DropDatabaseBuffers(xlrec->db_id);
3408 :
3409 : /* Also, clean out any fsync requests that might be pending in md.c */
3410 26 : ForgetDatabaseSyncRequests(xlrec->db_id);
3411 :
3412 : /* Clean out the xlog relcache too */
3413 26 : XLogDropDatabase(xlrec->db_id);
3414 :
3415 : /* Close all smgr fds in all backends. */
3416 26 : WaitForProcSignalBarrier(EmitProcSignalBarrier(PROCSIGNAL_BARRIER_SMGRRELEASE));
3417 :
3418 52 : for (i = 0; i < xlrec->ntablespaces; i++)
3419 : {
3420 26 : dst_path = GetDatabasePath(xlrec->db_id, xlrec->tablespace_ids[i]);
3421 :
3422 : /* And remove the physical files */
3423 26 : if (!rmtree(dst_path, true))
3424 0 : ereport(WARNING,
3425 : (errmsg("some useless files may be left behind in old database directory \"%s\"",
3426 : dst_path)));
3427 26 : pfree(dst_path);
3428 : }
3429 :
3430 26 : if (InHotStandby)
3431 : {
3432 : /*
3433 : * Release locks prior to commit. XXX There is a race condition
3434 : * here that may allow backends to reconnect, but the window for
3435 : * this is small because the gap between here and commit is mostly
3436 : * fairly small and it is unlikely that people will be dropping
3437 : * databases that we are trying to connect to anyway.
3438 : */
3439 26 : UnlockSharedObjectForSession(DatabaseRelationId, xlrec->db_id, 0, AccessExclusiveLock);
3440 : }
3441 : }
3442 : else
3443 0 : elog(PANIC, "dbase_redo: unknown op code %u", info);
3444 78 : }
|