Line data Source code
1 : /*-------------------------------------------------------------------------
2 : *
3 : * dbcommands.c
4 : * Database management commands (create/drop database).
5 : *
6 : * Note: database creation/destruction commands use exclusive locks on
7 : * the database objects (as expressed by LockSharedObject()) to avoid
8 : * stepping on each others' toes. Formerly we used table-level locks
9 : * on pg_database, but that's too coarse-grained.
10 : *
11 : * Portions Copyright (c) 1996-2025, PostgreSQL Global Development Group
12 : * Portions Copyright (c) 1994, Regents of the University of California
13 : *
14 : *
15 : * IDENTIFICATION
16 : * src/backend/commands/dbcommands.c
17 : *
18 : *-------------------------------------------------------------------------
19 : */
20 : #include "postgres.h"
21 :
22 : #include <fcntl.h>
23 : #include <unistd.h>
24 : #include <sys/stat.h>
25 :
26 : #include "access/genam.h"
27 : #include "access/heapam.h"
28 : #include "access/htup_details.h"
29 : #include "access/multixact.h"
30 : #include "access/tableam.h"
31 : #include "access/xact.h"
32 : #include "access/xloginsert.h"
33 : #include "access/xlogrecovery.h"
34 : #include "access/xlogutils.h"
35 : #include "catalog/catalog.h"
36 : #include "catalog/dependency.h"
37 : #include "catalog/indexing.h"
38 : #include "catalog/objectaccess.h"
39 : #include "catalog/pg_authid.h"
40 : #include "catalog/pg_collation.h"
41 : #include "catalog/pg_database.h"
42 : #include "catalog/pg_db_role_setting.h"
43 : #include "catalog/pg_subscription.h"
44 : #include "catalog/pg_tablespace.h"
45 : #include "commands/comment.h"
46 : #include "commands/dbcommands.h"
47 : #include "commands/dbcommands_xlog.h"
48 : #include "commands/defrem.h"
49 : #include "commands/seclabel.h"
50 : #include "commands/tablespace.h"
51 : #include "common/file_perm.h"
52 : #include "mb/pg_wchar.h"
53 : #include "miscadmin.h"
54 : #include "pgstat.h"
55 : #include "postmaster/bgwriter.h"
56 : #include "replication/slot.h"
57 : #include "storage/copydir.h"
58 : #include "storage/fd.h"
59 : #include "storage/ipc.h"
60 : #include "storage/lmgr.h"
61 : #include "storage/md.h"
62 : #include "storage/procarray.h"
63 : #include "storage/smgr.h"
64 : #include "utils/acl.h"
65 : #include "utils/builtins.h"
66 : #include "utils/fmgroids.h"
67 : #include "utils/pg_locale.h"
68 : #include "utils/relmapper.h"
69 : #include "utils/snapmgr.h"
70 : #include "utils/syscache.h"
71 :
72 : /*
73 : * Create database strategy.
74 : *
75 : * CREATEDB_WAL_LOG will copy the database at the block level and WAL log each
76 : * copied block.
77 : *
78 : * CREATEDB_FILE_COPY will simply perform a file system level copy of the
79 : * database and log a single record for each tablespace copied. To make this
80 : * safe, it also triggers checkpoints before and after the operation.
81 : */
82 : typedef enum CreateDBStrategy
83 : {
84 : CREATEDB_WAL_LOG,
85 : CREATEDB_FILE_COPY,
86 : } CreateDBStrategy;
87 :
88 : typedef struct
89 : {
90 : Oid src_dboid; /* source (template) DB */
91 : Oid dest_dboid; /* DB we are trying to create */
92 : CreateDBStrategy strategy; /* create db strategy */
93 : } createdb_failure_params;
94 :
95 : typedef struct
96 : {
97 : Oid dest_dboid; /* DB we are trying to move */
98 : Oid dest_tsoid; /* tablespace we are trying to move to */
99 : } movedb_failure_params;
100 :
101 : /*
102 : * Information about a relation to be copied when creating a database.
103 : */
104 : typedef struct CreateDBRelInfo
105 : {
106 : RelFileLocator rlocator; /* physical relation identifier */
107 : Oid reloid; /* relation oid */
108 : bool permanent; /* relation is permanent or unlogged */
109 : } CreateDBRelInfo;
110 :
111 :
112 : /* non-export function prototypes */
113 : static void createdb_failure_callback(int code, Datum arg);
114 : static void movedb(const char *dbname, const char *tblspcname);
115 : static void movedb_failure_callback(int code, Datum arg);
116 : static bool get_db_info(const char *name, LOCKMODE lockmode,
117 : Oid *dbIdP, Oid *ownerIdP,
118 : int *encodingP, bool *dbIsTemplateP, bool *dbAllowConnP, bool *dbHasLoginEvtP,
119 : TransactionId *dbFrozenXidP, MultiXactId *dbMinMultiP,
120 : Oid *dbTablespace, char **dbCollate, char **dbCtype, char **dbLocale,
121 : char **dbIcurules,
122 : char *dbLocProvider,
123 : char **dbCollversion);
124 : static void remove_dbtablespaces(Oid db_id);
125 : static bool check_db_file_conflict(Oid db_id);
126 : static int errdetail_busy_db(int notherbackends, int npreparedxacts);
127 : static void CreateDatabaseUsingWalLog(Oid src_dboid, Oid dst_dboid, Oid src_tsid,
128 : Oid dst_tsid);
129 : static List *ScanSourceDatabasePgClass(Oid tbid, Oid dbid, char *srcpath);
130 : static List *ScanSourceDatabasePgClassPage(Page page, Buffer buf, Oid tbid,
131 : Oid dbid, char *srcpath,
132 : List *rlocatorlist, Snapshot snapshot);
133 : static CreateDBRelInfo *ScanSourceDatabasePgClassTuple(HeapTupleData *tuple,
134 : Oid tbid, Oid dbid,
135 : char *srcpath);
136 : static void CreateDirAndVersionFile(char *dbpath, Oid dbid, Oid tsid,
137 : bool isRedo);
138 : static void CreateDatabaseUsingFileCopy(Oid src_dboid, Oid dst_dboid,
139 : Oid src_tsid, Oid dst_tsid);
140 : static void recovery_create_dbdir(char *path, bool only_tblspc);
141 :
142 : /*
143 : * Create a new database using the WAL_LOG strategy.
144 : *
145 : * Each copied block is separately written to the write-ahead log.
146 : */
147 : static void
148 490 : CreateDatabaseUsingWalLog(Oid src_dboid, Oid dst_dboid,
149 : Oid src_tsid, Oid dst_tsid)
150 : {
151 : char *srcpath;
152 : char *dstpath;
153 490 : List *rlocatorlist = NULL;
154 : ListCell *cell;
155 : LockRelId srcrelid;
156 : LockRelId dstrelid;
157 : RelFileLocator srcrlocator;
158 : RelFileLocator dstrlocator;
159 : CreateDBRelInfo *relinfo;
160 :
161 : /* Get source and destination database paths. */
162 490 : srcpath = GetDatabasePath(src_dboid, src_tsid);
163 490 : dstpath = GetDatabasePath(dst_dboid, dst_tsid);
164 :
165 : /* Create database directory and write PG_VERSION file. */
166 490 : CreateDirAndVersionFile(dstpath, dst_dboid, dst_tsid, false);
167 :
168 : /* Copy relmap file from source database to the destination database. */
169 490 : RelationMapCopy(dst_dboid, dst_tsid, srcpath, dstpath);
170 :
171 : /* Get list of relfilelocators to copy from the source database. */
172 490 : rlocatorlist = ScanSourceDatabasePgClass(src_tsid, src_dboid, srcpath);
173 : Assert(rlocatorlist != NIL);
174 :
175 : /*
176 : * Database IDs will be the same for all relations so set them before
177 : * entering the loop.
178 : */
179 490 : srcrelid.dbId = src_dboid;
180 490 : dstrelid.dbId = dst_dboid;
181 :
182 : /* Loop over our list of relfilelocators and copy each one. */
183 109390 : foreach(cell, rlocatorlist)
184 : {
185 108904 : relinfo = lfirst(cell);
186 108904 : srcrlocator = relinfo->rlocator;
187 :
188 : /*
189 : * If the relation is from the source db's default tablespace then we
190 : * need to create it in the destination db's default tablespace.
191 : * Otherwise, we need to create in the same tablespace as it is in the
192 : * source database.
193 : */
194 108904 : if (srcrlocator.spcOid == src_tsid)
195 108904 : dstrlocator.spcOid = dst_tsid;
196 : else
197 0 : dstrlocator.spcOid = srcrlocator.spcOid;
198 :
199 108904 : dstrlocator.dbOid = dst_dboid;
200 108904 : dstrlocator.relNumber = srcrlocator.relNumber;
201 :
202 : /*
203 : * Acquire locks on source and target relations before copying.
204 : *
205 : * We typically do not read relation data into shared_buffers without
206 : * holding a relation lock. It's unclear what could go wrong if we
207 : * skipped it in this case, because nobody can be modifying either the
208 : * source or destination database at this point, and we have locks on
209 : * both databases, too, but let's take the conservative route.
210 : */
211 108904 : dstrelid.relId = srcrelid.relId = relinfo->reloid;
212 108904 : LockRelationId(&srcrelid, AccessShareLock);
213 108904 : LockRelationId(&dstrelid, AccessShareLock);
214 :
215 : /* Copy relation storage from source to the destination. */
216 108904 : CreateAndCopyRelationData(srcrlocator, dstrlocator, relinfo->permanent);
217 :
218 : /* Release the relation locks. */
219 108900 : UnlockRelationId(&srcrelid, AccessShareLock);
220 108900 : UnlockRelationId(&dstrelid, AccessShareLock);
221 : }
222 :
223 486 : pfree(srcpath);
224 486 : pfree(dstpath);
225 486 : list_free_deep(rlocatorlist);
226 486 : }
227 :
228 : /*
229 : * Scan the pg_class table in the source database to identify the relations
230 : * that need to be copied to the destination database.
231 : *
232 : * This is an exception to the usual rule that cross-database access is
233 : * not possible. We can make it work here because we know that there are no
234 : * connections to the source database and (since there can't be prepared
235 : * transactions touching that database) no in-doubt tuples either. This
236 : * means that we don't need to worry about pruning removing anything from
237 : * under us, and we don't need to be too picky about our snapshot either.
238 : * As long as it sees all previously-committed XIDs as committed and all
239 : * aborted XIDs as aborted, we should be fine: nothing else is possible
240 : * here.
241 : *
242 : * We can't rely on the relcache for anything here, because that only knows
243 : * about the database to which we are connected, and can't handle access to
244 : * other databases. That also means we can't rely on the heap scan
245 : * infrastructure, which would be a bad idea anyway since it might try
246 : * to do things like HOT pruning which we definitely can't do safely in
247 : * a database to which we're not even connected.
248 : */
249 : static List *
250 490 : ScanSourceDatabasePgClass(Oid tbid, Oid dbid, char *srcpath)
251 : {
252 : RelFileLocator rlocator;
253 : BlockNumber nblocks;
254 : BlockNumber blkno;
255 : Buffer buf;
256 : RelFileNumber relfilenumber;
257 : Page page;
258 490 : List *rlocatorlist = NIL;
259 : LockRelId relid;
260 : Snapshot snapshot;
261 : SMgrRelation smgr;
262 : BufferAccessStrategy bstrategy;
263 :
264 : /* Get pg_class relfilenumber. */
265 490 : relfilenumber = RelationMapOidToFilenumberForDatabase(srcpath,
266 : RelationRelationId);
267 :
268 : /* Don't read data into shared_buffers without holding a relation lock. */
269 490 : relid.dbId = dbid;
270 490 : relid.relId = RelationRelationId;
271 490 : LockRelationId(&relid, AccessShareLock);
272 :
273 : /* Prepare a RelFileLocator for the pg_class relation. */
274 490 : rlocator.spcOid = tbid;
275 490 : rlocator.dbOid = dbid;
276 490 : rlocator.relNumber = relfilenumber;
277 :
278 490 : smgr = smgropen(rlocator, INVALID_PROC_NUMBER);
279 490 : nblocks = smgrnblocks(smgr, MAIN_FORKNUM);
280 490 : smgrclose(smgr);
281 :
282 : /* Use a buffer access strategy since this is a bulk read operation. */
283 490 : bstrategy = GetAccessStrategy(BAS_BULKREAD);
284 :
285 : /*
286 : * As explained in the function header comments, we need a snapshot that
287 : * will see all committed transactions as committed, and our transaction
288 : * snapshot - or the active snapshot - might not be new enough for that,
289 : * but the return value of GetLatestSnapshot() should work fine.
290 : */
291 490 : snapshot = RegisterSnapshot(GetLatestSnapshot());
292 :
293 : /* Process the relation block by block. */
294 7350 : for (blkno = 0; blkno < nblocks; blkno++)
295 : {
296 6860 : CHECK_FOR_INTERRUPTS();
297 :
298 6860 : buf = ReadBufferWithoutRelcache(rlocator, MAIN_FORKNUM, blkno,
299 : RBM_NORMAL, bstrategy, true);
300 :
301 6860 : LockBuffer(buf, BUFFER_LOCK_SHARE);
302 6860 : page = BufferGetPage(buf);
303 6860 : if (PageIsNew(page) || PageIsEmpty(page))
304 : {
305 0 : UnlockReleaseBuffer(buf);
306 0 : continue;
307 : }
308 :
309 : /* Append relevant pg_class tuples for current page to rlocatorlist. */
310 6860 : rlocatorlist = ScanSourceDatabasePgClassPage(page, buf, tbid, dbid,
311 : srcpath, rlocatorlist,
312 : snapshot);
313 :
314 6860 : UnlockReleaseBuffer(buf);
315 : }
316 490 : UnregisterSnapshot(snapshot);
317 :
318 : /* Release relation lock. */
319 490 : UnlockRelationId(&relid, AccessShareLock);
320 :
321 490 : return rlocatorlist;
322 : }
323 :
324 : /*
325 : * Scan one page of the source database's pg_class relation and add relevant
326 : * entries to rlocatorlist. The return value is the updated list.
327 : */
328 : static List *
329 6860 : ScanSourceDatabasePgClassPage(Page page, Buffer buf, Oid tbid, Oid dbid,
330 : char *srcpath, List *rlocatorlist,
331 : Snapshot snapshot)
332 : {
333 6860 : BlockNumber blkno = BufferGetBlockNumber(buf);
334 : OffsetNumber offnum;
335 : OffsetNumber maxoff;
336 : HeapTupleData tuple;
337 :
338 6860 : maxoff = PageGetMaxOffsetNumber(page);
339 :
340 : /* Loop over offsets. */
341 6860 : for (offnum = FirstOffsetNumber;
342 323576 : offnum <= maxoff;
343 316716 : offnum = OffsetNumberNext(offnum))
344 : {
345 : ItemId itemid;
346 :
347 316716 : itemid = PageGetItemId(page, offnum);
348 :
349 : /* Nothing to do if slot is empty or already dead. */
350 316716 : if (!ItemIdIsUsed(itemid) || ItemIdIsDead(itemid) ||
351 246050 : ItemIdIsRedirected(itemid))
352 113296 : continue;
353 :
354 : Assert(ItemIdIsNormal(itemid));
355 203420 : ItemPointerSet(&(tuple.t_self), blkno, offnum);
356 :
357 : /* Initialize a HeapTupleData structure. */
358 203420 : tuple.t_data = (HeapTupleHeader) PageGetItem(page, itemid);
359 203420 : tuple.t_len = ItemIdGetLength(itemid);
360 203420 : tuple.t_tableOid = RelationRelationId;
361 :
362 : /* Skip tuples that are not visible to this snapshot. */
363 203420 : if (HeapTupleSatisfiesVisibility(&tuple, snapshot, buf))
364 : {
365 : CreateDBRelInfo *relinfo;
366 :
367 : /*
368 : * ScanSourceDatabasePgClassTuple is in charge of constructing a
369 : * CreateDBRelInfo object for this tuple, but can also decide that
370 : * this tuple isn't something we need to copy. If we do need to
371 : * copy the relation, add it to the list.
372 : */
373 203390 : relinfo = ScanSourceDatabasePgClassTuple(&tuple, tbid, dbid,
374 : srcpath);
375 203390 : if (relinfo != NULL)
376 109800 : rlocatorlist = lappend(rlocatorlist, relinfo);
377 : }
378 : }
379 :
380 6860 : return rlocatorlist;
381 : }
382 :
383 : /*
384 : * Decide whether a certain pg_class tuple represents something that
385 : * needs to be copied from the source database to the destination database,
386 : * and if so, construct a CreateDBRelInfo for it.
387 : *
388 : * Visibility checks are handled by the caller, so our job here is just
389 : * to assess the data stored in the tuple.
390 : */
391 : CreateDBRelInfo *
392 203390 : ScanSourceDatabasePgClassTuple(HeapTupleData *tuple, Oid tbid, Oid dbid,
393 : char *srcpath)
394 : {
395 : CreateDBRelInfo *relinfo;
396 : Form_pg_class classForm;
397 203390 : RelFileNumber relfilenumber = InvalidRelFileNumber;
398 :
399 203390 : classForm = (Form_pg_class) GETSTRUCT(tuple);
400 :
401 : /*
402 : * Return NULL if this object does not need to be copied.
403 : *
404 : * Shared objects don't need to be copied, because they are shared.
405 : * Objects without storage can't be copied, because there's nothing to
406 : * copy. Temporary relations don't need to be copied either, because they
407 : * are inaccessible outside of the session that created them, which must
408 : * be gone already, and couldn't connect to a different database if it
409 : * still existed. autovacuum will eventually remove the pg_class entries
410 : * as well.
411 : */
412 203390 : if (classForm->reltablespace == GLOBALTABLESPACE_OID ||
413 180850 : !RELKIND_HAS_STORAGE(classForm->relkind) ||
414 109800 : classForm->relpersistence == RELPERSISTENCE_TEMP)
415 93590 : return NULL;
416 :
417 : /*
418 : * If relfilenumber is valid then directly use it. Otherwise, consult the
419 : * relmap.
420 : */
421 109800 : if (RelFileNumberIsValid(classForm->relfilenode))
422 101470 : relfilenumber = classForm->relfilenode;
423 : else
424 8330 : relfilenumber = RelationMapOidToFilenumberForDatabase(srcpath,
425 : classForm->oid);
426 :
427 : /* We must have a valid relfilenumber. */
428 109800 : if (!RelFileNumberIsValid(relfilenumber))
429 0 : elog(ERROR, "relation with OID %u does not have a valid relfilenumber",
430 : classForm->oid);
431 :
432 : /* Prepare a rel info element and add it to the list. */
433 109800 : relinfo = (CreateDBRelInfo *) palloc(sizeof(CreateDBRelInfo));
434 109800 : if (OidIsValid(classForm->reltablespace))
435 0 : relinfo->rlocator.spcOid = classForm->reltablespace;
436 : else
437 109800 : relinfo->rlocator.spcOid = tbid;
438 :
439 109800 : relinfo->rlocator.dbOid = dbid;
440 109800 : relinfo->rlocator.relNumber = relfilenumber;
441 109800 : relinfo->reloid = classForm->oid;
442 :
443 : /* Temporary relations were rejected above. */
444 : Assert(classForm->relpersistence != RELPERSISTENCE_TEMP);
445 109800 : relinfo->permanent =
446 109800 : (classForm->relpersistence == RELPERSISTENCE_PERMANENT) ? true : false;
447 :
448 109800 : return relinfo;
449 : }
450 :
451 : /*
452 : * Create database directory and write out the PG_VERSION file in the database
453 : * path. If isRedo is true, it's okay for the database directory to exist
454 : * already.
455 : */
456 : static void
457 536 : CreateDirAndVersionFile(char *dbpath, Oid dbid, Oid tsid, bool isRedo)
458 : {
459 : int fd;
460 : int nbytes;
461 : char versionfile[MAXPGPATH];
462 : char buf[16];
463 :
464 : /*
465 : * Note that we don't have to copy version data from the source database;
466 : * there's only one legal value.
467 : */
468 536 : sprintf(buf, "%s\n", PG_MAJORVERSION);
469 536 : nbytes = strlen(PG_MAJORVERSION) + 1;
470 :
471 : /* Create database directory. */
472 536 : if (MakePGDirectory(dbpath) < 0)
473 : {
474 : /* Failure other than already exists or not in WAL replay? */
475 16 : if (errno != EEXIST || !isRedo)
476 0 : ereport(ERROR,
477 : (errcode_for_file_access(),
478 : errmsg("could not create directory \"%s\": %m", dbpath)));
479 : }
480 :
481 : /*
482 : * Create PG_VERSION file in the database path. If the file already
483 : * exists and we are in WAL replay then try again to open it in write
484 : * mode.
485 : */
486 536 : snprintf(versionfile, sizeof(versionfile), "%s/%s", dbpath, "PG_VERSION");
487 :
488 536 : fd = OpenTransientFile(versionfile, O_WRONLY | O_CREAT | O_EXCL | PG_BINARY);
489 536 : if (fd < 0 && errno == EEXIST && isRedo)
490 16 : fd = OpenTransientFile(versionfile, O_WRONLY | O_TRUNC | PG_BINARY);
491 :
492 536 : if (fd < 0)
493 0 : ereport(ERROR,
494 : (errcode_for_file_access(),
495 : errmsg("could not create file \"%s\": %m", versionfile)));
496 :
497 : /* Write PG_MAJORVERSION in the PG_VERSION file. */
498 536 : pgstat_report_wait_start(WAIT_EVENT_VERSION_FILE_WRITE);
499 536 : errno = 0;
500 536 : if ((int) write(fd, buf, nbytes) != nbytes)
501 : {
502 : /* If write didn't set errno, assume problem is no disk space. */
503 0 : if (errno == 0)
504 0 : errno = ENOSPC;
505 0 : ereport(ERROR,
506 : (errcode_for_file_access(),
507 : errmsg("could not write to file \"%s\": %m", versionfile)));
508 : }
509 536 : pgstat_report_wait_end();
510 :
511 536 : pgstat_report_wait_start(WAIT_EVENT_VERSION_FILE_SYNC);
512 536 : if (pg_fsync(fd) != 0)
513 0 : ereport(data_sync_elevel(ERROR),
514 : (errcode_for_file_access(),
515 : errmsg("could not fsync file \"%s\": %m", versionfile)));
516 536 : fsync_fname(dbpath, true);
517 536 : pgstat_report_wait_end();
518 :
519 : /* Close the version file. */
520 536 : CloseTransientFile(fd);
521 :
522 : /* If we are not in WAL replay then write the WAL. */
523 536 : if (!isRedo)
524 : {
525 : xl_dbase_create_wal_log_rec xlrec;
526 :
527 490 : START_CRIT_SECTION();
528 :
529 490 : xlrec.db_id = dbid;
530 490 : xlrec.tablespace_id = tsid;
531 :
532 490 : XLogBeginInsert();
533 490 : XLogRegisterData(&xlrec,
534 : sizeof(xl_dbase_create_wal_log_rec));
535 :
536 490 : (void) XLogInsert(RM_DBASE_ID, XLOG_DBASE_CREATE_WAL_LOG);
537 :
538 490 : END_CRIT_SECTION();
539 : }
540 536 : }
541 :
542 : /*
543 : * Create a new database using the FILE_COPY strategy.
544 : *
545 : * Copy each tablespace at the filesystem level, and log a single WAL record
546 : * for each tablespace copied. This requires a checkpoint before and after the
547 : * copy, which may be expensive, but it does greatly reduce WAL generation
548 : * if the copied database is large.
549 : */
550 : static void
551 260 : CreateDatabaseUsingFileCopy(Oid src_dboid, Oid dst_dboid, Oid src_tsid,
552 : Oid dst_tsid)
553 : {
554 : TableScanDesc scan;
555 : Relation rel;
556 : HeapTuple tuple;
557 :
558 : /*
559 : * Force a checkpoint before starting the copy. This will force all dirty
560 : * buffers, including those of unlogged tables, out to disk, to ensure
561 : * source database is up-to-date on disk for the copy.
562 : * FlushDatabaseBuffers() would suffice for that, but we also want to
563 : * process any pending unlink requests. Otherwise, if a checkpoint
564 : * happened while we're copying files, a file might be deleted just when
565 : * we're about to copy it, causing the lstat() call in copydir() to fail
566 : * with ENOENT.
567 : *
568 : * In binary upgrade mode, we can skip this checkpoint because pg_upgrade
569 : * is careful to ensure that template0 is fully written to disk prior to
570 : * any CREATE DATABASE commands.
571 : */
572 260 : if (!IsBinaryUpgrade)
573 212 : RequestCheckpoint(CHECKPOINT_IMMEDIATE | CHECKPOINT_FORCE |
574 : CHECKPOINT_WAIT | CHECKPOINT_FLUSH_ALL);
575 :
576 : /*
577 : * Iterate through all tablespaces of the template database, and copy each
578 : * one to the new database.
579 : */
580 260 : rel = table_open(TableSpaceRelationId, AccessShareLock);
581 260 : scan = table_beginscan_catalog(rel, 0, NULL);
582 816 : while ((tuple = heap_getnext(scan, ForwardScanDirection)) != NULL)
583 : {
584 556 : Form_pg_tablespace spaceform = (Form_pg_tablespace) GETSTRUCT(tuple);
585 556 : Oid srctablespace = spaceform->oid;
586 : Oid dsttablespace;
587 : char *srcpath;
588 : char *dstpath;
589 : struct stat st;
590 :
591 : /* No need to copy global tablespace */
592 556 : if (srctablespace == GLOBALTABLESPACE_OID)
593 296 : continue;
594 :
595 296 : srcpath = GetDatabasePath(src_dboid, srctablespace);
596 :
597 556 : if (stat(srcpath, &st) < 0 || !S_ISDIR(st.st_mode) ||
598 260 : directory_is_empty(srcpath))
599 : {
600 : /* Assume we can ignore it */
601 36 : pfree(srcpath);
602 36 : continue;
603 : }
604 :
605 260 : if (srctablespace == src_tsid)
606 260 : dsttablespace = dst_tsid;
607 : else
608 0 : dsttablespace = srctablespace;
609 :
610 260 : dstpath = GetDatabasePath(dst_dboid, dsttablespace);
611 :
612 : /*
613 : * Copy this subdirectory to the new location
614 : *
615 : * We don't need to copy subdirectories
616 : */
617 260 : copydir(srcpath, dstpath, false);
618 :
619 : /* Record the filesystem change in XLOG */
620 : {
621 : xl_dbase_create_file_copy_rec xlrec;
622 :
623 260 : xlrec.db_id = dst_dboid;
624 260 : xlrec.tablespace_id = dsttablespace;
625 260 : xlrec.src_db_id = src_dboid;
626 260 : xlrec.src_tablespace_id = srctablespace;
627 :
628 260 : XLogBeginInsert();
629 260 : XLogRegisterData(&xlrec,
630 : sizeof(xl_dbase_create_file_copy_rec));
631 :
632 260 : (void) XLogInsert(RM_DBASE_ID,
633 : XLOG_DBASE_CREATE_FILE_COPY | XLR_SPECIAL_REL_UPDATE);
634 : }
635 260 : pfree(srcpath);
636 260 : pfree(dstpath);
637 : }
638 260 : table_endscan(scan);
639 260 : table_close(rel, AccessShareLock);
640 :
641 : /*
642 : * We force a checkpoint before committing. This effectively means that
643 : * committed XLOG_DBASE_CREATE_FILE_COPY operations will never need to be
644 : * replayed (at least not in ordinary crash recovery; we still have to
645 : * make the XLOG entry for the benefit of PITR operations). This avoids
646 : * two nasty scenarios:
647 : *
648 : * #1: At wal_level=minimal, we don't XLOG the contents of newly created
649 : * relfilenodes; therefore the drop-and-recreate-whole-directory behavior
650 : * of DBASE_CREATE replay would lose such files created in the new
651 : * database between our commit and the next checkpoint.
652 : *
653 : * #2: Since we have to recopy the source database during DBASE_CREATE
654 : * replay, we run the risk of copying changes in it that were committed
655 : * after the original CREATE DATABASE command but before the system crash
656 : * that led to the replay. This is at least unexpected and at worst could
657 : * lead to inconsistencies, eg duplicate table names.
658 : *
659 : * (Both of these were real bugs in releases 8.0 through 8.0.3.)
660 : *
661 : * In PITR replay, the first of these isn't an issue, and the second is
662 : * only a risk if the CREATE DATABASE and subsequent template database
663 : * change both occur while a base backup is being taken. There doesn't
664 : * seem to be much we can do about that except document it as a
665 : * limitation.
666 : *
667 : * In binary upgrade mode, we can skip this checkpoint because neither of
668 : * these problems applies: we don't ever replay the WAL generated during
669 : * pg_upgrade, and we don't support taking base backups during pg_upgrade
670 : * (not to mention that we don't concurrently modify template0, either).
671 : *
672 : * See CreateDatabaseUsingWalLog() for a less cheesy CREATE DATABASE
673 : * strategy that avoids these problems.
674 : */
675 260 : if (!IsBinaryUpgrade)
676 212 : RequestCheckpoint(CHECKPOINT_IMMEDIATE | CHECKPOINT_FORCE |
677 : CHECKPOINT_WAIT);
678 260 : }
679 :
680 : /*
681 : * CREATE DATABASE
682 : */
683 : Oid
684 784 : createdb(ParseState *pstate, const CreatedbStmt *stmt)
685 : {
686 : Oid src_dboid;
687 : Oid src_owner;
688 784 : int src_encoding = -1;
689 784 : char *src_collate = NULL;
690 784 : char *src_ctype = NULL;
691 784 : char *src_locale = NULL;
692 784 : char *src_icurules = NULL;
693 784 : char src_locprovider = '\0';
694 784 : char *src_collversion = NULL;
695 : bool src_istemplate;
696 784 : bool src_hasloginevt = false;
697 : bool src_allowconn;
698 784 : TransactionId src_frozenxid = InvalidTransactionId;
699 784 : MultiXactId src_minmxid = InvalidMultiXactId;
700 : Oid src_deftablespace;
701 : volatile Oid dst_deftablespace;
702 : Relation pg_database_rel;
703 : HeapTuple tuple;
704 784 : Datum new_record[Natts_pg_database] = {0};
705 784 : bool new_record_nulls[Natts_pg_database] = {0};
706 784 : Oid dboid = InvalidOid;
707 : Oid datdba;
708 : ListCell *option;
709 784 : DefElem *tablespacenameEl = NULL;
710 784 : DefElem *ownerEl = NULL;
711 784 : DefElem *templateEl = NULL;
712 784 : DefElem *encodingEl = NULL;
713 784 : DefElem *localeEl = NULL;
714 784 : DefElem *builtinlocaleEl = NULL;
715 784 : DefElem *collateEl = NULL;
716 784 : DefElem *ctypeEl = NULL;
717 784 : DefElem *iculocaleEl = NULL;
718 784 : DefElem *icurulesEl = NULL;
719 784 : DefElem *locproviderEl = NULL;
720 784 : DefElem *istemplateEl = NULL;
721 784 : DefElem *allowconnectionsEl = NULL;
722 784 : DefElem *connlimitEl = NULL;
723 784 : DefElem *collversionEl = NULL;
724 784 : DefElem *strategyEl = NULL;
725 784 : char *dbname = stmt->dbname;
726 784 : char *dbowner = NULL;
727 784 : const char *dbtemplate = NULL;
728 784 : char *dbcollate = NULL;
729 784 : char *dbctype = NULL;
730 784 : const char *dblocale = NULL;
731 784 : char *dbicurules = NULL;
732 784 : char dblocprovider = '\0';
733 : char *canonname;
734 784 : int encoding = -1;
735 784 : bool dbistemplate = false;
736 784 : bool dballowconnections = true;
737 784 : int dbconnlimit = DATCONNLIMIT_UNLIMITED;
738 784 : char *dbcollversion = NULL;
739 : int notherbackends;
740 : int npreparedxacts;
741 784 : CreateDBStrategy dbstrategy = CREATEDB_WAL_LOG;
742 : createdb_failure_params fparms;
743 :
744 : /* Extract options from the statement node tree */
745 2284 : foreach(option, stmt->options)
746 : {
747 1500 : DefElem *defel = (DefElem *) lfirst(option);
748 :
749 1500 : if (strcmp(defel->defname, "tablespace") == 0)
750 : {
751 16 : if (tablespacenameEl)
752 0 : errorConflictingDefElem(defel, pstate);
753 16 : tablespacenameEl = defel;
754 : }
755 1484 : else if (strcmp(defel->defname, "owner") == 0)
756 : {
757 2 : if (ownerEl)
758 0 : errorConflictingDefElem(defel, pstate);
759 2 : ownerEl = defel;
760 : }
761 1482 : else if (strcmp(defel->defname, "template") == 0)
762 : {
763 342 : if (templateEl)
764 0 : errorConflictingDefElem(defel, pstate);
765 342 : templateEl = defel;
766 : }
767 1140 : else if (strcmp(defel->defname, "encoding") == 0)
768 : {
769 96 : if (encodingEl)
770 0 : errorConflictingDefElem(defel, pstate);
771 96 : encodingEl = defel;
772 : }
773 1044 : else if (strcmp(defel->defname, "locale") == 0)
774 : {
775 98 : if (localeEl)
776 0 : errorConflictingDefElem(defel, pstate);
777 98 : localeEl = defel;
778 : }
779 946 : else if (strcmp(defel->defname, "builtin_locale") == 0)
780 : {
781 18 : if (builtinlocaleEl)
782 0 : errorConflictingDefElem(defel, pstate);
783 18 : builtinlocaleEl = defel;
784 : }
785 928 : else if (strcmp(defel->defname, "lc_collate") == 0)
786 : {
787 16 : if (collateEl)
788 0 : errorConflictingDefElem(defel, pstate);
789 16 : collateEl = defel;
790 : }
791 912 : else if (strcmp(defel->defname, "lc_ctype") == 0)
792 : {
793 16 : if (ctypeEl)
794 0 : errorConflictingDefElem(defel, pstate);
795 16 : ctypeEl = defel;
796 : }
797 896 : else if (strcmp(defel->defname, "icu_locale") == 0)
798 : {
799 10 : if (iculocaleEl)
800 0 : errorConflictingDefElem(defel, pstate);
801 10 : iculocaleEl = defel;
802 : }
803 886 : else if (strcmp(defel->defname, "icu_rules") == 0)
804 : {
805 2 : if (icurulesEl)
806 0 : errorConflictingDefElem(defel, pstate);
807 2 : icurulesEl = defel;
808 : }
809 884 : else if (strcmp(defel->defname, "locale_provider") == 0)
810 : {
811 104 : if (locproviderEl)
812 0 : errorConflictingDefElem(defel, pstate);
813 104 : locproviderEl = defel;
814 : }
815 780 : else if (strcmp(defel->defname, "is_template") == 0)
816 : {
817 100 : if (istemplateEl)
818 0 : errorConflictingDefElem(defel, pstate);
819 100 : istemplateEl = defel;
820 : }
821 680 : else if (strcmp(defel->defname, "allow_connections") == 0)
822 : {
823 98 : if (allowconnectionsEl)
824 0 : errorConflictingDefElem(defel, pstate);
825 98 : allowconnectionsEl = defel;
826 : }
827 582 : else if (strcmp(defel->defname, "connection_limit") == 0)
828 : {
829 0 : if (connlimitEl)
830 0 : errorConflictingDefElem(defel, pstate);
831 0 : connlimitEl = defel;
832 : }
833 582 : else if (strcmp(defel->defname, "collation_version") == 0)
834 : {
835 48 : if (collversionEl)
836 0 : errorConflictingDefElem(defel, pstate);
837 48 : collversionEl = defel;
838 : }
839 534 : else if (strcmp(defel->defname, "location") == 0)
840 : {
841 0 : ereport(WARNING,
842 : (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
843 : errmsg("LOCATION is not supported anymore"),
844 : errhint("Consider using tablespaces instead."),
845 : parser_errposition(pstate, defel->location)));
846 : }
847 534 : else if (strcmp(defel->defname, "oid") == 0)
848 : {
849 250 : dboid = defGetObjectId(defel);
850 :
851 : /*
852 : * We don't normally permit new databases to be created with
853 : * system-assigned OIDs. pg_upgrade tries to preserve database
854 : * OIDs, so we can't allow any database to be created with an OID
855 : * that might be in use in a freshly-initialized cluster created
856 : * by some future version. We assume all such OIDs will be from
857 : * the system-managed OID range.
858 : *
859 : * As an exception, however, we permit any OID to be assigned when
860 : * allow_system_table_mods=on (so that initdb can assign system
861 : * OIDs to template0 and postgres) or when performing a binary
862 : * upgrade (so that pg_upgrade can preserve whatever OIDs it finds
863 : * in the source cluster).
864 : */
865 250 : if (dboid < FirstNormalObjectId &&
866 224 : !allowSystemTableMods && !IsBinaryUpgrade)
867 0 : ereport(ERROR,
868 : (errcode(ERRCODE_INVALID_PARAMETER_VALUE)),
869 : errmsg("OIDs less than %u are reserved for system objects", FirstNormalObjectId));
870 : }
871 284 : else if (strcmp(defel->defname, "strategy") == 0)
872 : {
873 284 : if (strategyEl)
874 0 : errorConflictingDefElem(defel, pstate);
875 284 : strategyEl = defel;
876 : }
877 : else
878 0 : ereport(ERROR,
879 : (errcode(ERRCODE_SYNTAX_ERROR),
880 : errmsg("option \"%s\" not recognized", defel->defname),
881 : parser_errposition(pstate, defel->location)));
882 : }
883 :
884 784 : if (ownerEl && ownerEl->arg)
885 2 : dbowner = defGetString(ownerEl);
886 784 : if (templateEl && templateEl->arg)
887 342 : dbtemplate = defGetString(templateEl);
888 784 : if (encodingEl && encodingEl->arg)
889 : {
890 : const char *encoding_name;
891 :
892 96 : if (IsA(encodingEl->arg, Integer))
893 : {
894 0 : encoding = defGetInt32(encodingEl);
895 0 : encoding_name = pg_encoding_to_char(encoding);
896 0 : if (strcmp(encoding_name, "") == 0 ||
897 0 : pg_valid_server_encoding(encoding_name) < 0)
898 0 : ereport(ERROR,
899 : (errcode(ERRCODE_UNDEFINED_OBJECT),
900 : errmsg("%d is not a valid encoding code",
901 : encoding),
902 : parser_errposition(pstate, encodingEl->location)));
903 : }
904 : else
905 : {
906 96 : encoding_name = defGetString(encodingEl);
907 96 : encoding = pg_valid_server_encoding(encoding_name);
908 96 : if (encoding < 0)
909 0 : ereport(ERROR,
910 : (errcode(ERRCODE_UNDEFINED_OBJECT),
911 : errmsg("%s is not a valid encoding name",
912 : encoding_name),
913 : parser_errposition(pstate, encodingEl->location)));
914 : }
915 : }
916 784 : if (localeEl && localeEl->arg)
917 : {
918 98 : dbcollate = defGetString(localeEl);
919 98 : dbctype = defGetString(localeEl);
920 98 : dblocale = defGetString(localeEl);
921 : }
922 784 : if (builtinlocaleEl && builtinlocaleEl->arg)
923 18 : dblocale = defGetString(builtinlocaleEl);
924 784 : if (collateEl && collateEl->arg)
925 16 : dbcollate = defGetString(collateEl);
926 784 : if (ctypeEl && ctypeEl->arg)
927 16 : dbctype = defGetString(ctypeEl);
928 784 : if (iculocaleEl && iculocaleEl->arg)
929 10 : dblocale = defGetString(iculocaleEl);
930 784 : if (icurulesEl && icurulesEl->arg)
931 2 : dbicurules = defGetString(icurulesEl);
932 784 : if (locproviderEl && locproviderEl->arg)
933 : {
934 104 : char *locproviderstr = defGetString(locproviderEl);
935 :
936 104 : if (pg_strcasecmp(locproviderstr, "builtin") == 0)
937 32 : dblocprovider = COLLPROVIDER_BUILTIN;
938 72 : else if (pg_strcasecmp(locproviderstr, "icu") == 0)
939 14 : dblocprovider = COLLPROVIDER_ICU;
940 58 : else if (pg_strcasecmp(locproviderstr, "libc") == 0)
941 56 : dblocprovider = COLLPROVIDER_LIBC;
942 : else
943 2 : ereport(ERROR,
944 : (errcode(ERRCODE_INVALID_OBJECT_DEFINITION),
945 : errmsg("unrecognized locale provider: %s",
946 : locproviderstr)));
947 : }
948 782 : if (istemplateEl && istemplateEl->arg)
949 100 : dbistemplate = defGetBoolean(istemplateEl);
950 782 : if (allowconnectionsEl && allowconnectionsEl->arg)
951 98 : dballowconnections = defGetBoolean(allowconnectionsEl);
952 782 : if (connlimitEl && connlimitEl->arg)
953 : {
954 0 : dbconnlimit = defGetInt32(connlimitEl);
955 0 : if (dbconnlimit < DATCONNLIMIT_UNLIMITED)
956 0 : ereport(ERROR,
957 : (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
958 : errmsg("invalid connection limit: %d", dbconnlimit)));
959 : }
960 782 : if (collversionEl)
961 48 : dbcollversion = defGetString(collversionEl);
962 :
963 : /* obtain OID of proposed owner */
964 782 : if (dbowner)
965 2 : datdba = get_role_oid(dbowner, false);
966 : else
967 780 : datdba = GetUserId();
968 :
969 : /*
970 : * To create a database, must have createdb privilege and must be able to
971 : * become the target role (this does not imply that the target role itself
972 : * must have createdb privilege). The latter provision guards against
973 : * "giveaway" attacks. Note that a superuser will always have both of
974 : * these privileges a fortiori.
975 : */
976 782 : if (!have_createdb_privilege())
977 6 : ereport(ERROR,
978 : (errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
979 : errmsg("permission denied to create database")));
980 :
981 776 : check_can_set_role(GetUserId(), datdba);
982 :
983 : /*
984 : * Lookup database (template) to be cloned, and obtain share lock on it.
985 : * ShareLock allows two CREATE DATABASEs to work from the same template
986 : * concurrently, while ensuring no one is busy dropping it in parallel
987 : * (which would be Very Bad since we'd likely get an incomplete copy
988 : * without knowing it). This also prevents any new connections from being
989 : * made to the source until we finish copying it, so we can be sure it
990 : * won't change underneath us.
991 : */
992 776 : if (!dbtemplate)
993 436 : dbtemplate = "template1"; /* Default template database name */
994 :
995 776 : if (!get_db_info(dbtemplate, ShareLock,
996 : &src_dboid, &src_owner, &src_encoding,
997 : &src_istemplate, &src_allowconn, &src_hasloginevt,
998 : &src_frozenxid, &src_minmxid, &src_deftablespace,
999 : &src_collate, &src_ctype, &src_locale, &src_icurules, &src_locprovider,
1000 : &src_collversion))
1001 0 : ereport(ERROR,
1002 : (errcode(ERRCODE_UNDEFINED_DATABASE),
1003 : errmsg("template database \"%s\" does not exist",
1004 : dbtemplate)));
1005 :
1006 : /*
1007 : * If the source database was in the process of being dropped, we can't
1008 : * use it as a template.
1009 : */
1010 776 : if (database_is_invalid_oid(src_dboid))
1011 2 : ereport(ERROR,
1012 : errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
1013 : errmsg("cannot use invalid database \"%s\" as template", dbtemplate),
1014 : errhint("Use DROP DATABASE to drop invalid databases."));
1015 :
1016 : /*
1017 : * Permission check: to copy a DB that's not marked datistemplate, you
1018 : * must be superuser or the owner thereof.
1019 : */
1020 774 : if (!src_istemplate)
1021 : {
1022 24 : if (!object_ownercheck(DatabaseRelationId, src_dboid, GetUserId()))
1023 0 : ereport(ERROR,
1024 : (errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
1025 : errmsg("permission denied to copy database \"%s\"",
1026 : dbtemplate)));
1027 : }
1028 :
1029 : /* Validate the database creation strategy. */
1030 774 : if (strategyEl && strategyEl->arg)
1031 : {
1032 : char *strategy;
1033 :
1034 284 : strategy = defGetString(strategyEl);
1035 284 : if (pg_strcasecmp(strategy, "wal_log") == 0)
1036 22 : dbstrategy = CREATEDB_WAL_LOG;
1037 262 : else if (pg_strcasecmp(strategy, "file_copy") == 0)
1038 260 : dbstrategy = CREATEDB_FILE_COPY;
1039 : else
1040 2 : ereport(ERROR,
1041 : (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
1042 : errmsg("invalid create database strategy \"%s\"", strategy),
1043 : errhint("Valid strategies are \"wal_log\" and \"file_copy\".")));
1044 : }
1045 :
1046 : /* If encoding or locales are defaulted, use source's setting */
1047 772 : if (encoding < 0)
1048 676 : encoding = src_encoding;
1049 772 : if (dbcollate == NULL)
1050 662 : dbcollate = src_collate;
1051 772 : if (dbctype == NULL)
1052 662 : dbctype = src_ctype;
1053 772 : if (dblocprovider == '\0')
1054 670 : dblocprovider = src_locprovider;
1055 772 : if (dblocale == NULL)
1056 666 : dblocale = src_locale;
1057 772 : if (dbicurules == NULL)
1058 770 : dbicurules = src_icurules;
1059 :
1060 : /* Some encodings are client only */
1061 772 : if (!PG_VALID_BE_ENCODING(encoding))
1062 0 : ereport(ERROR,
1063 : (errcode(ERRCODE_WRONG_OBJECT_TYPE),
1064 : errmsg("invalid server encoding %d", encoding)));
1065 :
1066 : /* Check that the chosen locales are valid, and get canonical spellings */
1067 772 : if (!check_locale(LC_COLLATE, dbcollate, &canonname))
1068 : {
1069 2 : if (dblocprovider == COLLPROVIDER_BUILTIN)
1070 0 : ereport(ERROR,
1071 : (errcode(ERRCODE_WRONG_OBJECT_TYPE),
1072 : errmsg("invalid LC_COLLATE locale name: \"%s\"", dbcollate),
1073 : errhint("If the locale name is specific to the builtin provider, use BUILTIN_LOCALE.")));
1074 2 : else if (dblocprovider == COLLPROVIDER_ICU)
1075 0 : ereport(ERROR,
1076 : (errcode(ERRCODE_WRONG_OBJECT_TYPE),
1077 : errmsg("invalid LC_COLLATE locale name: \"%s\"", dbcollate),
1078 : errhint("If the locale name is specific to the ICU provider, use ICU_LOCALE.")));
1079 : else
1080 2 : ereport(ERROR,
1081 : (errcode(ERRCODE_WRONG_OBJECT_TYPE),
1082 : errmsg("invalid LC_COLLATE locale name: \"%s\"", dbcollate)));
1083 : }
1084 770 : dbcollate = canonname;
1085 770 : if (!check_locale(LC_CTYPE, dbctype, &canonname))
1086 : {
1087 2 : if (dblocprovider == COLLPROVIDER_BUILTIN)
1088 0 : ereport(ERROR,
1089 : (errcode(ERRCODE_WRONG_OBJECT_TYPE),
1090 : errmsg("invalid LC_CTYPE locale name: \"%s\"", dbctype),
1091 : errhint("If the locale name is specific to the builtin provider, use BUILTIN_LOCALE.")));
1092 2 : else if (dblocprovider == COLLPROVIDER_ICU)
1093 0 : ereport(ERROR,
1094 : (errcode(ERRCODE_WRONG_OBJECT_TYPE),
1095 : errmsg("invalid LC_CTYPE locale name: \"%s\"", dbctype),
1096 : errhint("If the locale name is specific to the ICU provider, use ICU_LOCALE.")));
1097 : else
1098 2 : ereport(ERROR,
1099 : (errcode(ERRCODE_WRONG_OBJECT_TYPE),
1100 : errmsg("invalid LC_CTYPE locale name: \"%s\"", dbctype)));
1101 : }
1102 :
1103 768 : dbctype = canonname;
1104 :
1105 768 : check_encoding_locale_matches(encoding, dbcollate, dbctype);
1106 :
1107 : /* validate provider-specific parameters */
1108 768 : if (dblocprovider != COLLPROVIDER_BUILTIN)
1109 : {
1110 704 : if (builtinlocaleEl)
1111 0 : ereport(ERROR,
1112 : (errcode(ERRCODE_INVALID_OBJECT_DEFINITION),
1113 : errmsg("BUILTIN_LOCALE cannot be specified unless locale provider is builtin")));
1114 : }
1115 :
1116 768 : if (dblocprovider != COLLPROVIDER_ICU)
1117 : {
1118 740 : if (iculocaleEl)
1119 2 : ereport(ERROR,
1120 : (errcode(ERRCODE_INVALID_OBJECT_DEFINITION),
1121 : errmsg("ICU locale cannot be specified unless locale provider is ICU")));
1122 :
1123 738 : if (dbicurules)
1124 2 : ereport(ERROR,
1125 : (errcode(ERRCODE_INVALID_OBJECT_DEFINITION),
1126 : errmsg("ICU rules cannot be specified unless locale provider is ICU")));
1127 : }
1128 :
1129 : /* validate and canonicalize locale for the provider */
1130 764 : if (dblocprovider == COLLPROVIDER_BUILTIN)
1131 : {
1132 : /*
1133 : * This would happen if template0 uses the libc provider but the new
1134 : * database uses builtin.
1135 : */
1136 60 : if (!dblocale)
1137 2 : ereport(ERROR,
1138 : (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
1139 : errmsg("LOCALE or BUILTIN_LOCALE must be specified")));
1140 :
1141 58 : dblocale = builtin_validate_locale(encoding, dblocale);
1142 : }
1143 704 : else if (dblocprovider == COLLPROVIDER_ICU)
1144 : {
1145 28 : if (!(is_encoding_supported_by_icu(encoding)))
1146 2 : ereport(ERROR,
1147 : (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
1148 : errmsg("encoding \"%s\" is not supported with ICU provider",
1149 : pg_encoding_to_char(encoding))));
1150 :
1151 : /*
1152 : * This would happen if template0 uses the libc provider but the new
1153 : * database uses icu.
1154 : */
1155 26 : if (!dblocale)
1156 2 : ereport(ERROR,
1157 : (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
1158 : errmsg("LOCALE or ICU_LOCALE must be specified")));
1159 :
1160 : /*
1161 : * During binary upgrade, or when the locale came from the template
1162 : * database, preserve locale string. Otherwise, canonicalize to a
1163 : * language tag.
1164 : */
1165 24 : if (!IsBinaryUpgrade && dblocale != src_locale)
1166 : {
1167 12 : char *langtag = icu_language_tag(dblocale,
1168 : icu_validation_level);
1169 :
1170 12 : if (langtag && strcmp(dblocale, langtag) != 0)
1171 : {
1172 4 : ereport(NOTICE,
1173 : (errmsg("using standard form \"%s\" for ICU locale \"%s\"",
1174 : langtag, dblocale)));
1175 :
1176 4 : dblocale = langtag;
1177 : }
1178 : }
1179 :
1180 24 : icu_validate_locale(dblocale);
1181 : }
1182 :
1183 : /* for libc, locale comes from datcollate and datctype */
1184 754 : if (dblocprovider == COLLPROVIDER_LIBC)
1185 676 : dblocale = NULL;
1186 :
1187 : /*
1188 : * Check that the new encoding and locale settings match the source
1189 : * database. We insist on this because we simply copy the source data ---
1190 : * any non-ASCII data would be wrongly encoded, and any indexes sorted
1191 : * according to the source locale would be wrong.
1192 : *
1193 : * However, we assume that template0 doesn't contain any non-ASCII data
1194 : * nor any indexes that depend on collation or ctype, so template0 can be
1195 : * used as template for creating a database with any encoding or locale.
1196 : */
1197 754 : if (strcmp(dbtemplate, "template0") != 0)
1198 : {
1199 462 : if (encoding != src_encoding)
1200 0 : ereport(ERROR,
1201 : (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
1202 : errmsg("new encoding (%s) is incompatible with the encoding of the template database (%s)",
1203 : pg_encoding_to_char(encoding),
1204 : pg_encoding_to_char(src_encoding)),
1205 : errhint("Use the same encoding as in the template database, or use template0 as template.")));
1206 :
1207 462 : if (strcmp(dbcollate, src_collate) != 0)
1208 2 : ereport(ERROR,
1209 : (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
1210 : errmsg("new collation (%s) is incompatible with the collation of the template database (%s)",
1211 : dbcollate, src_collate),
1212 : errhint("Use the same collation as in the template database, or use template0 as template.")));
1213 :
1214 460 : if (strcmp(dbctype, src_ctype) != 0)
1215 0 : ereport(ERROR,
1216 : (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
1217 : errmsg("new LC_CTYPE (%s) is incompatible with the LC_CTYPE of the template database (%s)",
1218 : dbctype, src_ctype),
1219 : errhint("Use the same LC_CTYPE as in the template database, or use template0 as template.")));
1220 :
1221 460 : if (dblocprovider != src_locprovider)
1222 0 : ereport(ERROR,
1223 : (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
1224 : errmsg("new locale provider (%s) does not match locale provider of the template database (%s)",
1225 : collprovider_name(dblocprovider), collprovider_name(src_locprovider)),
1226 : errhint("Use the same locale provider as in the template database, or use template0 as template.")));
1227 :
1228 460 : if (dblocprovider == COLLPROVIDER_ICU)
1229 : {
1230 : char *val1;
1231 : char *val2;
1232 :
1233 : Assert(dblocale);
1234 : Assert(src_locale);
1235 12 : if (strcmp(dblocale, src_locale) != 0)
1236 0 : ereport(ERROR,
1237 : (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
1238 : errmsg("new ICU locale (%s) is incompatible with the ICU locale of the template database (%s)",
1239 : dblocale, src_locale),
1240 : errhint("Use the same ICU locale as in the template database, or use template0 as template.")));
1241 :
1242 12 : val1 = dbicurules;
1243 12 : if (!val1)
1244 12 : val1 = "";
1245 12 : val2 = src_icurules;
1246 12 : if (!val2)
1247 12 : val2 = "";
1248 12 : if (strcmp(val1, val2) != 0)
1249 0 : ereport(ERROR,
1250 : (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
1251 : errmsg("new ICU collation rules (%s) are incompatible with the ICU collation rules of the template database (%s)",
1252 : val1, val2),
1253 : errhint("Use the same ICU collation rules as in the template database, or use template0 as template.")));
1254 : }
1255 : }
1256 :
1257 : /*
1258 : * If we got a collation version for the template database, check that it
1259 : * matches the actual OS collation version. Otherwise error; the user
1260 : * needs to fix the template database first. Don't complain if a
1261 : * collation version was specified explicitly as a statement option; that
1262 : * is used by pg_upgrade to reproduce the old state exactly.
1263 : *
1264 : * (If the template database has no collation version, then either the
1265 : * platform/provider does not support collation versioning, or it's
1266 : * template0, for which we stipulate that it does not contain
1267 : * collation-using objects.)
1268 : */
1269 752 : if (src_collversion && !collversionEl)
1270 : {
1271 : char *actual_versionstr;
1272 : const char *locale;
1273 :
1274 302 : if (dblocprovider == COLLPROVIDER_LIBC)
1275 278 : locale = dbcollate;
1276 : else
1277 24 : locale = dblocale;
1278 :
1279 302 : actual_versionstr = get_collation_actual_version(dblocprovider, locale);
1280 302 : if (!actual_versionstr)
1281 0 : ereport(ERROR,
1282 : (errmsg("template database \"%s\" has a collation version, but no actual collation version could be determined",
1283 : dbtemplate)));
1284 :
1285 302 : if (strcmp(actual_versionstr, src_collversion) != 0)
1286 0 : ereport(ERROR,
1287 : (errmsg("template database \"%s\" has a collation version mismatch",
1288 : dbtemplate),
1289 : errdetail("The template database was created using collation version %s, "
1290 : "but the operating system provides version %s.",
1291 : src_collversion, actual_versionstr),
1292 : errhint("Rebuild all objects in the template database that use the default collation and run "
1293 : "ALTER DATABASE %s REFRESH COLLATION VERSION, "
1294 : "or build PostgreSQL with the right library version.",
1295 : quote_identifier(dbtemplate))));
1296 : }
1297 :
1298 752 : if (dbcollversion == NULL)
1299 704 : dbcollversion = src_collversion;
1300 :
1301 : /*
1302 : * Normally, we copy the collation version from the template database.
1303 : * This last resort only applies if the template database does not have a
1304 : * collation version, which is normally only the case for template0.
1305 : */
1306 752 : if (dbcollversion == NULL)
1307 : {
1308 : const char *locale;
1309 :
1310 402 : if (dblocprovider == COLLPROVIDER_LIBC)
1311 362 : locale = dbcollate;
1312 : else
1313 40 : locale = dblocale;
1314 :
1315 402 : dbcollversion = get_collation_actual_version(dblocprovider, locale);
1316 : }
1317 :
1318 : /* Resolve default tablespace for new database */
1319 752 : if (tablespacenameEl && tablespacenameEl->arg)
1320 16 : {
1321 : char *tablespacename;
1322 : AclResult aclresult;
1323 :
1324 16 : tablespacename = defGetString(tablespacenameEl);
1325 16 : dst_deftablespace = get_tablespace_oid(tablespacename, false);
1326 : /* check permissions */
1327 16 : aclresult = object_aclcheck(TableSpaceRelationId, dst_deftablespace, GetUserId(),
1328 : ACL_CREATE);
1329 16 : if (aclresult != ACLCHECK_OK)
1330 0 : aclcheck_error(aclresult, OBJECT_TABLESPACE,
1331 : tablespacename);
1332 :
1333 : /* pg_global must never be the default tablespace */
1334 16 : if (dst_deftablespace == GLOBALTABLESPACE_OID)
1335 0 : ereport(ERROR,
1336 : (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
1337 : errmsg("pg_global cannot be used as default tablespace")));
1338 :
1339 : /*
1340 : * If we are trying to change the default tablespace of the template,
1341 : * we require that the template not have any files in the new default
1342 : * tablespace. This is necessary because otherwise the copied
1343 : * database would contain pg_class rows that refer to its default
1344 : * tablespace both explicitly (by OID) and implicitly (as zero), which
1345 : * would cause problems. For example another CREATE DATABASE using
1346 : * the copied database as template, and trying to change its default
1347 : * tablespace again, would yield outright incorrect results (it would
1348 : * improperly move tables to the new default tablespace that should
1349 : * stay in the same tablespace).
1350 : */
1351 16 : if (dst_deftablespace != src_deftablespace)
1352 : {
1353 : char *srcpath;
1354 : struct stat st;
1355 :
1356 16 : srcpath = GetDatabasePath(src_dboid, dst_deftablespace);
1357 :
1358 16 : if (stat(srcpath, &st) == 0 &&
1359 0 : S_ISDIR(st.st_mode) &&
1360 0 : !directory_is_empty(srcpath))
1361 0 : ereport(ERROR,
1362 : (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
1363 : errmsg("cannot assign new default tablespace \"%s\"",
1364 : tablespacename),
1365 : errdetail("There is a conflict because database \"%s\" already has some tables in this tablespace.",
1366 : dbtemplate)));
1367 16 : pfree(srcpath);
1368 : }
1369 : }
1370 : else
1371 : {
1372 : /* Use template database's default tablespace */
1373 736 : dst_deftablespace = src_deftablespace;
1374 : /* Note there is no additional permission check in this path */
1375 : }
1376 :
1377 : /*
1378 : * If built with appropriate switch, whine when regression-testing
1379 : * conventions for database names are violated. But don't complain during
1380 : * initdb.
1381 : */
1382 : #ifdef ENFORCE_REGRESSION_TEST_NAME_RESTRICTIONS
1383 : if (IsUnderPostmaster && strstr(dbname, "regression") == NULL)
1384 : elog(WARNING, "databases created by regression test cases should have names including \"regression\"");
1385 : #endif
1386 :
1387 : /*
1388 : * Check for db name conflict. This is just to give a more friendly error
1389 : * message than "unique index violation". There's a race condition but
1390 : * we're willing to accept the less friendly message in that case.
1391 : */
1392 752 : if (OidIsValid(get_database_oid(dbname, true)))
1393 2 : ereport(ERROR,
1394 : (errcode(ERRCODE_DUPLICATE_DATABASE),
1395 : errmsg("database \"%s\" already exists", dbname)));
1396 :
1397 : /*
1398 : * The source DB can't have any active backends, except this one
1399 : * (exception is to allow CREATE DB while connected to template1).
1400 : * Otherwise we might copy inconsistent data.
1401 : *
1402 : * This should be last among the basic error checks, because it involves
1403 : * potential waiting; we may as well throw an error first if we're gonna
1404 : * throw one.
1405 : */
1406 750 : if (CountOtherDBBackends(src_dboid, ¬herbackends, &npreparedxacts))
1407 0 : ereport(ERROR,
1408 : (errcode(ERRCODE_OBJECT_IN_USE),
1409 : errmsg("source database \"%s\" is being accessed by other users",
1410 : dbtemplate),
1411 : errdetail_busy_db(notherbackends, npreparedxacts)));
1412 :
1413 : /*
1414 : * Select an OID for the new database, checking that it doesn't have a
1415 : * filename conflict with anything already existing in the tablespace
1416 : * directories.
1417 : */
1418 750 : pg_database_rel = table_open(DatabaseRelationId, RowExclusiveLock);
1419 :
1420 : /*
1421 : * If database OID is configured, check if the OID is already in use or
1422 : * data directory already exists.
1423 : */
1424 750 : if (OidIsValid(dboid))
1425 : {
1426 250 : char *existing_dbname = get_database_name(dboid);
1427 :
1428 250 : if (existing_dbname != NULL)
1429 0 : ereport(ERROR,
1430 : (errcode(ERRCODE_INVALID_PARAMETER_VALUE)),
1431 : errmsg("database OID %u is already in use by database \"%s\"",
1432 : dboid, existing_dbname));
1433 :
1434 250 : if (check_db_file_conflict(dboid))
1435 0 : ereport(ERROR,
1436 : (errcode(ERRCODE_INVALID_PARAMETER_VALUE)),
1437 : errmsg("data directory with the specified OID %u already exists", dboid));
1438 : }
1439 : else
1440 : {
1441 : /* Select an OID for the new database if is not explicitly configured. */
1442 : do
1443 : {
1444 500 : dboid = GetNewOidWithIndex(pg_database_rel, DatabaseOidIndexId,
1445 : Anum_pg_database_oid);
1446 500 : } while (check_db_file_conflict(dboid));
1447 : }
1448 :
1449 : /*
1450 : * Insert a new tuple into pg_database. This establishes our ownership of
1451 : * the new database name (anyone else trying to insert the same name will
1452 : * block on the unique index, and fail after we commit).
1453 : */
1454 :
1455 : Assert((dblocprovider != COLLPROVIDER_LIBC && dblocale) ||
1456 : (dblocprovider == COLLPROVIDER_LIBC && !dblocale));
1457 :
1458 : /* Form tuple */
1459 750 : new_record[Anum_pg_database_oid - 1] = ObjectIdGetDatum(dboid);
1460 750 : new_record[Anum_pg_database_datname - 1] =
1461 750 : DirectFunctionCall1(namein, CStringGetDatum(dbname));
1462 750 : new_record[Anum_pg_database_datdba - 1] = ObjectIdGetDatum(datdba);
1463 750 : new_record[Anum_pg_database_encoding - 1] = Int32GetDatum(encoding);
1464 750 : new_record[Anum_pg_database_datlocprovider - 1] = CharGetDatum(dblocprovider);
1465 750 : new_record[Anum_pg_database_datistemplate - 1] = BoolGetDatum(dbistemplate);
1466 750 : new_record[Anum_pg_database_datallowconn - 1] = BoolGetDatum(dballowconnections);
1467 750 : new_record[Anum_pg_database_dathasloginevt - 1] = BoolGetDatum(src_hasloginevt);
1468 750 : new_record[Anum_pg_database_datconnlimit - 1] = Int32GetDatum(dbconnlimit);
1469 750 : new_record[Anum_pg_database_datfrozenxid - 1] = TransactionIdGetDatum(src_frozenxid);
1470 750 : new_record[Anum_pg_database_datminmxid - 1] = TransactionIdGetDatum(src_minmxid);
1471 750 : new_record[Anum_pg_database_dattablespace - 1] = ObjectIdGetDatum(dst_deftablespace);
1472 750 : new_record[Anum_pg_database_datcollate - 1] = CStringGetTextDatum(dbcollate);
1473 750 : new_record[Anum_pg_database_datctype - 1] = CStringGetTextDatum(dbctype);
1474 750 : if (dblocale)
1475 76 : new_record[Anum_pg_database_datlocale - 1] = CStringGetTextDatum(dblocale);
1476 : else
1477 674 : new_record_nulls[Anum_pg_database_datlocale - 1] = true;
1478 750 : if (dbicurules)
1479 0 : new_record[Anum_pg_database_daticurules - 1] = CStringGetTextDatum(dbicurules);
1480 : else
1481 750 : new_record_nulls[Anum_pg_database_daticurules - 1] = true;
1482 750 : if (dbcollversion)
1483 630 : new_record[Anum_pg_database_datcollversion - 1] = CStringGetTextDatum(dbcollversion);
1484 : else
1485 120 : new_record_nulls[Anum_pg_database_datcollversion - 1] = true;
1486 :
1487 : /*
1488 : * We deliberately set datacl to default (NULL), rather than copying it
1489 : * from the template database. Copying it would be a bad idea when the
1490 : * owner is not the same as the template's owner.
1491 : */
1492 750 : new_record_nulls[Anum_pg_database_datacl - 1] = true;
1493 :
1494 750 : tuple = heap_form_tuple(RelationGetDescr(pg_database_rel),
1495 : new_record, new_record_nulls);
1496 :
1497 750 : CatalogTupleInsert(pg_database_rel, tuple);
1498 :
1499 : /*
1500 : * Now generate additional catalog entries associated with the new DB
1501 : */
1502 :
1503 : /* Register owner dependency */
1504 750 : recordDependencyOnOwner(DatabaseRelationId, dboid, datdba);
1505 :
1506 : /* Create pg_shdepend entries for objects within database */
1507 750 : copyTemplateDependencies(src_dboid, dboid);
1508 :
1509 : /* Post creation hook for new database */
1510 750 : InvokeObjectPostCreateHook(DatabaseRelationId, dboid, 0);
1511 :
1512 : /*
1513 : * If we're going to be reading data for the to-be-created database into
1514 : * shared_buffers, take a lock on it. Nobody should know that this
1515 : * database exists yet, but it's good to maintain the invariant that an
1516 : * AccessExclusiveLock on the database is sufficient to drop all of its
1517 : * buffers without worrying about more being read later.
1518 : *
1519 : * Note that we need to do this before entering the
1520 : * PG_ENSURE_ERROR_CLEANUP block below, because createdb_failure_callback
1521 : * expects this lock to be held already.
1522 : */
1523 750 : if (dbstrategy == CREATEDB_WAL_LOG)
1524 490 : LockSharedObject(DatabaseRelationId, dboid, 0, AccessShareLock);
1525 :
1526 : /*
1527 : * Once we start copying subdirectories, we need to be able to clean 'em
1528 : * up if we fail. Use an ENSURE block to make sure this happens. (This
1529 : * is not a 100% solution, because of the possibility of failure during
1530 : * transaction commit after we leave this routine, but it should handle
1531 : * most scenarios.)
1532 : */
1533 750 : fparms.src_dboid = src_dboid;
1534 750 : fparms.dest_dboid = dboid;
1535 750 : fparms.strategy = dbstrategy;
1536 :
1537 750 : PG_ENSURE_ERROR_CLEANUP(createdb_failure_callback,
1538 : PointerGetDatum(&fparms));
1539 : {
1540 : /*
1541 : * If the user has asked to create a database with WAL_LOG strategy
1542 : * then call CreateDatabaseUsingWalLog, which will copy the database
1543 : * at the block level and it will WAL log each copied block.
1544 : * Otherwise, call CreateDatabaseUsingFileCopy that will copy the
1545 : * database file by file.
1546 : */
1547 750 : if (dbstrategy == CREATEDB_WAL_LOG)
1548 490 : CreateDatabaseUsingWalLog(src_dboid, dboid, src_deftablespace,
1549 : dst_deftablespace);
1550 : else
1551 260 : CreateDatabaseUsingFileCopy(src_dboid, dboid, src_deftablespace,
1552 : dst_deftablespace);
1553 :
1554 : /*
1555 : * Close pg_database, but keep lock till commit.
1556 : */
1557 746 : table_close(pg_database_rel, NoLock);
1558 :
1559 : /*
1560 : * Force synchronous commit, thus minimizing the window between
1561 : * creation of the database files and committal of the transaction. If
1562 : * we crash before committing, we'll have a DB that's taking up disk
1563 : * space but is not in pg_database, which is not good.
1564 : */
1565 746 : ForceSyncCommit();
1566 : }
1567 750 : PG_END_ENSURE_ERROR_CLEANUP(createdb_failure_callback,
1568 : PointerGetDatum(&fparms));
1569 :
1570 746 : return dboid;
1571 : }
1572 :
1573 : /*
1574 : * Check whether chosen encoding matches chosen locale settings. This
1575 : * restriction is necessary because libc's locale-specific code usually
1576 : * fails when presented with data in an encoding it's not expecting. We
1577 : * allow mismatch in four cases:
1578 : *
1579 : * 1. locale encoding = SQL_ASCII, which means that the locale is C/POSIX
1580 : * which works with any encoding.
1581 : *
1582 : * 2. locale encoding = -1, which means that we couldn't determine the
1583 : * locale's encoding and have to trust the user to get it right.
1584 : *
1585 : * 3. selected encoding is UTF8 and platform is win32. This is because
1586 : * UTF8 is a pseudo codepage that is supported in all locales since it's
1587 : * converted to UTF16 before being used.
1588 : *
1589 : * 4. selected encoding is SQL_ASCII, but only if you're a superuser. This
1590 : * is risky but we have historically allowed it --- notably, the
1591 : * regression tests require it.
1592 : *
1593 : * Note: if you change this policy, fix initdb to match.
1594 : */
1595 : void
1596 796 : check_encoding_locale_matches(int encoding, const char *collate, const char *ctype)
1597 : {
1598 796 : int ctype_encoding = pg_get_encoding_from_locale(ctype, true);
1599 796 : int collate_encoding = pg_get_encoding_from_locale(collate, true);
1600 :
1601 804 : if (!(ctype_encoding == encoding ||
1602 8 : ctype_encoding == PG_SQL_ASCII ||
1603 : ctype_encoding == -1 ||
1604 : #ifdef WIN32
1605 : encoding == PG_UTF8 ||
1606 : #endif
1607 8 : (encoding == PG_SQL_ASCII && superuser())))
1608 0 : ereport(ERROR,
1609 : (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
1610 : errmsg("encoding \"%s\" does not match locale \"%s\"",
1611 : pg_encoding_to_char(encoding),
1612 : ctype),
1613 : errdetail("The chosen LC_CTYPE setting requires encoding \"%s\".",
1614 : pg_encoding_to_char(ctype_encoding))));
1615 :
1616 804 : if (!(collate_encoding == encoding ||
1617 8 : collate_encoding == PG_SQL_ASCII ||
1618 : collate_encoding == -1 ||
1619 : #ifdef WIN32
1620 : encoding == PG_UTF8 ||
1621 : #endif
1622 8 : (encoding == PG_SQL_ASCII && superuser())))
1623 0 : ereport(ERROR,
1624 : (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
1625 : errmsg("encoding \"%s\" does not match locale \"%s\"",
1626 : pg_encoding_to_char(encoding),
1627 : collate),
1628 : errdetail("The chosen LC_COLLATE setting requires encoding \"%s\".",
1629 : pg_encoding_to_char(collate_encoding))));
1630 796 : }
1631 :
1632 : /* Error cleanup callback for createdb */
1633 : static void
1634 4 : createdb_failure_callback(int code, Datum arg)
1635 : {
1636 4 : createdb_failure_params *fparms = (createdb_failure_params *) DatumGetPointer(arg);
1637 :
1638 : /*
1639 : * If we were copying database at block levels then drop pages for the
1640 : * destination database that are in the shared buffer cache. And tell
1641 : * checkpointer to forget any pending fsync and unlink requests for files
1642 : * in the database. The reasoning behind doing this is same as explained
1643 : * in dropdb function. But unlike dropdb we don't need to call
1644 : * pgstat_drop_database because this database is still not created so
1645 : * there should not be any stat for this.
1646 : */
1647 4 : if (fparms->strategy == CREATEDB_WAL_LOG)
1648 : {
1649 4 : DropDatabaseBuffers(fparms->dest_dboid);
1650 4 : ForgetDatabaseSyncRequests(fparms->dest_dboid);
1651 :
1652 : /* Release lock on the target database. */
1653 4 : UnlockSharedObject(DatabaseRelationId, fparms->dest_dboid, 0,
1654 : AccessShareLock);
1655 : }
1656 :
1657 : /*
1658 : * Release lock on source database before doing recursive remove. This is
1659 : * not essential but it seems desirable to release the lock as soon as
1660 : * possible.
1661 : */
1662 4 : UnlockSharedObject(DatabaseRelationId, fparms->src_dboid, 0, ShareLock);
1663 :
1664 : /* Throw away any successfully copied subdirectories */
1665 4 : remove_dbtablespaces(fparms->dest_dboid);
1666 4 : }
1667 :
1668 :
1669 : /*
1670 : * DROP DATABASE
1671 : */
1672 : void
1673 122 : dropdb(const char *dbname, bool missing_ok, bool force)
1674 : {
1675 : Oid db_id;
1676 : bool db_istemplate;
1677 : Relation pgdbrel;
1678 : HeapTuple tup;
1679 : ScanKeyData scankey;
1680 : void *inplace_state;
1681 : Form_pg_database datform;
1682 : int notherbackends;
1683 : int npreparedxacts;
1684 : int nslots,
1685 : nslots_active;
1686 : int nsubscriptions;
1687 :
1688 : /*
1689 : * Look up the target database's OID, and get exclusive lock on it. We
1690 : * need this to ensure that no new backend starts up in the target
1691 : * database while we are deleting it (see postinit.c), and that no one is
1692 : * using it as a CREATE DATABASE template or trying to delete it for
1693 : * themselves.
1694 : */
1695 122 : pgdbrel = table_open(DatabaseRelationId, RowExclusiveLock);
1696 :
1697 122 : if (!get_db_info(dbname, AccessExclusiveLock, &db_id, NULL, NULL,
1698 : &db_istemplate, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL))
1699 : {
1700 32 : if (!missing_ok)
1701 : {
1702 16 : ereport(ERROR,
1703 : (errcode(ERRCODE_UNDEFINED_DATABASE),
1704 : errmsg("database \"%s\" does not exist", dbname)));
1705 : }
1706 : else
1707 : {
1708 : /* Close pg_database, release the lock, since we changed nothing */
1709 16 : table_close(pgdbrel, RowExclusiveLock);
1710 16 : ereport(NOTICE,
1711 : (errmsg("database \"%s\" does not exist, skipping",
1712 : dbname)));
1713 16 : return;
1714 : }
1715 : }
1716 :
1717 : /*
1718 : * Permission checks
1719 : */
1720 90 : if (!object_ownercheck(DatabaseRelationId, db_id, GetUserId()))
1721 0 : aclcheck_error(ACLCHECK_NOT_OWNER, OBJECT_DATABASE,
1722 : dbname);
1723 :
1724 : /* DROP hook for the database being removed */
1725 90 : InvokeObjectDropHook(DatabaseRelationId, db_id, 0);
1726 :
1727 : /*
1728 : * Disallow dropping a DB that is marked istemplate. This is just to
1729 : * prevent people from accidentally dropping template0 or template1; they
1730 : * can do so if they're really determined ...
1731 : */
1732 90 : if (db_istemplate)
1733 0 : ereport(ERROR,
1734 : (errcode(ERRCODE_WRONG_OBJECT_TYPE),
1735 : errmsg("cannot drop a template database")));
1736 :
1737 : /* Obviously can't drop my own database */
1738 90 : if (db_id == MyDatabaseId)
1739 0 : ereport(ERROR,
1740 : (errcode(ERRCODE_OBJECT_IN_USE),
1741 : errmsg("cannot drop the currently open database")));
1742 :
1743 : /*
1744 : * Check whether there are active logical slots that refer to the
1745 : * to-be-dropped database. The database lock we are holding prevents the
1746 : * creation of new slots using the database or existing slots becoming
1747 : * active.
1748 : */
1749 90 : (void) ReplicationSlotsCountDBSlots(db_id, &nslots, &nslots_active);
1750 90 : if (nslots_active)
1751 : {
1752 2 : ereport(ERROR,
1753 : (errcode(ERRCODE_OBJECT_IN_USE),
1754 : errmsg("database \"%s\" is used by an active logical replication slot",
1755 : dbname),
1756 : errdetail_plural("There is %d active slot.",
1757 : "There are %d active slots.",
1758 : nslots_active, nslots_active)));
1759 : }
1760 :
1761 : /*
1762 : * Check if there are subscriptions defined in the target database.
1763 : *
1764 : * We can't drop them automatically because they might be holding
1765 : * resources in other databases/instances.
1766 : */
1767 88 : if ((nsubscriptions = CountDBSubscriptions(db_id)) > 0)
1768 0 : ereport(ERROR,
1769 : (errcode(ERRCODE_OBJECT_IN_USE),
1770 : errmsg("database \"%s\" is being used by logical replication subscription",
1771 : dbname),
1772 : errdetail_plural("There is %d subscription.",
1773 : "There are %d subscriptions.",
1774 : nsubscriptions, nsubscriptions)));
1775 :
1776 :
1777 : /*
1778 : * Attempt to terminate all existing connections to the target database if
1779 : * the user has requested to do so.
1780 : */
1781 88 : if (force)
1782 2 : TerminateOtherDBBackends(db_id);
1783 :
1784 : /*
1785 : * Check for other backends in the target database. (Because we hold the
1786 : * database lock, no new ones can start after this.)
1787 : *
1788 : * As in CREATE DATABASE, check this after other error conditions.
1789 : */
1790 88 : if (CountOtherDBBackends(db_id, ¬herbackends, &npreparedxacts))
1791 0 : ereport(ERROR,
1792 : (errcode(ERRCODE_OBJECT_IN_USE),
1793 : errmsg("database \"%s\" is being accessed by other users",
1794 : dbname),
1795 : errdetail_busy_db(notherbackends, npreparedxacts)));
1796 :
1797 : /*
1798 : * Delete any comments or security labels associated with the database.
1799 : */
1800 88 : DeleteSharedComments(db_id, DatabaseRelationId);
1801 88 : DeleteSharedSecurityLabel(db_id, DatabaseRelationId);
1802 :
1803 : /*
1804 : * Remove settings associated with this database
1805 : */
1806 88 : DropSetting(db_id, InvalidOid);
1807 :
1808 : /*
1809 : * Remove shared dependency references for the database.
1810 : */
1811 88 : dropDatabaseDependencies(db_id);
1812 :
1813 : /*
1814 : * Tell the cumulative stats system to forget it immediately, too.
1815 : */
1816 88 : pgstat_drop_database(db_id);
1817 :
1818 : /*
1819 : * Except for the deletion of the catalog row, subsequent actions are not
1820 : * transactional (consider DropDatabaseBuffers() discarding modified
1821 : * buffers). But we might crash or get interrupted below. To prevent
1822 : * accesses to a database with invalid contents, mark the database as
1823 : * invalid using an in-place update.
1824 : *
1825 : * We need to flush the WAL before continuing, to guarantee the
1826 : * modification is durable before performing irreversible filesystem
1827 : * operations.
1828 : */
1829 88 : ScanKeyInit(&scankey,
1830 : Anum_pg_database_datname,
1831 : BTEqualStrategyNumber, F_NAMEEQ,
1832 : CStringGetDatum(dbname));
1833 88 : systable_inplace_update_begin(pgdbrel, DatabaseNameIndexId, true,
1834 : NULL, 1, &scankey, &tup, &inplace_state);
1835 88 : if (!HeapTupleIsValid(tup))
1836 0 : elog(ERROR, "cache lookup failed for database %u", db_id);
1837 88 : datform = (Form_pg_database) GETSTRUCT(tup);
1838 88 : datform->datconnlimit = DATCONNLIMIT_INVALID_DB;
1839 88 : systable_inplace_update_finish(inplace_state, tup);
1840 88 : XLogFlush(XactLastRecEnd);
1841 :
1842 : /*
1843 : * Also delete the tuple - transactionally. If this transaction commits,
1844 : * the row will be gone, but if we fail, dropdb() can be invoked again.
1845 : */
1846 88 : CatalogTupleDelete(pgdbrel, &tup->t_self);
1847 88 : heap_freetuple(tup);
1848 :
1849 : /*
1850 : * Drop db-specific replication slots.
1851 : */
1852 88 : ReplicationSlotsDropDBSlots(db_id);
1853 :
1854 : /*
1855 : * Drop pages for this database that are in the shared buffer cache. This
1856 : * is important to ensure that no remaining backend tries to write out a
1857 : * dirty buffer to the dead database later...
1858 : */
1859 88 : DropDatabaseBuffers(db_id);
1860 :
1861 : /*
1862 : * Tell checkpointer to forget any pending fsync and unlink requests for
1863 : * files in the database; else the fsyncs will fail at next checkpoint, or
1864 : * worse, it will delete files that belong to a newly created database
1865 : * with the same OID.
1866 : */
1867 88 : ForgetDatabaseSyncRequests(db_id);
1868 :
1869 : /*
1870 : * Force a checkpoint to make sure the checkpointer has received the
1871 : * message sent by ForgetDatabaseSyncRequests.
1872 : */
1873 88 : RequestCheckpoint(CHECKPOINT_IMMEDIATE | CHECKPOINT_FORCE | CHECKPOINT_WAIT);
1874 :
1875 : /* Close all smgr fds in all backends. */
1876 88 : WaitForProcSignalBarrier(EmitProcSignalBarrier(PROCSIGNAL_BARRIER_SMGRRELEASE));
1877 :
1878 : /*
1879 : * Remove all tablespace subdirs belonging to the database.
1880 : */
1881 88 : remove_dbtablespaces(db_id);
1882 :
1883 : /*
1884 : * Close pg_database, but keep lock till commit.
1885 : */
1886 88 : table_close(pgdbrel, NoLock);
1887 :
1888 : /*
1889 : * Force synchronous commit, thus minimizing the window between removal of
1890 : * the database files and committal of the transaction. If we crash before
1891 : * committing, we'll have a DB that's gone on disk but still there
1892 : * according to pg_database, which is not good.
1893 : */
1894 88 : ForceSyncCommit();
1895 : }
1896 :
1897 :
1898 : /*
1899 : * Rename database
1900 : */
1901 : ObjectAddress
1902 6 : RenameDatabase(const char *oldname, const char *newname)
1903 : {
1904 : Oid db_id;
1905 : HeapTuple newtup;
1906 : ItemPointerData otid;
1907 : Relation rel;
1908 : int notherbackends;
1909 : int npreparedxacts;
1910 : ObjectAddress address;
1911 :
1912 : /*
1913 : * Look up the target database's OID, and get exclusive lock on it. We
1914 : * need this for the same reasons as DROP DATABASE.
1915 : */
1916 6 : rel = table_open(DatabaseRelationId, RowExclusiveLock);
1917 :
1918 6 : if (!get_db_info(oldname, AccessExclusiveLock, &db_id, NULL, NULL, NULL,
1919 : NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL))
1920 0 : ereport(ERROR,
1921 : (errcode(ERRCODE_UNDEFINED_DATABASE),
1922 : errmsg("database \"%s\" does not exist", oldname)));
1923 :
1924 : /* must be owner */
1925 6 : if (!object_ownercheck(DatabaseRelationId, db_id, GetUserId()))
1926 0 : aclcheck_error(ACLCHECK_NOT_OWNER, OBJECT_DATABASE,
1927 : oldname);
1928 :
1929 : /* must have createdb rights */
1930 6 : if (!have_createdb_privilege())
1931 0 : ereport(ERROR,
1932 : (errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
1933 : errmsg("permission denied to rename database")));
1934 :
1935 : /*
1936 : * If built with appropriate switch, whine when regression-testing
1937 : * conventions for database names are violated.
1938 : */
1939 : #ifdef ENFORCE_REGRESSION_TEST_NAME_RESTRICTIONS
1940 : if (strstr(newname, "regression") == NULL)
1941 : elog(WARNING, "databases created by regression test cases should have names including \"regression\"");
1942 : #endif
1943 :
1944 : /*
1945 : * Make sure the new name doesn't exist. See notes for same error in
1946 : * CREATE DATABASE.
1947 : */
1948 6 : if (OidIsValid(get_database_oid(newname, true)))
1949 0 : ereport(ERROR,
1950 : (errcode(ERRCODE_DUPLICATE_DATABASE),
1951 : errmsg("database \"%s\" already exists", newname)));
1952 :
1953 : /*
1954 : * XXX Client applications probably store the current database somewhere,
1955 : * so renaming it could cause confusion. On the other hand, there may not
1956 : * be an actual problem besides a little confusion, so think about this
1957 : * and decide.
1958 : */
1959 6 : if (db_id == MyDatabaseId)
1960 0 : ereport(ERROR,
1961 : (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
1962 : errmsg("current database cannot be renamed")));
1963 :
1964 : /*
1965 : * Make sure the database does not have active sessions. This is the same
1966 : * concern as above, but applied to other sessions.
1967 : *
1968 : * As in CREATE DATABASE, check this after other error conditions.
1969 : */
1970 6 : if (CountOtherDBBackends(db_id, ¬herbackends, &npreparedxacts))
1971 0 : ereport(ERROR,
1972 : (errcode(ERRCODE_OBJECT_IN_USE),
1973 : errmsg("database \"%s\" is being accessed by other users",
1974 : oldname),
1975 : errdetail_busy_db(notherbackends, npreparedxacts)));
1976 :
1977 : /* rename */
1978 6 : newtup = SearchSysCacheLockedCopy1(DATABASEOID, ObjectIdGetDatum(db_id));
1979 6 : if (!HeapTupleIsValid(newtup))
1980 0 : elog(ERROR, "cache lookup failed for database %u", db_id);
1981 6 : otid = newtup->t_self;
1982 6 : namestrcpy(&(((Form_pg_database) GETSTRUCT(newtup))->datname), newname);
1983 6 : CatalogTupleUpdate(rel, &otid, newtup);
1984 6 : UnlockTuple(rel, &otid, InplaceUpdateTupleLock);
1985 :
1986 6 : InvokeObjectPostAlterHook(DatabaseRelationId, db_id, 0);
1987 :
1988 6 : ObjectAddressSet(address, DatabaseRelationId, db_id);
1989 :
1990 : /*
1991 : * Close pg_database, but keep lock till commit.
1992 : */
1993 6 : table_close(rel, NoLock);
1994 :
1995 6 : return address;
1996 : }
1997 :
1998 :
1999 : /*
2000 : * ALTER DATABASE SET TABLESPACE
2001 : */
2002 : static void
2003 16 : movedb(const char *dbname, const char *tblspcname)
2004 : {
2005 : Oid db_id;
2006 : Relation pgdbrel;
2007 : int notherbackends;
2008 : int npreparedxacts;
2009 : HeapTuple oldtuple,
2010 : newtuple;
2011 : Oid src_tblspcoid,
2012 : dst_tblspcoid;
2013 : ScanKeyData scankey;
2014 : SysScanDesc sysscan;
2015 : AclResult aclresult;
2016 : char *src_dbpath;
2017 : char *dst_dbpath;
2018 : DIR *dstdir;
2019 : struct dirent *xlde;
2020 : movedb_failure_params fparms;
2021 :
2022 : /*
2023 : * Look up the target database's OID, and get exclusive lock on it. We
2024 : * need this to ensure that no new backend starts up in the database while
2025 : * we are moving it, and that no one is using it as a CREATE DATABASE
2026 : * template or trying to delete it.
2027 : */
2028 16 : pgdbrel = table_open(DatabaseRelationId, RowExclusiveLock);
2029 :
2030 16 : if (!get_db_info(dbname, AccessExclusiveLock, &db_id, NULL, NULL, NULL,
2031 : NULL, NULL, NULL, NULL, &src_tblspcoid, NULL, NULL, NULL, NULL, NULL, NULL))
2032 0 : ereport(ERROR,
2033 : (errcode(ERRCODE_UNDEFINED_DATABASE),
2034 : errmsg("database \"%s\" does not exist", dbname)));
2035 :
2036 : /*
2037 : * We actually need a session lock, so that the lock will persist across
2038 : * the commit/restart below. (We could almost get away with letting the
2039 : * lock be released at commit, except that someone could try to move
2040 : * relations of the DB back into the old directory while we rmtree() it.)
2041 : */
2042 16 : LockSharedObjectForSession(DatabaseRelationId, db_id, 0,
2043 : AccessExclusiveLock);
2044 :
2045 : /*
2046 : * Permission checks
2047 : */
2048 16 : if (!object_ownercheck(DatabaseRelationId, db_id, GetUserId()))
2049 0 : aclcheck_error(ACLCHECK_NOT_OWNER, OBJECT_DATABASE,
2050 : dbname);
2051 :
2052 : /*
2053 : * Obviously can't move the tables of my own database
2054 : */
2055 16 : if (db_id == MyDatabaseId)
2056 0 : ereport(ERROR,
2057 : (errcode(ERRCODE_OBJECT_IN_USE),
2058 : errmsg("cannot change the tablespace of the currently open database")));
2059 :
2060 : /*
2061 : * Get tablespace's oid
2062 : */
2063 16 : dst_tblspcoid = get_tablespace_oid(tblspcname, false);
2064 :
2065 : /*
2066 : * Permission checks
2067 : */
2068 16 : aclresult = object_aclcheck(TableSpaceRelationId, dst_tblspcoid, GetUserId(),
2069 : ACL_CREATE);
2070 16 : if (aclresult != ACLCHECK_OK)
2071 0 : aclcheck_error(aclresult, OBJECT_TABLESPACE,
2072 : tblspcname);
2073 :
2074 : /*
2075 : * pg_global must never be the default tablespace
2076 : */
2077 16 : if (dst_tblspcoid == GLOBALTABLESPACE_OID)
2078 0 : ereport(ERROR,
2079 : (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
2080 : errmsg("pg_global cannot be used as default tablespace")));
2081 :
2082 : /*
2083 : * No-op if same tablespace
2084 : */
2085 16 : if (src_tblspcoid == dst_tblspcoid)
2086 : {
2087 0 : table_close(pgdbrel, NoLock);
2088 0 : UnlockSharedObjectForSession(DatabaseRelationId, db_id, 0,
2089 : AccessExclusiveLock);
2090 0 : return;
2091 : }
2092 :
2093 : /*
2094 : * Check for other backends in the target database. (Because we hold the
2095 : * database lock, no new ones can start after this.)
2096 : *
2097 : * As in CREATE DATABASE, check this after other error conditions.
2098 : */
2099 16 : if (CountOtherDBBackends(db_id, ¬herbackends, &npreparedxacts))
2100 0 : ereport(ERROR,
2101 : (errcode(ERRCODE_OBJECT_IN_USE),
2102 : errmsg("database \"%s\" is being accessed by other users",
2103 : dbname),
2104 : errdetail_busy_db(notherbackends, npreparedxacts)));
2105 :
2106 : /*
2107 : * Get old and new database paths
2108 : */
2109 16 : src_dbpath = GetDatabasePath(db_id, src_tblspcoid);
2110 16 : dst_dbpath = GetDatabasePath(db_id, dst_tblspcoid);
2111 :
2112 : /*
2113 : * Force a checkpoint before proceeding. This will force all dirty
2114 : * buffers, including those of unlogged tables, out to disk, to ensure
2115 : * source database is up-to-date on disk for the copy.
2116 : * FlushDatabaseBuffers() would suffice for that, but we also want to
2117 : * process any pending unlink requests. Otherwise, the check for existing
2118 : * files in the target directory might fail unnecessarily, not to mention
2119 : * that the copy might fail due to source files getting deleted under it.
2120 : * On Windows, this also ensures that background procs don't hold any open
2121 : * files, which would cause rmdir() to fail.
2122 : */
2123 16 : RequestCheckpoint(CHECKPOINT_IMMEDIATE | CHECKPOINT_FORCE | CHECKPOINT_WAIT
2124 : | CHECKPOINT_FLUSH_ALL);
2125 :
2126 : /* Close all smgr fds in all backends. */
2127 16 : WaitForProcSignalBarrier(EmitProcSignalBarrier(PROCSIGNAL_BARRIER_SMGRRELEASE));
2128 :
2129 : /*
2130 : * Now drop all buffers holding data of the target database; they should
2131 : * no longer be dirty so DropDatabaseBuffers is safe.
2132 : *
2133 : * It might seem that we could just let these buffers age out of shared
2134 : * buffers naturally, since they should not get referenced anymore. The
2135 : * problem with that is that if the user later moves the database back to
2136 : * its original tablespace, any still-surviving buffers would appear to
2137 : * contain valid data again --- but they'd be missing any changes made in
2138 : * the database while it was in the new tablespace. In any case, freeing
2139 : * buffers that should never be used again seems worth the cycles.
2140 : *
2141 : * Note: it'd be sufficient to get rid of buffers matching db_id and
2142 : * src_tblspcoid, but bufmgr.c presently provides no API for that.
2143 : */
2144 16 : DropDatabaseBuffers(db_id);
2145 :
2146 : /*
2147 : * Check for existence of files in the target directory, i.e., objects of
2148 : * this database that are already in the target tablespace. We can't
2149 : * allow the move in such a case, because we would need to change those
2150 : * relations' pg_class.reltablespace entries to zero, and we don't have
2151 : * access to the DB's pg_class to do so.
2152 : */
2153 16 : dstdir = AllocateDir(dst_dbpath);
2154 16 : if (dstdir != NULL)
2155 : {
2156 0 : while ((xlde = ReadDir(dstdir, dst_dbpath)) != NULL)
2157 : {
2158 0 : if (strcmp(xlde->d_name, ".") == 0 ||
2159 0 : strcmp(xlde->d_name, "..") == 0)
2160 0 : continue;
2161 :
2162 0 : ereport(ERROR,
2163 : (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
2164 : errmsg("some relations of database \"%s\" are already in tablespace \"%s\"",
2165 : dbname, tblspcname),
2166 : errhint("You must move them back to the database's default tablespace before using this command.")));
2167 : }
2168 :
2169 0 : FreeDir(dstdir);
2170 :
2171 : /*
2172 : * The directory exists but is empty. We must remove it before using
2173 : * the copydir function.
2174 : */
2175 0 : if (rmdir(dst_dbpath) != 0)
2176 0 : elog(ERROR, "could not remove directory \"%s\": %m",
2177 : dst_dbpath);
2178 : }
2179 :
2180 : /*
2181 : * Use an ENSURE block to make sure we remove the debris if the copy fails
2182 : * (eg, due to out-of-disk-space). This is not a 100% solution, because
2183 : * of the possibility of failure during transaction commit, but it should
2184 : * handle most scenarios.
2185 : */
2186 16 : fparms.dest_dboid = db_id;
2187 16 : fparms.dest_tsoid = dst_tblspcoid;
2188 16 : PG_ENSURE_ERROR_CLEANUP(movedb_failure_callback,
2189 : PointerGetDatum(&fparms));
2190 : {
2191 16 : Datum new_record[Natts_pg_database] = {0};
2192 16 : bool new_record_nulls[Natts_pg_database] = {0};
2193 16 : bool new_record_repl[Natts_pg_database] = {0};
2194 :
2195 : /*
2196 : * Copy files from the old tablespace to the new one
2197 : */
2198 16 : copydir(src_dbpath, dst_dbpath, false);
2199 :
2200 : /*
2201 : * Record the filesystem change in XLOG
2202 : */
2203 : {
2204 : xl_dbase_create_file_copy_rec xlrec;
2205 :
2206 16 : xlrec.db_id = db_id;
2207 16 : xlrec.tablespace_id = dst_tblspcoid;
2208 16 : xlrec.src_db_id = db_id;
2209 16 : xlrec.src_tablespace_id = src_tblspcoid;
2210 :
2211 16 : XLogBeginInsert();
2212 16 : XLogRegisterData(&xlrec,
2213 : sizeof(xl_dbase_create_file_copy_rec));
2214 :
2215 16 : (void) XLogInsert(RM_DBASE_ID,
2216 : XLOG_DBASE_CREATE_FILE_COPY | XLR_SPECIAL_REL_UPDATE);
2217 : }
2218 :
2219 : /*
2220 : * Update the database's pg_database tuple
2221 : */
2222 16 : ScanKeyInit(&scankey,
2223 : Anum_pg_database_datname,
2224 : BTEqualStrategyNumber, F_NAMEEQ,
2225 : CStringGetDatum(dbname));
2226 16 : sysscan = systable_beginscan(pgdbrel, DatabaseNameIndexId, true,
2227 : NULL, 1, &scankey);
2228 16 : oldtuple = systable_getnext(sysscan);
2229 16 : if (!HeapTupleIsValid(oldtuple)) /* shouldn't happen... */
2230 0 : ereport(ERROR,
2231 : (errcode(ERRCODE_UNDEFINED_DATABASE),
2232 : errmsg("database \"%s\" does not exist", dbname)));
2233 16 : LockTuple(pgdbrel, &oldtuple->t_self, InplaceUpdateTupleLock);
2234 :
2235 16 : new_record[Anum_pg_database_dattablespace - 1] = ObjectIdGetDatum(dst_tblspcoid);
2236 16 : new_record_repl[Anum_pg_database_dattablespace - 1] = true;
2237 :
2238 16 : newtuple = heap_modify_tuple(oldtuple, RelationGetDescr(pgdbrel),
2239 : new_record,
2240 : new_record_nulls, new_record_repl);
2241 16 : CatalogTupleUpdate(pgdbrel, &oldtuple->t_self, newtuple);
2242 16 : UnlockTuple(pgdbrel, &oldtuple->t_self, InplaceUpdateTupleLock);
2243 :
2244 16 : InvokeObjectPostAlterHook(DatabaseRelationId, db_id, 0);
2245 :
2246 16 : systable_endscan(sysscan);
2247 :
2248 : /*
2249 : * Force another checkpoint here. As in CREATE DATABASE, this is to
2250 : * ensure that we don't have to replay a committed
2251 : * XLOG_DBASE_CREATE_FILE_COPY operation, which would cause us to lose
2252 : * any unlogged operations done in the new DB tablespace before the
2253 : * next checkpoint.
2254 : */
2255 16 : RequestCheckpoint(CHECKPOINT_IMMEDIATE | CHECKPOINT_FORCE | CHECKPOINT_WAIT);
2256 :
2257 : /*
2258 : * Force synchronous commit, thus minimizing the window between
2259 : * copying the database files and committal of the transaction. If we
2260 : * crash before committing, we'll leave an orphaned set of files on
2261 : * disk, which is not fatal but not good either.
2262 : */
2263 16 : ForceSyncCommit();
2264 :
2265 : /*
2266 : * Close pg_database, but keep lock till commit.
2267 : */
2268 16 : table_close(pgdbrel, NoLock);
2269 : }
2270 16 : PG_END_ENSURE_ERROR_CLEANUP(movedb_failure_callback,
2271 : PointerGetDatum(&fparms));
2272 :
2273 : /*
2274 : * Commit the transaction so that the pg_database update is committed. If
2275 : * we crash while removing files, the database won't be corrupt, we'll
2276 : * just leave some orphaned files in the old directory.
2277 : *
2278 : * (This is OK because we know we aren't inside a transaction block.)
2279 : *
2280 : * XXX would it be safe/better to do this inside the ensure block? Not
2281 : * convinced it's a good idea; consider elog just after the transaction
2282 : * really commits.
2283 : */
2284 16 : PopActiveSnapshot();
2285 16 : CommitTransactionCommand();
2286 :
2287 : /* Start new transaction for the remaining work; don't need a snapshot */
2288 16 : StartTransactionCommand();
2289 :
2290 : /*
2291 : * Remove files from the old tablespace
2292 : */
2293 16 : if (!rmtree(src_dbpath, true))
2294 0 : ereport(WARNING,
2295 : (errmsg("some useless files may be left behind in old database directory \"%s\"",
2296 : src_dbpath)));
2297 :
2298 : /*
2299 : * Record the filesystem change in XLOG
2300 : */
2301 : {
2302 : xl_dbase_drop_rec xlrec;
2303 :
2304 16 : xlrec.db_id = db_id;
2305 16 : xlrec.ntablespaces = 1;
2306 :
2307 16 : XLogBeginInsert();
2308 16 : XLogRegisterData(&xlrec, sizeof(xl_dbase_drop_rec));
2309 16 : XLogRegisterData(&src_tblspcoid, sizeof(Oid));
2310 :
2311 16 : (void) XLogInsert(RM_DBASE_ID,
2312 : XLOG_DBASE_DROP | XLR_SPECIAL_REL_UPDATE);
2313 : }
2314 :
2315 : /* Now it's safe to release the database lock */
2316 16 : UnlockSharedObjectForSession(DatabaseRelationId, db_id, 0,
2317 : AccessExclusiveLock);
2318 :
2319 16 : pfree(src_dbpath);
2320 16 : pfree(dst_dbpath);
2321 : }
2322 :
2323 : /* Error cleanup callback for movedb */
2324 : static void
2325 0 : movedb_failure_callback(int code, Datum arg)
2326 : {
2327 0 : movedb_failure_params *fparms = (movedb_failure_params *) DatumGetPointer(arg);
2328 : char *dstpath;
2329 :
2330 : /* Get rid of anything we managed to copy to the target directory */
2331 0 : dstpath = GetDatabasePath(fparms->dest_dboid, fparms->dest_tsoid);
2332 :
2333 0 : (void) rmtree(dstpath, true);
2334 :
2335 0 : pfree(dstpath);
2336 0 : }
2337 :
2338 : /*
2339 : * Process options and call dropdb function.
2340 : */
2341 : void
2342 122 : DropDatabase(ParseState *pstate, DropdbStmt *stmt)
2343 : {
2344 122 : bool force = false;
2345 : ListCell *lc;
2346 :
2347 148 : foreach(lc, stmt->options)
2348 : {
2349 26 : DefElem *opt = (DefElem *) lfirst(lc);
2350 :
2351 26 : if (strcmp(opt->defname, "force") == 0)
2352 26 : force = true;
2353 : else
2354 0 : ereport(ERROR,
2355 : (errcode(ERRCODE_SYNTAX_ERROR),
2356 : errmsg("unrecognized DROP DATABASE option \"%s\"", opt->defname),
2357 : parser_errposition(pstate, opt->location)));
2358 : }
2359 :
2360 122 : dropdb(stmt->dbname, stmt->missing_ok, force);
2361 104 : }
2362 :
2363 : /*
2364 : * ALTER DATABASE name ...
2365 : */
2366 : Oid
2367 76 : AlterDatabase(ParseState *pstate, AlterDatabaseStmt *stmt, bool isTopLevel)
2368 : {
2369 : Relation rel;
2370 : Oid dboid;
2371 : HeapTuple tuple,
2372 : newtuple;
2373 : Form_pg_database datform;
2374 : ScanKeyData scankey;
2375 : SysScanDesc scan;
2376 : ListCell *option;
2377 76 : bool dbistemplate = false;
2378 76 : bool dballowconnections = true;
2379 76 : int dbconnlimit = DATCONNLIMIT_UNLIMITED;
2380 76 : DefElem *distemplate = NULL;
2381 76 : DefElem *dallowconnections = NULL;
2382 76 : DefElem *dconnlimit = NULL;
2383 76 : DefElem *dtablespace = NULL;
2384 76 : Datum new_record[Natts_pg_database] = {0};
2385 76 : bool new_record_nulls[Natts_pg_database] = {0};
2386 76 : bool new_record_repl[Natts_pg_database] = {0};
2387 :
2388 : /* Extract options from the statement node tree */
2389 152 : foreach(option, stmt->options)
2390 : {
2391 76 : DefElem *defel = (DefElem *) lfirst(option);
2392 :
2393 76 : if (strcmp(defel->defname, "is_template") == 0)
2394 : {
2395 20 : if (distemplate)
2396 0 : errorConflictingDefElem(defel, pstate);
2397 20 : distemplate = defel;
2398 : }
2399 56 : else if (strcmp(defel->defname, "allow_connections") == 0)
2400 : {
2401 32 : if (dallowconnections)
2402 0 : errorConflictingDefElem(defel, pstate);
2403 32 : dallowconnections = defel;
2404 : }
2405 24 : else if (strcmp(defel->defname, "connection_limit") == 0)
2406 : {
2407 8 : if (dconnlimit)
2408 0 : errorConflictingDefElem(defel, pstate);
2409 8 : dconnlimit = defel;
2410 : }
2411 16 : else if (strcmp(defel->defname, "tablespace") == 0)
2412 : {
2413 16 : if (dtablespace)
2414 0 : errorConflictingDefElem(defel, pstate);
2415 16 : dtablespace = defel;
2416 : }
2417 : else
2418 0 : ereport(ERROR,
2419 : (errcode(ERRCODE_SYNTAX_ERROR),
2420 : errmsg("option \"%s\" not recognized", defel->defname),
2421 : parser_errposition(pstate, defel->location)));
2422 : }
2423 :
2424 76 : if (dtablespace)
2425 : {
2426 : /*
2427 : * While the SET TABLESPACE syntax doesn't allow any other options,
2428 : * somebody could write "WITH TABLESPACE ...". Forbid any other
2429 : * options from being specified in that case.
2430 : */
2431 16 : if (list_length(stmt->options) != 1)
2432 0 : ereport(ERROR,
2433 : (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
2434 : errmsg("option \"%s\" cannot be specified with other options",
2435 : dtablespace->defname),
2436 : parser_errposition(pstate, dtablespace->location)));
2437 : /* this case isn't allowed within a transaction block */
2438 16 : PreventInTransactionBlock(isTopLevel, "ALTER DATABASE SET TABLESPACE");
2439 16 : movedb(stmt->dbname, defGetString(dtablespace));
2440 16 : return InvalidOid;
2441 : }
2442 :
2443 60 : if (distemplate && distemplate->arg)
2444 20 : dbistemplate = defGetBoolean(distemplate);
2445 60 : if (dallowconnections && dallowconnections->arg)
2446 32 : dballowconnections = defGetBoolean(dallowconnections);
2447 60 : if (dconnlimit && dconnlimit->arg)
2448 : {
2449 8 : dbconnlimit = defGetInt32(dconnlimit);
2450 8 : if (dbconnlimit < DATCONNLIMIT_UNLIMITED)
2451 0 : ereport(ERROR,
2452 : (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
2453 : errmsg("invalid connection limit: %d", dbconnlimit)));
2454 : }
2455 :
2456 : /*
2457 : * Get the old tuple. We don't need a lock on the database per se,
2458 : * because we're not going to do anything that would mess up incoming
2459 : * connections.
2460 : */
2461 60 : rel = table_open(DatabaseRelationId, RowExclusiveLock);
2462 60 : ScanKeyInit(&scankey,
2463 : Anum_pg_database_datname,
2464 : BTEqualStrategyNumber, F_NAMEEQ,
2465 60 : CStringGetDatum(stmt->dbname));
2466 60 : scan = systable_beginscan(rel, DatabaseNameIndexId, true,
2467 : NULL, 1, &scankey);
2468 60 : tuple = systable_getnext(scan);
2469 60 : if (!HeapTupleIsValid(tuple))
2470 0 : ereport(ERROR,
2471 : (errcode(ERRCODE_UNDEFINED_DATABASE),
2472 : errmsg("database \"%s\" does not exist", stmt->dbname)));
2473 60 : LockTuple(rel, &tuple->t_self, InplaceUpdateTupleLock);
2474 :
2475 60 : datform = (Form_pg_database) GETSTRUCT(tuple);
2476 60 : dboid = datform->oid;
2477 :
2478 60 : if (database_is_invalid_form(datform))
2479 : {
2480 2 : ereport(FATAL,
2481 : errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
2482 : errmsg("cannot alter invalid database \"%s\"", stmt->dbname),
2483 : errhint("Use DROP DATABASE to drop invalid databases."));
2484 : }
2485 :
2486 58 : if (!object_ownercheck(DatabaseRelationId, dboid, GetUserId()))
2487 0 : aclcheck_error(ACLCHECK_NOT_OWNER, OBJECT_DATABASE,
2488 0 : stmt->dbname);
2489 :
2490 : /*
2491 : * In order to avoid getting locked out and having to go through
2492 : * standalone mode, we refuse to disallow connections to the database
2493 : * we're currently connected to. Lockout can still happen with concurrent
2494 : * sessions but the likeliness of that is not high enough to worry about.
2495 : */
2496 58 : if (!dballowconnections && dboid == MyDatabaseId)
2497 0 : ereport(ERROR,
2498 : (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
2499 : errmsg("cannot disallow connections for current database")));
2500 :
2501 : /*
2502 : * Build an updated tuple, perusing the information just obtained
2503 : */
2504 58 : if (distemplate)
2505 : {
2506 20 : new_record[Anum_pg_database_datistemplate - 1] = BoolGetDatum(dbistemplate);
2507 20 : new_record_repl[Anum_pg_database_datistemplate - 1] = true;
2508 : }
2509 58 : if (dallowconnections)
2510 : {
2511 32 : new_record[Anum_pg_database_datallowconn - 1] = BoolGetDatum(dballowconnections);
2512 32 : new_record_repl[Anum_pg_database_datallowconn - 1] = true;
2513 : }
2514 58 : if (dconnlimit)
2515 : {
2516 6 : new_record[Anum_pg_database_datconnlimit - 1] = Int32GetDatum(dbconnlimit);
2517 6 : new_record_repl[Anum_pg_database_datconnlimit - 1] = true;
2518 : }
2519 :
2520 58 : newtuple = heap_modify_tuple(tuple, RelationGetDescr(rel), new_record,
2521 : new_record_nulls, new_record_repl);
2522 58 : CatalogTupleUpdate(rel, &tuple->t_self, newtuple);
2523 58 : UnlockTuple(rel, &tuple->t_self, InplaceUpdateTupleLock);
2524 :
2525 58 : InvokeObjectPostAlterHook(DatabaseRelationId, dboid, 0);
2526 :
2527 58 : systable_endscan(scan);
2528 :
2529 : /* Close pg_database, but keep lock till commit */
2530 58 : table_close(rel, NoLock);
2531 :
2532 58 : return dboid;
2533 : }
2534 :
2535 :
2536 : /*
2537 : * ALTER DATABASE name REFRESH COLLATION VERSION
2538 : */
2539 : ObjectAddress
2540 6 : AlterDatabaseRefreshColl(AlterDatabaseRefreshCollStmt *stmt)
2541 : {
2542 : Relation rel;
2543 : ScanKeyData scankey;
2544 : SysScanDesc scan;
2545 : Oid db_id;
2546 : HeapTuple tuple;
2547 : Form_pg_database datForm;
2548 : ObjectAddress address;
2549 : Datum datum;
2550 : bool isnull;
2551 : char *oldversion;
2552 : char *newversion;
2553 :
2554 6 : rel = table_open(DatabaseRelationId, RowExclusiveLock);
2555 6 : ScanKeyInit(&scankey,
2556 : Anum_pg_database_datname,
2557 : BTEqualStrategyNumber, F_NAMEEQ,
2558 6 : CStringGetDatum(stmt->dbname));
2559 6 : scan = systable_beginscan(rel, DatabaseNameIndexId, true,
2560 : NULL, 1, &scankey);
2561 6 : tuple = systable_getnext(scan);
2562 6 : if (!HeapTupleIsValid(tuple))
2563 0 : ereport(ERROR,
2564 : (errcode(ERRCODE_UNDEFINED_DATABASE),
2565 : errmsg("database \"%s\" does not exist", stmt->dbname)));
2566 :
2567 6 : datForm = (Form_pg_database) GETSTRUCT(tuple);
2568 6 : db_id = datForm->oid;
2569 :
2570 6 : if (!object_ownercheck(DatabaseRelationId, db_id, GetUserId()))
2571 0 : aclcheck_error(ACLCHECK_NOT_OWNER, OBJECT_DATABASE,
2572 0 : stmt->dbname);
2573 6 : LockTuple(rel, &tuple->t_self, InplaceUpdateTupleLock);
2574 :
2575 6 : datum = heap_getattr(tuple, Anum_pg_database_datcollversion, RelationGetDescr(rel), &isnull);
2576 6 : oldversion = isnull ? NULL : TextDatumGetCString(datum);
2577 :
2578 6 : if (datForm->datlocprovider == COLLPROVIDER_LIBC)
2579 : {
2580 4 : datum = heap_getattr(tuple, Anum_pg_database_datcollate, RelationGetDescr(rel), &isnull);
2581 4 : if (isnull)
2582 0 : elog(ERROR, "unexpected null in pg_database");
2583 : }
2584 : else
2585 : {
2586 2 : datum = heap_getattr(tuple, Anum_pg_database_datlocale, RelationGetDescr(rel), &isnull);
2587 2 : if (isnull)
2588 0 : elog(ERROR, "unexpected null in pg_database");
2589 : }
2590 :
2591 6 : newversion = get_collation_actual_version(datForm->datlocprovider,
2592 6 : TextDatumGetCString(datum));
2593 :
2594 : /* cannot change from NULL to non-NULL or vice versa */
2595 6 : if ((!oldversion && newversion) || (oldversion && !newversion))
2596 0 : elog(ERROR, "invalid collation version change");
2597 6 : else if (oldversion && newversion && strcmp(newversion, oldversion) != 0)
2598 0 : {
2599 0 : bool nulls[Natts_pg_database] = {0};
2600 0 : bool replaces[Natts_pg_database] = {0};
2601 0 : Datum values[Natts_pg_database] = {0};
2602 : HeapTuple newtuple;
2603 :
2604 0 : ereport(NOTICE,
2605 : (errmsg("changing version from %s to %s",
2606 : oldversion, newversion)));
2607 :
2608 0 : values[Anum_pg_database_datcollversion - 1] = CStringGetTextDatum(newversion);
2609 0 : replaces[Anum_pg_database_datcollversion - 1] = true;
2610 :
2611 0 : newtuple = heap_modify_tuple(tuple, RelationGetDescr(rel),
2612 : values, nulls, replaces);
2613 0 : CatalogTupleUpdate(rel, &tuple->t_self, newtuple);
2614 0 : heap_freetuple(newtuple);
2615 : }
2616 : else
2617 6 : ereport(NOTICE,
2618 : (errmsg("version has not changed")));
2619 6 : UnlockTuple(rel, &tuple->t_self, InplaceUpdateTupleLock);
2620 :
2621 6 : InvokeObjectPostAlterHook(DatabaseRelationId, db_id, 0);
2622 :
2623 6 : ObjectAddressSet(address, DatabaseRelationId, db_id);
2624 :
2625 6 : systable_endscan(scan);
2626 :
2627 6 : table_close(rel, NoLock);
2628 :
2629 6 : return address;
2630 : }
2631 :
2632 :
2633 : /*
2634 : * ALTER DATABASE name SET ...
2635 : */
2636 : Oid
2637 1198 : AlterDatabaseSet(AlterDatabaseSetStmt *stmt)
2638 : {
2639 1198 : Oid datid = get_database_oid(stmt->dbname, false);
2640 :
2641 : /*
2642 : * Obtain a lock on the database and make sure it didn't go away in the
2643 : * meantime.
2644 : */
2645 1198 : shdepLockAndCheckObject(DatabaseRelationId, datid);
2646 :
2647 1198 : if (!object_ownercheck(DatabaseRelationId, datid, GetUserId()))
2648 0 : aclcheck_error(ACLCHECK_NOT_OWNER, OBJECT_DATABASE,
2649 0 : stmt->dbname);
2650 :
2651 1198 : AlterSetting(datid, InvalidOid, stmt->setstmt);
2652 :
2653 1198 : UnlockSharedObject(DatabaseRelationId, datid, 0, AccessShareLock);
2654 :
2655 1198 : return datid;
2656 : }
2657 :
2658 :
2659 : /*
2660 : * ALTER DATABASE name OWNER TO newowner
2661 : */
2662 : ObjectAddress
2663 80 : AlterDatabaseOwner(const char *dbname, Oid newOwnerId)
2664 : {
2665 : Oid db_id;
2666 : HeapTuple tuple;
2667 : Relation rel;
2668 : ScanKeyData scankey;
2669 : SysScanDesc scan;
2670 : Form_pg_database datForm;
2671 : ObjectAddress address;
2672 :
2673 : /*
2674 : * Get the old tuple. We don't need a lock on the database per se,
2675 : * because we're not going to do anything that would mess up incoming
2676 : * connections.
2677 : */
2678 80 : rel = table_open(DatabaseRelationId, RowExclusiveLock);
2679 80 : ScanKeyInit(&scankey,
2680 : Anum_pg_database_datname,
2681 : BTEqualStrategyNumber, F_NAMEEQ,
2682 : CStringGetDatum(dbname));
2683 80 : scan = systable_beginscan(rel, DatabaseNameIndexId, true,
2684 : NULL, 1, &scankey);
2685 80 : tuple = systable_getnext(scan);
2686 80 : if (!HeapTupleIsValid(tuple))
2687 0 : ereport(ERROR,
2688 : (errcode(ERRCODE_UNDEFINED_DATABASE),
2689 : errmsg("database \"%s\" does not exist", dbname)));
2690 :
2691 80 : datForm = (Form_pg_database) GETSTRUCT(tuple);
2692 80 : db_id = datForm->oid;
2693 :
2694 : /*
2695 : * If the new owner is the same as the existing owner, consider the
2696 : * command to have succeeded. This is to be consistent with other
2697 : * objects.
2698 : */
2699 80 : if (datForm->datdba != newOwnerId)
2700 : {
2701 : Datum repl_val[Natts_pg_database];
2702 30 : bool repl_null[Natts_pg_database] = {0};
2703 30 : bool repl_repl[Natts_pg_database] = {0};
2704 : Acl *newAcl;
2705 : Datum aclDatum;
2706 : bool isNull;
2707 : HeapTuple newtuple;
2708 :
2709 : /* Otherwise, must be owner of the existing object */
2710 30 : if (!object_ownercheck(DatabaseRelationId, db_id, GetUserId()))
2711 0 : aclcheck_error(ACLCHECK_NOT_OWNER, OBJECT_DATABASE,
2712 : dbname);
2713 :
2714 : /* Must be able to become new owner */
2715 30 : check_can_set_role(GetUserId(), newOwnerId);
2716 :
2717 : /*
2718 : * must have createdb rights
2719 : *
2720 : * NOTE: This is different from other alter-owner checks in that the
2721 : * current user is checked for createdb privileges instead of the
2722 : * destination owner. This is consistent with the CREATE case for
2723 : * databases. Because superusers will always have this right, we need
2724 : * no special case for them.
2725 : */
2726 30 : if (!have_createdb_privilege())
2727 0 : ereport(ERROR,
2728 : (errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
2729 : errmsg("permission denied to change owner of database")));
2730 :
2731 30 : LockTuple(rel, &tuple->t_self, InplaceUpdateTupleLock);
2732 :
2733 30 : repl_repl[Anum_pg_database_datdba - 1] = true;
2734 30 : repl_val[Anum_pg_database_datdba - 1] = ObjectIdGetDatum(newOwnerId);
2735 :
2736 : /*
2737 : * Determine the modified ACL for the new owner. This is only
2738 : * necessary when the ACL is non-null.
2739 : */
2740 30 : aclDatum = heap_getattr(tuple,
2741 : Anum_pg_database_datacl,
2742 : RelationGetDescr(rel),
2743 : &isNull);
2744 30 : if (!isNull)
2745 : {
2746 0 : newAcl = aclnewowner(DatumGetAclP(aclDatum),
2747 : datForm->datdba, newOwnerId);
2748 0 : repl_repl[Anum_pg_database_datacl - 1] = true;
2749 0 : repl_val[Anum_pg_database_datacl - 1] = PointerGetDatum(newAcl);
2750 : }
2751 :
2752 30 : newtuple = heap_modify_tuple(tuple, RelationGetDescr(rel), repl_val, repl_null, repl_repl);
2753 30 : CatalogTupleUpdate(rel, &newtuple->t_self, newtuple);
2754 30 : UnlockTuple(rel, &tuple->t_self, InplaceUpdateTupleLock);
2755 :
2756 30 : heap_freetuple(newtuple);
2757 :
2758 : /* Update owner dependency reference */
2759 30 : changeDependencyOnOwner(DatabaseRelationId, db_id, newOwnerId);
2760 : }
2761 :
2762 80 : InvokeObjectPostAlterHook(DatabaseRelationId, db_id, 0);
2763 :
2764 80 : ObjectAddressSet(address, DatabaseRelationId, db_id);
2765 :
2766 80 : systable_endscan(scan);
2767 :
2768 : /* Close pg_database, but keep lock till commit */
2769 80 : table_close(rel, NoLock);
2770 :
2771 80 : return address;
2772 : }
2773 :
2774 :
2775 : Datum
2776 96 : pg_database_collation_actual_version(PG_FUNCTION_ARGS)
2777 : {
2778 96 : Oid dbid = PG_GETARG_OID(0);
2779 : HeapTuple tp;
2780 : char datlocprovider;
2781 : Datum datum;
2782 : char *version;
2783 :
2784 96 : tp = SearchSysCache1(DATABASEOID, ObjectIdGetDatum(dbid));
2785 96 : if (!HeapTupleIsValid(tp))
2786 0 : ereport(ERROR,
2787 : (errcode(ERRCODE_UNDEFINED_OBJECT),
2788 : errmsg("database with OID %u does not exist", dbid)));
2789 :
2790 96 : datlocprovider = ((Form_pg_database) GETSTRUCT(tp))->datlocprovider;
2791 :
2792 96 : if (datlocprovider == COLLPROVIDER_LIBC)
2793 80 : datum = SysCacheGetAttrNotNull(DATABASEOID, tp, Anum_pg_database_datcollate);
2794 : else
2795 16 : datum = SysCacheGetAttrNotNull(DATABASEOID, tp, Anum_pg_database_datlocale);
2796 :
2797 96 : version = get_collation_actual_version(datlocprovider,
2798 96 : TextDatumGetCString(datum));
2799 :
2800 96 : ReleaseSysCache(tp);
2801 :
2802 96 : if (version)
2803 68 : PG_RETURN_TEXT_P(cstring_to_text(version));
2804 : else
2805 28 : PG_RETURN_NULL();
2806 : }
2807 :
2808 :
2809 : /*
2810 : * Helper functions
2811 : */
2812 :
2813 : /*
2814 : * Look up info about the database named "name". If the database exists,
2815 : * obtain the specified lock type on it, fill in any of the remaining
2816 : * parameters that aren't NULL, and return true. If no such database,
2817 : * return false.
2818 : */
2819 : static bool
2820 920 : get_db_info(const char *name, LOCKMODE lockmode,
2821 : Oid *dbIdP, Oid *ownerIdP,
2822 : int *encodingP, bool *dbIsTemplateP, bool *dbAllowConnP, bool *dbHasLoginEvtP,
2823 : TransactionId *dbFrozenXidP, MultiXactId *dbMinMultiP,
2824 : Oid *dbTablespace, char **dbCollate, char **dbCtype, char **dbLocale,
2825 : char **dbIcurules,
2826 : char *dbLocProvider,
2827 : char **dbCollversion)
2828 : {
2829 920 : bool result = false;
2830 : Relation relation;
2831 :
2832 : Assert(name);
2833 :
2834 : /* Caller may wish to grab a better lock on pg_database beforehand... */
2835 920 : relation = table_open(DatabaseRelationId, AccessShareLock);
2836 :
2837 : /*
2838 : * Loop covers the rare case where the database is renamed before we can
2839 : * lock it. We try again just in case we can find a new one of the same
2840 : * name.
2841 : */
2842 : for (;;)
2843 0 : {
2844 : ScanKeyData scanKey;
2845 : SysScanDesc scan;
2846 : HeapTuple tuple;
2847 : Oid dbOid;
2848 :
2849 : /*
2850 : * there's no syscache for database-indexed-by-name, so must do it the
2851 : * hard way
2852 : */
2853 920 : ScanKeyInit(&scanKey,
2854 : Anum_pg_database_datname,
2855 : BTEqualStrategyNumber, F_NAMEEQ,
2856 : CStringGetDatum(name));
2857 :
2858 920 : scan = systable_beginscan(relation, DatabaseNameIndexId, true,
2859 : NULL, 1, &scanKey);
2860 :
2861 920 : tuple = systable_getnext(scan);
2862 :
2863 920 : if (!HeapTupleIsValid(tuple))
2864 : {
2865 : /* definitely no database of that name */
2866 32 : systable_endscan(scan);
2867 32 : break;
2868 : }
2869 :
2870 888 : dbOid = ((Form_pg_database) GETSTRUCT(tuple))->oid;
2871 :
2872 888 : systable_endscan(scan);
2873 :
2874 : /*
2875 : * Now that we have a database OID, we can try to lock the DB.
2876 : */
2877 888 : if (lockmode != NoLock)
2878 888 : LockSharedObject(DatabaseRelationId, dbOid, 0, lockmode);
2879 :
2880 : /*
2881 : * And now, re-fetch the tuple by OID. If it's still there and still
2882 : * the same name, we win; else, drop the lock and loop back to try
2883 : * again.
2884 : */
2885 888 : tuple = SearchSysCache1(DATABASEOID, ObjectIdGetDatum(dbOid));
2886 888 : if (HeapTupleIsValid(tuple))
2887 : {
2888 888 : Form_pg_database dbform = (Form_pg_database) GETSTRUCT(tuple);
2889 :
2890 888 : if (strcmp(name, NameStr(dbform->datname)) == 0)
2891 : {
2892 : Datum datum;
2893 : bool isnull;
2894 :
2895 : /* oid of the database */
2896 888 : if (dbIdP)
2897 888 : *dbIdP = dbOid;
2898 : /* oid of the owner */
2899 888 : if (ownerIdP)
2900 776 : *ownerIdP = dbform->datdba;
2901 : /* character encoding */
2902 888 : if (encodingP)
2903 776 : *encodingP = dbform->encoding;
2904 : /* allowed as template? */
2905 888 : if (dbIsTemplateP)
2906 866 : *dbIsTemplateP = dbform->datistemplate;
2907 : /* Has on login event trigger? */
2908 888 : if (dbHasLoginEvtP)
2909 776 : *dbHasLoginEvtP = dbform->dathasloginevt;
2910 : /* allowing connections? */
2911 888 : if (dbAllowConnP)
2912 776 : *dbAllowConnP = dbform->datallowconn;
2913 : /* limit of frozen XIDs */
2914 888 : if (dbFrozenXidP)
2915 776 : *dbFrozenXidP = dbform->datfrozenxid;
2916 : /* minimum MultiXactId */
2917 888 : if (dbMinMultiP)
2918 776 : *dbMinMultiP = dbform->datminmxid;
2919 : /* default tablespace for this database */
2920 888 : if (dbTablespace)
2921 792 : *dbTablespace = dbform->dattablespace;
2922 : /* default locale settings for this database */
2923 888 : if (dbLocProvider)
2924 776 : *dbLocProvider = dbform->datlocprovider;
2925 888 : if (dbCollate)
2926 : {
2927 776 : datum = SysCacheGetAttrNotNull(DATABASEOID, tuple, Anum_pg_database_datcollate);
2928 776 : *dbCollate = TextDatumGetCString(datum);
2929 : }
2930 888 : if (dbCtype)
2931 : {
2932 776 : datum = SysCacheGetAttrNotNull(DATABASEOID, tuple, Anum_pg_database_datctype);
2933 776 : *dbCtype = TextDatumGetCString(datum);
2934 : }
2935 888 : if (dbLocale)
2936 : {
2937 776 : datum = SysCacheGetAttr(DATABASEOID, tuple, Anum_pg_database_datlocale, &isnull);
2938 776 : if (isnull)
2939 712 : *dbLocale = NULL;
2940 : else
2941 64 : *dbLocale = TextDatumGetCString(datum);
2942 : }
2943 888 : if (dbIcurules)
2944 : {
2945 776 : datum = SysCacheGetAttr(DATABASEOID, tuple, Anum_pg_database_daticurules, &isnull);
2946 776 : if (isnull)
2947 776 : *dbIcurules = NULL;
2948 : else
2949 0 : *dbIcurules = TextDatumGetCString(datum);
2950 : }
2951 888 : if (dbCollversion)
2952 : {
2953 776 : datum = SysCacheGetAttr(DATABASEOID, tuple, Anum_pg_database_datcollversion, &isnull);
2954 776 : if (isnull)
2955 464 : *dbCollversion = NULL;
2956 : else
2957 312 : *dbCollversion = TextDatumGetCString(datum);
2958 : }
2959 888 : ReleaseSysCache(tuple);
2960 888 : result = true;
2961 888 : break;
2962 : }
2963 : /* can only get here if it was just renamed */
2964 0 : ReleaseSysCache(tuple);
2965 : }
2966 :
2967 0 : if (lockmode != NoLock)
2968 0 : UnlockSharedObject(DatabaseRelationId, dbOid, 0, lockmode);
2969 : }
2970 :
2971 920 : table_close(relation, AccessShareLock);
2972 :
2973 920 : return result;
2974 : }
2975 :
2976 : /* Check if current user has createdb privileges */
2977 : bool
2978 848 : have_createdb_privilege(void)
2979 : {
2980 848 : bool result = false;
2981 : HeapTuple utup;
2982 :
2983 : /* Superusers can always do everything */
2984 848 : if (superuser())
2985 812 : return true;
2986 :
2987 36 : utup = SearchSysCache1(AUTHOID, ObjectIdGetDatum(GetUserId()));
2988 36 : if (HeapTupleIsValid(utup))
2989 : {
2990 36 : result = ((Form_pg_authid) GETSTRUCT(utup))->rolcreatedb;
2991 36 : ReleaseSysCache(utup);
2992 : }
2993 36 : return result;
2994 : }
2995 :
2996 : /*
2997 : * Remove tablespace directories
2998 : *
2999 : * We don't know what tablespaces db_id is using, so iterate through all
3000 : * tablespaces removing <tablespace>/db_id
3001 : */
3002 : static void
3003 92 : remove_dbtablespaces(Oid db_id)
3004 : {
3005 : Relation rel;
3006 : TableScanDesc scan;
3007 : HeapTuple tuple;
3008 92 : List *ltblspc = NIL;
3009 : ListCell *cell;
3010 : int ntblspc;
3011 : int i;
3012 : Oid *tablespace_ids;
3013 :
3014 92 : rel = table_open(TableSpaceRelationId, AccessShareLock);
3015 92 : scan = table_beginscan_catalog(rel, 0, NULL);
3016 326 : while ((tuple = heap_getnext(scan, ForwardScanDirection)) != NULL)
3017 : {
3018 234 : Form_pg_tablespace spcform = (Form_pg_tablespace) GETSTRUCT(tuple);
3019 234 : Oid dsttablespace = spcform->oid;
3020 : char *dstpath;
3021 : struct stat st;
3022 :
3023 : /* Don't mess with the global tablespace */
3024 234 : if (dsttablespace == GLOBALTABLESPACE_OID)
3025 142 : continue;
3026 :
3027 142 : dstpath = GetDatabasePath(db_id, dsttablespace);
3028 :
3029 142 : if (lstat(dstpath, &st) < 0 || !S_ISDIR(st.st_mode))
3030 : {
3031 : /* Assume we can ignore it */
3032 50 : pfree(dstpath);
3033 50 : continue;
3034 : }
3035 :
3036 92 : if (!rmtree(dstpath, true))
3037 0 : ereport(WARNING,
3038 : (errmsg("some useless files may be left behind in old database directory \"%s\"",
3039 : dstpath)));
3040 :
3041 92 : ltblspc = lappend_oid(ltblspc, dsttablespace);
3042 92 : pfree(dstpath);
3043 : }
3044 :
3045 92 : ntblspc = list_length(ltblspc);
3046 92 : if (ntblspc == 0)
3047 : {
3048 0 : table_endscan(scan);
3049 0 : table_close(rel, AccessShareLock);
3050 0 : return;
3051 : }
3052 :
3053 92 : tablespace_ids = (Oid *) palloc(ntblspc * sizeof(Oid));
3054 92 : i = 0;
3055 184 : foreach(cell, ltblspc)
3056 92 : tablespace_ids[i++] = lfirst_oid(cell);
3057 :
3058 : /* Record the filesystem change in XLOG */
3059 : {
3060 : xl_dbase_drop_rec xlrec;
3061 :
3062 92 : xlrec.db_id = db_id;
3063 92 : xlrec.ntablespaces = ntblspc;
3064 :
3065 92 : XLogBeginInsert();
3066 92 : XLogRegisterData(&xlrec, MinSizeOfDbaseDropRec);
3067 92 : XLogRegisterData(tablespace_ids, ntblspc * sizeof(Oid));
3068 :
3069 92 : (void) XLogInsert(RM_DBASE_ID,
3070 : XLOG_DBASE_DROP | XLR_SPECIAL_REL_UPDATE);
3071 : }
3072 :
3073 92 : list_free(ltblspc);
3074 92 : pfree(tablespace_ids);
3075 :
3076 92 : table_endscan(scan);
3077 92 : table_close(rel, AccessShareLock);
3078 : }
3079 :
3080 : /*
3081 : * Check for existing files that conflict with a proposed new DB OID;
3082 : * return true if there are any
3083 : *
3084 : * If there were a subdirectory in any tablespace matching the proposed new
3085 : * OID, we'd get a create failure due to the duplicate name ... and then we'd
3086 : * try to remove that already-existing subdirectory during the cleanup in
3087 : * remove_dbtablespaces. Nuking existing files seems like a bad idea, so
3088 : * instead we make this extra check before settling on the OID of the new
3089 : * database. This exactly parallels what GetNewRelFileNumber() does for table
3090 : * relfilenumber values.
3091 : */
3092 : static bool
3093 750 : check_db_file_conflict(Oid db_id)
3094 : {
3095 750 : bool result = false;
3096 : Relation rel;
3097 : TableScanDesc scan;
3098 : HeapTuple tuple;
3099 :
3100 750 : rel = table_open(TableSpaceRelationId, AccessShareLock);
3101 750 : scan = table_beginscan_catalog(rel, 0, NULL);
3102 2348 : while ((tuple = heap_getnext(scan, ForwardScanDirection)) != NULL)
3103 : {
3104 1598 : Form_pg_tablespace spcform = (Form_pg_tablespace) GETSTRUCT(tuple);
3105 1598 : Oid dsttablespace = spcform->oid;
3106 : char *dstpath;
3107 : struct stat st;
3108 :
3109 : /* Don't mess with the global tablespace */
3110 1598 : if (dsttablespace == GLOBALTABLESPACE_OID)
3111 750 : continue;
3112 :
3113 848 : dstpath = GetDatabasePath(db_id, dsttablespace);
3114 :
3115 848 : if (lstat(dstpath, &st) == 0)
3116 : {
3117 : /* Found a conflicting file (or directory, whatever) */
3118 0 : pfree(dstpath);
3119 0 : result = true;
3120 0 : break;
3121 : }
3122 :
3123 848 : pfree(dstpath);
3124 : }
3125 :
3126 750 : table_endscan(scan);
3127 750 : table_close(rel, AccessShareLock);
3128 :
3129 750 : return result;
3130 : }
3131 :
3132 : /*
3133 : * Issue a suitable errdetail message for a busy database
3134 : */
3135 : static int
3136 0 : errdetail_busy_db(int notherbackends, int npreparedxacts)
3137 : {
3138 0 : if (notherbackends > 0 && npreparedxacts > 0)
3139 :
3140 : /*
3141 : * We don't deal with singular versus plural here, since gettext
3142 : * doesn't support multiple plurals in one string.
3143 : */
3144 0 : errdetail("There are %d other session(s) and %d prepared transaction(s) using the database.",
3145 : notherbackends, npreparedxacts);
3146 0 : else if (notherbackends > 0)
3147 0 : errdetail_plural("There is %d other session using the database.",
3148 : "There are %d other sessions using the database.",
3149 : notherbackends,
3150 : notherbackends);
3151 : else
3152 0 : errdetail_plural("There is %d prepared transaction using the database.",
3153 : "There are %d prepared transactions using the database.",
3154 : npreparedxacts,
3155 : npreparedxacts);
3156 0 : return 0; /* just to keep ereport macro happy */
3157 : }
3158 :
3159 : /*
3160 : * get_database_oid - given a database name, look up the OID
3161 : *
3162 : * If missing_ok is false, throw an error if database name not found. If
3163 : * true, just return InvalidOid.
3164 : */
3165 : Oid
3166 2868 : get_database_oid(const char *dbname, bool missing_ok)
3167 : {
3168 : Relation pg_database;
3169 : ScanKeyData entry[1];
3170 : SysScanDesc scan;
3171 : HeapTuple dbtuple;
3172 : Oid oid;
3173 :
3174 : /*
3175 : * There's no syscache for pg_database indexed by name, so we must look
3176 : * the hard way.
3177 : */
3178 2868 : pg_database = table_open(DatabaseRelationId, AccessShareLock);
3179 2868 : ScanKeyInit(&entry[0],
3180 : Anum_pg_database_datname,
3181 : BTEqualStrategyNumber, F_NAMEEQ,
3182 : CStringGetDatum(dbname));
3183 2868 : scan = systable_beginscan(pg_database, DatabaseNameIndexId, true,
3184 : NULL, 1, entry);
3185 :
3186 2868 : dbtuple = systable_getnext(scan);
3187 :
3188 : /* We assume that there can be at most one matching tuple */
3189 2868 : if (HeapTupleIsValid(dbtuple))
3190 2106 : oid = ((Form_pg_database) GETSTRUCT(dbtuple))->oid;
3191 : else
3192 762 : oid = InvalidOid;
3193 :
3194 2868 : systable_endscan(scan);
3195 2868 : table_close(pg_database, AccessShareLock);
3196 :
3197 2868 : if (!OidIsValid(oid) && !missing_ok)
3198 6 : ereport(ERROR,
3199 : (errcode(ERRCODE_UNDEFINED_DATABASE),
3200 : errmsg("database \"%s\" does not exist",
3201 : dbname)));
3202 :
3203 2862 : return oid;
3204 : }
3205 :
3206 :
3207 : /*
3208 : * get_database_name - given a database OID, look up the name
3209 : *
3210 : * Returns a palloc'd string, or NULL if no such database.
3211 : */
3212 : char *
3213 529402 : get_database_name(Oid dbid)
3214 : {
3215 : HeapTuple dbtuple;
3216 : char *result;
3217 :
3218 529402 : dbtuple = SearchSysCache1(DATABASEOID, ObjectIdGetDatum(dbid));
3219 529402 : if (HeapTupleIsValid(dbtuple))
3220 : {
3221 529134 : result = pstrdup(NameStr(((Form_pg_database) GETSTRUCT(dbtuple))->datname));
3222 529134 : ReleaseSysCache(dbtuple);
3223 : }
3224 : else
3225 268 : result = NULL;
3226 :
3227 529402 : return result;
3228 : }
3229 :
3230 :
3231 : /*
3232 : * While dropping a database the pg_database row is marked invalid, but the
3233 : * catalog contents still exist. Connections to such a database are not
3234 : * allowed.
3235 : */
3236 : bool
3237 50448 : database_is_invalid_form(Form_pg_database datform)
3238 : {
3239 50448 : return datform->datconnlimit == DATCONNLIMIT_INVALID_DB;
3240 : }
3241 :
3242 :
3243 : /*
3244 : * Convenience wrapper around database_is_invalid_form()
3245 : */
3246 : bool
3247 776 : database_is_invalid_oid(Oid dboid)
3248 : {
3249 : HeapTuple dbtup;
3250 : Form_pg_database dbform;
3251 : bool invalid;
3252 :
3253 776 : dbtup = SearchSysCache1(DATABASEOID, ObjectIdGetDatum(dboid));
3254 776 : if (!HeapTupleIsValid(dbtup))
3255 0 : elog(ERROR, "cache lookup failed for database %u", dboid);
3256 776 : dbform = (Form_pg_database) GETSTRUCT(dbtup);
3257 :
3258 776 : invalid = database_is_invalid_form(dbform);
3259 :
3260 776 : ReleaseSysCache(dbtup);
3261 :
3262 776 : return invalid;
3263 : }
3264 :
3265 :
3266 : /*
3267 : * recovery_create_dbdir()
3268 : *
3269 : * During recovery, there's a case where we validly need to recover a missing
3270 : * tablespace directory so that recovery can continue. This happens when
3271 : * recovery wants to create a database but the holding tablespace has been
3272 : * removed before the server stopped. Since we expect that the directory will
3273 : * be gone before reaching recovery consistency, and we have no knowledge about
3274 : * the tablespace other than its OID here, we create a real directory under
3275 : * pg_tblspc here instead of restoring the symlink.
3276 : *
3277 : * If only_tblspc is true, then the requested directory must be in pg_tblspc/
3278 : */
3279 : static void
3280 46 : recovery_create_dbdir(char *path, bool only_tblspc)
3281 : {
3282 : struct stat st;
3283 :
3284 : Assert(RecoveryInProgress());
3285 :
3286 46 : if (stat(path, &st) == 0)
3287 46 : return;
3288 :
3289 0 : if (only_tblspc && strstr(path, PG_TBLSPC_DIR_SLASH) == NULL)
3290 0 : elog(PANIC, "requested to created invalid directory: %s", path);
3291 :
3292 0 : if (reachedConsistency && !allow_in_place_tablespaces)
3293 0 : ereport(PANIC,
3294 : errmsg("missing directory \"%s\"", path));
3295 :
3296 0 : elog(reachedConsistency ? WARNING : DEBUG1,
3297 : "creating missing directory: %s", path);
3298 :
3299 0 : if (pg_mkdir_p(path, pg_dir_create_mode) != 0)
3300 0 : ereport(PANIC,
3301 : errmsg("could not create missing directory \"%s\": %m", path));
3302 : }
3303 :
3304 :
3305 : /*
3306 : * DATABASE resource manager's routines
3307 : */
3308 : void
3309 80 : dbase_redo(XLogReaderState *record)
3310 : {
3311 80 : uint8 info = XLogRecGetInfo(record) & ~XLR_INFO_MASK;
3312 :
3313 : /* Backup blocks are not used in dbase records */
3314 : Assert(!XLogRecHasAnyBlockRefs(record));
3315 :
3316 80 : if (info == XLOG_DBASE_CREATE_FILE_COPY)
3317 : {
3318 8 : xl_dbase_create_file_copy_rec *xlrec =
3319 8 : (xl_dbase_create_file_copy_rec *) XLogRecGetData(record);
3320 : char *src_path;
3321 : char *dst_path;
3322 : char *parent_path;
3323 : struct stat st;
3324 :
3325 8 : src_path = GetDatabasePath(xlrec->src_db_id, xlrec->src_tablespace_id);
3326 8 : dst_path = GetDatabasePath(xlrec->db_id, xlrec->tablespace_id);
3327 :
3328 : /*
3329 : * Our theory for replaying a CREATE is to forcibly drop the target
3330 : * subdirectory if present, then re-copy the source data. This may be
3331 : * more work than needed, but it is simple to implement.
3332 : */
3333 8 : if (stat(dst_path, &st) == 0 && S_ISDIR(st.st_mode))
3334 : {
3335 0 : if (!rmtree(dst_path, true))
3336 : /* If this failed, copydir() below is going to error. */
3337 0 : ereport(WARNING,
3338 : (errmsg("some useless files may be left behind in old database directory \"%s\"",
3339 : dst_path)));
3340 : }
3341 :
3342 : /*
3343 : * If the parent of the target path doesn't exist, create it now. This
3344 : * enables us to create the target underneath later.
3345 : */
3346 8 : parent_path = pstrdup(dst_path);
3347 8 : get_parent_directory(parent_path);
3348 8 : if (stat(parent_path, &st) < 0)
3349 : {
3350 0 : if (errno != ENOENT)
3351 0 : ereport(FATAL,
3352 : errmsg("could not stat directory \"%s\": %m",
3353 : dst_path));
3354 :
3355 : /* create the parent directory if needed and valid */
3356 0 : recovery_create_dbdir(parent_path, true);
3357 : }
3358 8 : pfree(parent_path);
3359 :
3360 : /*
3361 : * There's a case where the copy source directory is missing for the
3362 : * same reason above. Create the empty source directory so that
3363 : * copydir below doesn't fail. The directory will be dropped soon by
3364 : * recovery.
3365 : */
3366 8 : if (stat(src_path, &st) < 0 && errno == ENOENT)
3367 0 : recovery_create_dbdir(src_path, false);
3368 :
3369 : /*
3370 : * Force dirty buffers out to disk, to ensure source database is
3371 : * up-to-date for the copy.
3372 : */
3373 8 : FlushDatabaseBuffers(xlrec->src_db_id);
3374 :
3375 : /* Close all smgr fds in all backends. */
3376 8 : WaitForProcSignalBarrier(EmitProcSignalBarrier(PROCSIGNAL_BARRIER_SMGRRELEASE));
3377 :
3378 : /*
3379 : * Copy this subdirectory to the new location
3380 : *
3381 : * We don't need to copy subdirectories
3382 : */
3383 8 : copydir(src_path, dst_path, false);
3384 :
3385 8 : pfree(src_path);
3386 8 : pfree(dst_path);
3387 : }
3388 72 : else if (info == XLOG_DBASE_CREATE_WAL_LOG)
3389 : {
3390 46 : xl_dbase_create_wal_log_rec *xlrec =
3391 46 : (xl_dbase_create_wal_log_rec *) XLogRecGetData(record);
3392 : char *dbpath;
3393 : char *parent_path;
3394 :
3395 46 : dbpath = GetDatabasePath(xlrec->db_id, xlrec->tablespace_id);
3396 :
3397 : /* create the parent directory if needed and valid */
3398 46 : parent_path = pstrdup(dbpath);
3399 46 : get_parent_directory(parent_path);
3400 46 : recovery_create_dbdir(parent_path, true);
3401 :
3402 : /* Create the database directory with the version file. */
3403 46 : CreateDirAndVersionFile(dbpath, xlrec->db_id, xlrec->tablespace_id,
3404 : true);
3405 46 : pfree(dbpath);
3406 : }
3407 26 : else if (info == XLOG_DBASE_DROP)
3408 : {
3409 26 : xl_dbase_drop_rec *xlrec = (xl_dbase_drop_rec *) XLogRecGetData(record);
3410 : char *dst_path;
3411 : int i;
3412 :
3413 26 : if (InHotStandby)
3414 : {
3415 : /*
3416 : * Lock database while we resolve conflicts to ensure that
3417 : * InitPostgres() cannot fully re-execute concurrently. This
3418 : * avoids backends re-connecting automatically to same database,
3419 : * which can happen in some cases.
3420 : *
3421 : * This will lock out walsenders trying to connect to db-specific
3422 : * slots for logical decoding too, so it's safe for us to drop
3423 : * slots.
3424 : */
3425 26 : LockSharedObjectForSession(DatabaseRelationId, xlrec->db_id, 0, AccessExclusiveLock);
3426 26 : ResolveRecoveryConflictWithDatabase(xlrec->db_id);
3427 : }
3428 :
3429 : /* Drop any database-specific replication slots */
3430 26 : ReplicationSlotsDropDBSlots(xlrec->db_id);
3431 :
3432 : /* Drop pages for this database that are in the shared buffer cache */
3433 26 : DropDatabaseBuffers(xlrec->db_id);
3434 :
3435 : /* Also, clean out any fsync requests that might be pending in md.c */
3436 26 : ForgetDatabaseSyncRequests(xlrec->db_id);
3437 :
3438 : /* Clean out the xlog relcache too */
3439 26 : XLogDropDatabase(xlrec->db_id);
3440 :
3441 : /* Close all smgr fds in all backends. */
3442 26 : WaitForProcSignalBarrier(EmitProcSignalBarrier(PROCSIGNAL_BARRIER_SMGRRELEASE));
3443 :
3444 52 : for (i = 0; i < xlrec->ntablespaces; i++)
3445 : {
3446 26 : dst_path = GetDatabasePath(xlrec->db_id, xlrec->tablespace_ids[i]);
3447 :
3448 : /* And remove the physical files */
3449 26 : if (!rmtree(dst_path, true))
3450 0 : ereport(WARNING,
3451 : (errmsg("some useless files may be left behind in old database directory \"%s\"",
3452 : dst_path)));
3453 26 : pfree(dst_path);
3454 : }
3455 :
3456 26 : if (InHotStandby)
3457 : {
3458 : /*
3459 : * Release locks prior to commit. XXX There is a race condition
3460 : * here that may allow backends to reconnect, but the window for
3461 : * this is small because the gap between here and commit is mostly
3462 : * fairly small and it is unlikely that people will be dropping
3463 : * databases that we are trying to connect to anyway.
3464 : */
3465 26 : UnlockSharedObjectForSession(DatabaseRelationId, xlrec->db_id, 0, AccessExclusiveLock);
3466 : }
3467 : }
3468 : else
3469 0 : elog(PANIC, "dbase_redo: unknown op code %u", info);
3470 80 : }
|