Line data Source code
1 : /*-------------------------------------------------------------------------
2 : *
3 : * dbcommands.c
4 : * Database management commands (create/drop database).
5 : *
6 : * Note: database creation/destruction commands use exclusive locks on
7 : * the database objects (as expressed by LockSharedObject()) to avoid
8 : * stepping on each others' toes. Formerly we used table-level locks
9 : * on pg_database, but that's too coarse-grained.
10 : *
11 : * Portions Copyright (c) 1996-2026, PostgreSQL Global Development Group
12 : * Portions Copyright (c) 1994, Regents of the University of California
13 : *
14 : *
15 : * IDENTIFICATION
16 : * src/backend/commands/dbcommands.c
17 : *
18 : *-------------------------------------------------------------------------
19 : */
20 : #include "postgres.h"
21 :
22 : #include <fcntl.h>
23 : #include <unistd.h>
24 : #include <sys/stat.h>
25 :
26 : #include "access/genam.h"
27 : #include "access/heapam.h"
28 : #include "access/htup_details.h"
29 : #include "access/multixact.h"
30 : #include "access/tableam.h"
31 : #include "access/xact.h"
32 : #include "access/xloginsert.h"
33 : #include "access/xlogrecovery.h"
34 : #include "access/xlogutils.h"
35 : #include "catalog/catalog.h"
36 : #include "catalog/dependency.h"
37 : #include "catalog/indexing.h"
38 : #include "catalog/objectaccess.h"
39 : #include "catalog/pg_authid.h"
40 : #include "catalog/pg_collation.h"
41 : #include "catalog/pg_database.h"
42 : #include "catalog/pg_db_role_setting.h"
43 : #include "catalog/pg_subscription.h"
44 : #include "catalog/pg_tablespace.h"
45 : #include "commands/comment.h"
46 : #include "commands/dbcommands.h"
47 : #include "commands/dbcommands_xlog.h"
48 : #include "commands/defrem.h"
49 : #include "commands/seclabel.h"
50 : #include "commands/tablespace.h"
51 : #include "common/file_perm.h"
52 : #include "mb/pg_wchar.h"
53 : #include "miscadmin.h"
54 : #include "pgstat.h"
55 : #include "postmaster/bgwriter.h"
56 : #include "replication/slot.h"
57 : #include "storage/copydir.h"
58 : #include "storage/fd.h"
59 : #include "storage/ipc.h"
60 : #include "storage/lmgr.h"
61 : #include "storage/md.h"
62 : #include "storage/procarray.h"
63 : #include "storage/procsignal.h"
64 : #include "storage/smgr.h"
65 : #include "utils/acl.h"
66 : #include "utils/builtins.h"
67 : #include "utils/fmgroids.h"
68 : #include "utils/lsyscache.h"
69 : #include "utils/pg_locale.h"
70 : #include "utils/relmapper.h"
71 : #include "utils/snapmgr.h"
72 : #include "utils/syscache.h"
73 : #include "utils/wait_event.h"
74 :
75 : /*
76 : * Create database strategy.
77 : *
78 : * CREATEDB_WAL_LOG will copy the database at the block level and WAL log each
79 : * copied block.
80 : *
81 : * CREATEDB_FILE_COPY will simply perform a file system level copy of the
82 : * database and log a single record for each tablespace copied. To make this
83 : * safe, it also triggers checkpoints before and after the operation.
84 : */
85 : typedef enum CreateDBStrategy
86 : {
87 : CREATEDB_WAL_LOG,
88 : CREATEDB_FILE_COPY,
89 : } CreateDBStrategy;
90 :
91 : typedef struct
92 : {
93 : Oid src_dboid; /* source (template) DB */
94 : Oid dest_dboid; /* DB we are trying to create */
95 : CreateDBStrategy strategy; /* create db strategy */
96 : } createdb_failure_params;
97 :
98 : typedef struct
99 : {
100 : Oid dest_dboid; /* DB we are trying to move */
101 : Oid dest_tsoid; /* tablespace we are trying to move to */
102 : } movedb_failure_params;
103 :
104 : /*
105 : * Information about a relation to be copied when creating a database.
106 : */
107 : typedef struct CreateDBRelInfo
108 : {
109 : RelFileLocator rlocator; /* physical relation identifier */
110 : Oid reloid; /* relation oid */
111 : bool permanent; /* relation is permanent or unlogged */
112 : } CreateDBRelInfo;
113 :
114 :
115 : /* non-export function prototypes */
116 : static void createdb_failure_callback(int code, Datum arg);
117 : static void movedb(const char *dbname, const char *tblspcname);
118 : static void movedb_failure_callback(int code, Datum arg);
119 : static bool get_db_info(const char *name, LOCKMODE lockmode,
120 : Oid *dbIdP, Oid *ownerIdP,
121 : int *encodingP, bool *dbIsTemplateP, bool *dbAllowConnP, bool *dbHasLoginEvtP,
122 : TransactionId *dbFrozenXidP, MultiXactId *dbMinMultiP,
123 : Oid *dbTablespace, char **dbCollate, char **dbCtype, char **dbLocale,
124 : char **dbIcurules,
125 : char *dbLocProvider,
126 : char **dbCollversion);
127 : static void remove_dbtablespaces(Oid db_id);
128 : static bool check_db_file_conflict(Oid db_id);
129 : static int errdetail_busy_db(int notherbackends, int npreparedxacts);
130 : static void CreateDatabaseUsingWalLog(Oid src_dboid, Oid dst_dboid, Oid src_tsid,
131 : Oid dst_tsid);
132 : static List *ScanSourceDatabasePgClass(Oid tbid, Oid dbid, char *srcpath);
133 : static List *ScanSourceDatabasePgClassPage(Page page, Buffer buf, Oid tbid,
134 : Oid dbid, char *srcpath,
135 : List *rlocatorlist, Snapshot snapshot);
136 : static CreateDBRelInfo *ScanSourceDatabasePgClassTuple(HeapTupleData *tuple,
137 : Oid tbid, Oid dbid,
138 : char *srcpath);
139 : static void CreateDirAndVersionFile(char *dbpath, Oid dbid, Oid tsid,
140 : bool isRedo);
141 : static void CreateDatabaseUsingFileCopy(Oid src_dboid, Oid dst_dboid,
142 : Oid src_tsid, Oid dst_tsid);
143 : static void recovery_create_dbdir(char *path, bool only_tblspc);
144 :
145 : /*
146 : * Create a new database using the WAL_LOG strategy.
147 : *
148 : * Each copied block is separately written to the write-ahead log.
149 : */
150 : static void
151 280 : CreateDatabaseUsingWalLog(Oid src_dboid, Oid dst_dboid,
152 : Oid src_tsid, Oid dst_tsid)
153 : {
154 : char *srcpath;
155 : char *dstpath;
156 280 : List *rlocatorlist = NULL;
157 : ListCell *cell;
158 : LockRelId srcrelid;
159 : LockRelId dstrelid;
160 : RelFileLocator srcrlocator;
161 : RelFileLocator dstrlocator;
162 : CreateDBRelInfo *relinfo;
163 :
164 : /* Get source and destination database paths. */
165 280 : srcpath = GetDatabasePath(src_dboid, src_tsid);
166 280 : dstpath = GetDatabasePath(dst_dboid, dst_tsid);
167 :
168 : /* Create database directory and write PG_VERSION file. */
169 280 : CreateDirAndVersionFile(dstpath, dst_dboid, dst_tsid, false);
170 :
171 : /* Copy relmap file from source database to the destination database. */
172 280 : RelationMapCopy(dst_dboid, dst_tsid, srcpath, dstpath);
173 :
174 : /* Get list of relfilelocators to copy from the source database. */
175 280 : rlocatorlist = ScanSourceDatabasePgClass(src_tsid, src_dboid, srcpath);
176 : Assert(rlocatorlist != NIL);
177 :
178 : /*
179 : * Database IDs will be the same for all relations so set them before
180 : * entering the loop.
181 : */
182 280 : srcrelid.dbId = src_dboid;
183 280 : dstrelid.dbId = dst_dboid;
184 :
185 : /* Loop over our list of relfilelocators and copy each one. */
186 68138 : foreach(cell, rlocatorlist)
187 : {
188 67860 : relinfo = lfirst(cell);
189 67860 : srcrlocator = relinfo->rlocator;
190 :
191 : /*
192 : * If the relation is from the source db's default tablespace then we
193 : * need to create it in the destination db's default tablespace.
194 : * Otherwise, we need to create in the same tablespace as it is in the
195 : * source database.
196 : */
197 67860 : if (srcrlocator.spcOid == src_tsid)
198 67860 : dstrlocator.spcOid = dst_tsid;
199 : else
200 0 : dstrlocator.spcOid = srcrlocator.spcOid;
201 :
202 67860 : dstrlocator.dbOid = dst_dboid;
203 67860 : dstrlocator.relNumber = srcrlocator.relNumber;
204 :
205 : /*
206 : * Acquire locks on source and target relations before copying.
207 : *
208 : * We typically do not read relation data into shared_buffers without
209 : * holding a relation lock. It's unclear what could go wrong if we
210 : * skipped it in this case, because nobody can be modifying either the
211 : * source or destination database at this point, and we have locks on
212 : * both databases, too, but let's take the conservative route.
213 : */
214 67860 : dstrelid.relId = srcrelid.relId = relinfo->reloid;
215 67860 : LockRelationId(&srcrelid, AccessShareLock);
216 67860 : LockRelationId(&dstrelid, AccessShareLock);
217 :
218 : /* Copy relation storage from source to the destination. */
219 67860 : CreateAndCopyRelationData(srcrlocator, dstrlocator, relinfo->permanent);
220 :
221 : /* Release the relation locks. */
222 67858 : UnlockRelationId(&srcrelid, AccessShareLock);
223 67858 : UnlockRelationId(&dstrelid, AccessShareLock);
224 : }
225 :
226 278 : pfree(srcpath);
227 278 : pfree(dstpath);
228 278 : list_free_deep(rlocatorlist);
229 278 : }
230 :
231 : /*
232 : * Scan the pg_class table in the source database to identify the relations
233 : * that need to be copied to the destination database.
234 : *
235 : * This is an exception to the usual rule that cross-database access is
236 : * not possible. We can make it work here because we know that there are no
237 : * connections to the source database and (since there can't be prepared
238 : * transactions touching that database) no in-doubt tuples either. This
239 : * means that we don't need to worry about pruning removing anything from
240 : * under us, and we don't need to be too picky about our snapshot either.
241 : * As long as it sees all previously-committed XIDs as committed and all
242 : * aborted XIDs as aborted, we should be fine: nothing else is possible
243 : * here.
244 : *
245 : * We can't rely on the relcache for anything here, because that only knows
246 : * about the database to which we are connected, and can't handle access to
247 : * other databases. That also means we can't rely on the heap scan
248 : * infrastructure, which would be a bad idea anyway since it might try
249 : * to do things like HOT pruning which we definitely can't do safely in
250 : * a database to which we're not even connected.
251 : */
252 : static List *
253 280 : ScanSourceDatabasePgClass(Oid tbid, Oid dbid, char *srcpath)
254 : {
255 : RelFileLocator rlocator;
256 : BlockNumber nblocks;
257 : BlockNumber blkno;
258 : Buffer buf;
259 : RelFileNumber relfilenumber;
260 : Page page;
261 280 : List *rlocatorlist = NIL;
262 : LockRelId relid;
263 : Snapshot snapshot;
264 : SMgrRelation smgr;
265 : BufferAccessStrategy bstrategy;
266 :
267 : /* Get pg_class relfilenumber. */
268 280 : relfilenumber = RelationMapOidToFilenumberForDatabase(srcpath,
269 : RelationRelationId);
270 :
271 : /* Don't read data into shared_buffers without holding a relation lock. */
272 280 : relid.dbId = dbid;
273 280 : relid.relId = RelationRelationId;
274 280 : LockRelationId(&relid, AccessShareLock);
275 :
276 : /* Prepare a RelFileLocator for the pg_class relation. */
277 280 : rlocator.spcOid = tbid;
278 280 : rlocator.dbOid = dbid;
279 280 : rlocator.relNumber = relfilenumber;
280 :
281 280 : smgr = smgropen(rlocator, INVALID_PROC_NUMBER);
282 280 : nblocks = smgrnblocks(smgr, MAIN_FORKNUM);
283 280 : smgrclose(smgr);
284 :
285 : /* Use a buffer access strategy since this is a bulk read operation. */
286 280 : bstrategy = GetAccessStrategy(BAS_BULKREAD);
287 :
288 : /*
289 : * As explained in the function header comments, we need a snapshot that
290 : * will see all committed transactions as committed, and our transaction
291 : * snapshot - or the active snapshot - might not be new enough for that,
292 : * but the return value of GetLatestSnapshot() should work fine.
293 : */
294 280 : snapshot = RegisterSnapshot(GetLatestSnapshot());
295 :
296 : /* Process the relation block by block. */
297 4480 : for (blkno = 0; blkno < nblocks; blkno++)
298 : {
299 4200 : CHECK_FOR_INTERRUPTS();
300 :
301 4200 : buf = ReadBufferWithoutRelcache(rlocator, MAIN_FORKNUM, blkno,
302 : RBM_NORMAL, bstrategy, true);
303 :
304 4200 : LockBuffer(buf, BUFFER_LOCK_SHARE);
305 4200 : page = BufferGetPage(buf);
306 4200 : if (PageIsNew(page) || PageIsEmpty(page))
307 : {
308 0 : UnlockReleaseBuffer(buf);
309 0 : continue;
310 : }
311 :
312 : /* Append relevant pg_class tuples for current page to rlocatorlist. */
313 4200 : rlocatorlist = ScanSourceDatabasePgClassPage(page, buf, tbid, dbid,
314 : srcpath, rlocatorlist,
315 : snapshot);
316 :
317 4200 : UnlockReleaseBuffer(buf);
318 : }
319 280 : UnregisterSnapshot(snapshot);
320 :
321 : /* Release relation lock. */
322 280 : UnlockRelationId(&relid, AccessShareLock);
323 :
324 280 : return rlocatorlist;
325 : }
326 :
327 : /*
328 : * Scan one page of the source database's pg_class relation and add relevant
329 : * entries to rlocatorlist. The return value is the updated list.
330 : */
331 : static List *
332 4200 : ScanSourceDatabasePgClassPage(Page page, Buffer buf, Oid tbid, Oid dbid,
333 : char *srcpath, List *rlocatorlist,
334 : Snapshot snapshot)
335 : {
336 4200 : BlockNumber blkno = BufferGetBlockNumber(buf);
337 : OffsetNumber offnum;
338 : OffsetNumber maxoff;
339 : HeapTupleData tuple;
340 :
341 4200 : maxoff = PageGetMaxOffsetNumber(page);
342 :
343 : /* Loop over offsets. */
344 4200 : for (offnum = FirstOffsetNumber;
345 199466 : offnum <= maxoff;
346 195266 : offnum = OffsetNumberNext(offnum))
347 : {
348 : ItemId itemid;
349 :
350 195266 : itemid = PageGetItemId(page, offnum);
351 :
352 : /* Nothing to do if slot is empty or already dead. */
353 195266 : if (!ItemIdIsUsed(itemid) || ItemIdIsDead(itemid) ||
354 156571 : ItemIdIsRedirected(itemid))
355 68655 : continue;
356 :
357 : Assert(ItemIdIsNormal(itemid));
358 126611 : ItemPointerSet(&(tuple.t_self), blkno, offnum);
359 :
360 : /* Initialize a HeapTupleData structure. */
361 126611 : tuple.t_data = (HeapTupleHeader) PageGetItem(page, itemid);
362 126611 : tuple.t_len = ItemIdGetLength(itemid);
363 126611 : tuple.t_tableOid = RelationRelationId;
364 :
365 : /* Skip tuples that are not visible to this snapshot. */
366 126611 : if (HeapTupleSatisfiesVisibility(&tuple, snapshot, buf))
367 : {
368 : CreateDBRelInfo *relinfo;
369 :
370 : /*
371 : * ScanSourceDatabasePgClassTuple is in charge of constructing a
372 : * CreateDBRelInfo object for this tuple, but can also decide that
373 : * this tuple isn't something we need to copy. If we do need to
374 : * copy the relation, add it to the list.
375 : */
376 126588 : relinfo = ScanSourceDatabasePgClassTuple(&tuple, tbid, dbid,
377 : srcpath);
378 126588 : if (relinfo != NULL)
379 68348 : rlocatorlist = lappend(rlocatorlist, relinfo);
380 : }
381 : }
382 :
383 4200 : return rlocatorlist;
384 : }
385 :
386 : /*
387 : * Decide whether a certain pg_class tuple represents something that
388 : * needs to be copied from the source database to the destination database,
389 : * and if so, construct a CreateDBRelInfo for it.
390 : *
391 : * Visibility checks are handled by the caller, so our job here is just
392 : * to assess the data stored in the tuple.
393 : */
394 : CreateDBRelInfo *
395 126588 : ScanSourceDatabasePgClassTuple(HeapTupleData *tuple, Oid tbid, Oid dbid,
396 : char *srcpath)
397 : {
398 : CreateDBRelInfo *relinfo;
399 : Form_pg_class classForm;
400 126588 : RelFileNumber relfilenumber = InvalidRelFileNumber;
401 :
402 126588 : classForm = (Form_pg_class) GETSTRUCT(tuple);
403 :
404 : /*
405 : * Return NULL if this object does not need to be copied.
406 : *
407 : * Shared objects don't need to be copied, because they are shared.
408 : * Objects without storage can't be copied, because there's nothing to
409 : * copy. Temporary relations don't need to be copied either, because they
410 : * are inaccessible outside of the session that created them, which must
411 : * be gone already, and couldn't connect to a different database if it
412 : * still existed. autovacuum will eventually remove the pg_class entries
413 : * as well.
414 : */
415 126588 : if (classForm->reltablespace == GLOBALTABLESPACE_OID ||
416 113708 : !RELKIND_HAS_STORAGE(classForm->relkind) ||
417 68348 : classForm->relpersistence == RELPERSISTENCE_TEMP)
418 58240 : return NULL;
419 :
420 : /*
421 : * If relfilenumber is valid then directly use it. Otherwise, consult the
422 : * relmap.
423 : */
424 68348 : if (RelFileNumberIsValid(classForm->relfilenode))
425 63588 : relfilenumber = classForm->relfilenode;
426 : else
427 4760 : relfilenumber = RelationMapOidToFilenumberForDatabase(srcpath,
428 : classForm->oid);
429 :
430 : /* We must have a valid relfilenumber. */
431 68348 : if (!RelFileNumberIsValid(relfilenumber))
432 0 : elog(ERROR, "relation with OID %u does not have a valid relfilenumber",
433 : classForm->oid);
434 :
435 : /* Prepare a rel info element and add it to the list. */
436 68348 : relinfo = palloc_object(CreateDBRelInfo);
437 68348 : if (OidIsValid(classForm->reltablespace))
438 0 : relinfo->rlocator.spcOid = classForm->reltablespace;
439 : else
440 68348 : relinfo->rlocator.spcOid = tbid;
441 :
442 68348 : relinfo->rlocator.dbOid = dbid;
443 68348 : relinfo->rlocator.relNumber = relfilenumber;
444 68348 : relinfo->reloid = classForm->oid;
445 :
446 : /* Temporary relations were rejected above. */
447 : Assert(classForm->relpersistence != RELPERSISTENCE_TEMP);
448 68348 : relinfo->permanent =
449 68348 : (classForm->relpersistence == RELPERSISTENCE_PERMANENT) ? true : false;
450 :
451 68348 : return relinfo;
452 : }
453 :
454 : /*
455 : * Create database directory and write out the PG_VERSION file in the database
456 : * path. If isRedo is true, it's okay for the database directory to exist
457 : * already.
458 : */
459 : static void
460 305 : CreateDirAndVersionFile(char *dbpath, Oid dbid, Oid tsid, bool isRedo)
461 : {
462 : int fd;
463 : int nbytes;
464 : char versionfile[MAXPGPATH];
465 : char buf[16];
466 :
467 : /*
468 : * Note that we don't have to copy version data from the source database;
469 : * there's only one legal value.
470 : */
471 305 : sprintf(buf, "%s\n", PG_MAJORVERSION);
472 305 : nbytes = strlen(PG_MAJORVERSION) + 1;
473 :
474 : /* Create database directory. */
475 305 : if (MakePGDirectory(dbpath) < 0)
476 : {
477 : /* Failure other than already exists or not in WAL replay? */
478 8 : if (errno != EEXIST || !isRedo)
479 0 : ereport(ERROR,
480 : (errcode_for_file_access(),
481 : errmsg("could not create directory \"%s\": %m", dbpath)));
482 : }
483 :
484 : /*
485 : * Create PG_VERSION file in the database path. If the file already
486 : * exists and we are in WAL replay then try again to open it in write
487 : * mode.
488 : */
489 305 : snprintf(versionfile, sizeof(versionfile), "%s/%s", dbpath, "PG_VERSION");
490 :
491 305 : fd = OpenTransientFile(versionfile, O_WRONLY | O_CREAT | O_EXCL | PG_BINARY);
492 305 : if (fd < 0 && errno == EEXIST && isRedo)
493 8 : fd = OpenTransientFile(versionfile, O_WRONLY | O_TRUNC | PG_BINARY);
494 :
495 305 : if (fd < 0)
496 0 : ereport(ERROR,
497 : (errcode_for_file_access(),
498 : errmsg("could not create file \"%s\": %m", versionfile)));
499 :
500 : /* Write PG_MAJORVERSION in the PG_VERSION file. */
501 305 : pgstat_report_wait_start(WAIT_EVENT_VERSION_FILE_WRITE);
502 305 : errno = 0;
503 305 : if ((int) write(fd, buf, nbytes) != nbytes)
504 : {
505 : /* If write didn't set errno, assume problem is no disk space. */
506 0 : if (errno == 0)
507 0 : errno = ENOSPC;
508 0 : ereport(ERROR,
509 : (errcode_for_file_access(),
510 : errmsg("could not write to file \"%s\": %m", versionfile)));
511 : }
512 305 : pgstat_report_wait_end();
513 :
514 305 : pgstat_report_wait_start(WAIT_EVENT_VERSION_FILE_SYNC);
515 305 : if (pg_fsync(fd) != 0)
516 0 : ereport(data_sync_elevel(ERROR),
517 : (errcode_for_file_access(),
518 : errmsg("could not fsync file \"%s\": %m", versionfile)));
519 305 : fsync_fname(dbpath, true);
520 305 : pgstat_report_wait_end();
521 :
522 : /* Close the version file. */
523 305 : CloseTransientFile(fd);
524 :
525 : /* If we are not in WAL replay then write the WAL. */
526 305 : if (!isRedo)
527 : {
528 : xl_dbase_create_wal_log_rec xlrec;
529 :
530 280 : START_CRIT_SECTION();
531 :
532 280 : xlrec.db_id = dbid;
533 280 : xlrec.tablespace_id = tsid;
534 :
535 280 : XLogBeginInsert();
536 280 : XLogRegisterData(&xlrec,
537 : sizeof(xl_dbase_create_wal_log_rec));
538 :
539 280 : (void) XLogInsert(RM_DBASE_ID, XLOG_DBASE_CREATE_WAL_LOG);
540 :
541 280 : END_CRIT_SECTION();
542 : }
543 305 : }
544 :
545 : /*
546 : * Create a new database using the FILE_COPY strategy.
547 : *
548 : * Copy each tablespace at the filesystem level, and log a single WAL record
549 : * for each tablespace copied. This requires a checkpoint before and after the
550 : * copy, which may be expensive, but it does greatly reduce WAL generation
551 : * if the copied database is large.
552 : */
553 : static void
554 152 : CreateDatabaseUsingFileCopy(Oid src_dboid, Oid dst_dboid, Oid src_tsid,
555 : Oid dst_tsid)
556 : {
557 : TableScanDesc scan;
558 : Relation rel;
559 : HeapTuple tuple;
560 :
561 : /*
562 : * Force a checkpoint before starting the copy. This will force all dirty
563 : * buffers, including those of unlogged tables, out to disk, to ensure
564 : * source database is up-to-date on disk for the copy.
565 : * FlushDatabaseBuffers() would suffice for that, but we also want to
566 : * process any pending unlink requests. Otherwise, if a checkpoint
567 : * happened while we're copying files, a file might be deleted just when
568 : * we're about to copy it, causing the lstat() call in copydir() to fail
569 : * with ENOENT.
570 : *
571 : * In binary upgrade mode, we can skip this checkpoint because pg_upgrade
572 : * is careful to ensure that template0 is fully written to disk prior to
573 : * any CREATE DATABASE commands.
574 : */
575 152 : if (!IsBinaryUpgrade)
576 120 : RequestCheckpoint(CHECKPOINT_FAST | CHECKPOINT_FORCE |
577 : CHECKPOINT_WAIT | CHECKPOINT_FLUSH_UNLOGGED);
578 :
579 : /*
580 : * Iterate through all tablespaces of the template database, and copy each
581 : * one to the new database.
582 : */
583 152 : rel = table_open(TableSpaceRelationId, AccessShareLock);
584 152 : scan = table_beginscan_catalog(rel, 0, NULL);
585 490 : while ((tuple = heap_getnext(scan, ForwardScanDirection)) != NULL)
586 : {
587 338 : Form_pg_tablespace spaceform = (Form_pg_tablespace) GETSTRUCT(tuple);
588 338 : Oid srctablespace = spaceform->oid;
589 : Oid dsttablespace;
590 : char *srcpath;
591 : char *dstpath;
592 : struct stat st;
593 :
594 : /* No need to copy global tablespace */
595 338 : if (srctablespace == GLOBALTABLESPACE_OID)
596 186 : continue;
597 :
598 186 : srcpath = GetDatabasePath(src_dboid, srctablespace);
599 :
600 338 : if (stat(srcpath, &st) < 0 || !S_ISDIR(st.st_mode) ||
601 152 : directory_is_empty(srcpath))
602 : {
603 : /* Assume we can ignore it */
604 34 : pfree(srcpath);
605 34 : continue;
606 : }
607 :
608 152 : if (srctablespace == src_tsid)
609 152 : dsttablespace = dst_tsid;
610 : else
611 0 : dsttablespace = srctablespace;
612 :
613 152 : dstpath = GetDatabasePath(dst_dboid, dsttablespace);
614 :
615 : /*
616 : * Copy this subdirectory to the new location
617 : *
618 : * We don't need to copy subdirectories
619 : */
620 152 : copydir(srcpath, dstpath, false);
621 :
622 : /* Record the filesystem change in XLOG */
623 : {
624 : xl_dbase_create_file_copy_rec xlrec;
625 :
626 152 : xlrec.db_id = dst_dboid;
627 152 : xlrec.tablespace_id = dsttablespace;
628 152 : xlrec.src_db_id = src_dboid;
629 152 : xlrec.src_tablespace_id = srctablespace;
630 :
631 152 : XLogBeginInsert();
632 152 : XLogRegisterData(&xlrec,
633 : sizeof(xl_dbase_create_file_copy_rec));
634 :
635 152 : (void) XLogInsert(RM_DBASE_ID,
636 : XLOG_DBASE_CREATE_FILE_COPY | XLR_SPECIAL_REL_UPDATE);
637 : }
638 152 : pfree(srcpath);
639 152 : pfree(dstpath);
640 : }
641 152 : table_endscan(scan);
642 152 : table_close(rel, AccessShareLock);
643 :
644 : /*
645 : * We force a checkpoint before committing. This effectively means that
646 : * committed XLOG_DBASE_CREATE_FILE_COPY operations will never need to be
647 : * replayed (at least not in ordinary crash recovery; we still have to
648 : * make the XLOG entry for the benefit of PITR operations). This avoids
649 : * two nasty scenarios:
650 : *
651 : * #1: At wal_level=minimal, we don't XLOG the contents of newly created
652 : * relfilenodes; therefore the drop-and-recreate-whole-directory behavior
653 : * of DBASE_CREATE replay would lose such files created in the new
654 : * database between our commit and the next checkpoint.
655 : *
656 : * #2: Since we have to recopy the source database during DBASE_CREATE
657 : * replay, we run the risk of copying changes in it that were committed
658 : * after the original CREATE DATABASE command but before the system crash
659 : * that led to the replay. This is at least unexpected and at worst could
660 : * lead to inconsistencies, eg duplicate table names.
661 : *
662 : * (Both of these were real bugs in releases 8.0 through 8.0.3.)
663 : *
664 : * In PITR replay, the first of these isn't an issue, and the second is
665 : * only a risk if the CREATE DATABASE and subsequent template database
666 : * change both occur while a base backup is being taken. There doesn't
667 : * seem to be much we can do about that except document it as a
668 : * limitation.
669 : *
670 : * In binary upgrade mode, we can skip this checkpoint because neither of
671 : * these problems applies: we don't ever replay the WAL generated during
672 : * pg_upgrade, and we don't support taking base backups during pg_upgrade
673 : * (not to mention that we don't concurrently modify template0, either).
674 : *
675 : * See CreateDatabaseUsingWalLog() for a less cheesy CREATE DATABASE
676 : * strategy that avoids these problems.
677 : */
678 152 : if (!IsBinaryUpgrade)
679 120 : RequestCheckpoint(CHECKPOINT_FAST | CHECKPOINT_FORCE |
680 : CHECKPOINT_WAIT);
681 152 : }
682 :
683 : /*
684 : * CREATE DATABASE
685 : */
686 : Oid
687 454 : createdb(ParseState *pstate, const CreatedbStmt *stmt)
688 : {
689 : Oid src_dboid;
690 : Oid src_owner;
691 454 : int src_encoding = -1;
692 454 : char *src_collate = NULL;
693 454 : char *src_ctype = NULL;
694 454 : char *src_locale = NULL;
695 454 : char *src_icurules = NULL;
696 454 : char src_locprovider = '\0';
697 454 : char *src_collversion = NULL;
698 : bool src_istemplate;
699 454 : bool src_hasloginevt = false;
700 : bool src_allowconn;
701 454 : TransactionId src_frozenxid = InvalidTransactionId;
702 454 : MultiXactId src_minmxid = InvalidMultiXactId;
703 : Oid src_deftablespace;
704 : volatile Oid dst_deftablespace;
705 : Relation pg_database_rel;
706 : HeapTuple tuple;
707 454 : Datum new_record[Natts_pg_database] = {0};
708 454 : bool new_record_nulls[Natts_pg_database] = {0};
709 454 : Oid dboid = InvalidOid;
710 : Oid datdba;
711 : ListCell *option;
712 454 : DefElem *tablespacenameEl = NULL;
713 454 : DefElem *ownerEl = NULL;
714 454 : DefElem *templateEl = NULL;
715 454 : DefElem *encodingEl = NULL;
716 454 : DefElem *localeEl = NULL;
717 454 : DefElem *builtinlocaleEl = NULL;
718 454 : DefElem *collateEl = NULL;
719 454 : DefElem *ctypeEl = NULL;
720 454 : DefElem *iculocaleEl = NULL;
721 454 : DefElem *icurulesEl = NULL;
722 454 : DefElem *locproviderEl = NULL;
723 454 : DefElem *istemplateEl = NULL;
724 454 : DefElem *allowconnectionsEl = NULL;
725 454 : DefElem *connlimitEl = NULL;
726 454 : DefElem *collversionEl = NULL;
727 454 : DefElem *strategyEl = NULL;
728 454 : char *dbname = stmt->dbname;
729 454 : char *dbowner = NULL;
730 454 : const char *dbtemplate = NULL;
731 454 : char *dbcollate = NULL;
732 454 : char *dbctype = NULL;
733 454 : const char *dblocale = NULL;
734 454 : char *dbicurules = NULL;
735 454 : char dblocprovider = '\0';
736 : char *canonname;
737 454 : int encoding = -1;
738 454 : bool dbistemplate = false;
739 454 : bool dballowconnections = true;
740 454 : int dbconnlimit = DATCONNLIMIT_UNLIMITED;
741 454 : char *dbcollversion = NULL;
742 : int notherbackends;
743 : int npreparedxacts;
744 454 : CreateDBStrategy dbstrategy = CREATEDB_WAL_LOG;
745 : createdb_failure_params fparms;
746 :
747 : /* Report error if name has \n or \r character. */
748 454 : if (strpbrk(dbname, "\n\r"))
749 3 : ereport(ERROR,
750 : (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
751 : errmsg("database name \"%s\" contains a newline or carriage return character", dbname)));
752 :
753 : /* Extract options from the statement node tree */
754 1349 : foreach(option, stmt->options)
755 : {
756 898 : DefElem *defel = (DefElem *) lfirst(option);
757 :
758 898 : if (strcmp(defel->defname, "tablespace") == 0)
759 : {
760 17 : if (tablespacenameEl)
761 0 : errorConflictingDefElem(defel, pstate);
762 17 : tablespacenameEl = defel;
763 : }
764 881 : else if (strcmp(defel->defname, "owner") == 0)
765 : {
766 5 : if (ownerEl)
767 0 : errorConflictingDefElem(defel, pstate);
768 5 : ownerEl = defel;
769 : }
770 876 : else if (strcmp(defel->defname, "template") == 0)
771 : {
772 198 : if (templateEl)
773 0 : errorConflictingDefElem(defel, pstate);
774 198 : templateEl = defel;
775 : }
776 678 : else if (strcmp(defel->defname, "encoding") == 0)
777 : {
778 61 : if (encodingEl)
779 0 : errorConflictingDefElem(defel, pstate);
780 61 : encodingEl = defel;
781 : }
782 617 : else if (strcmp(defel->defname, "locale") == 0)
783 : {
784 57 : if (localeEl)
785 0 : errorConflictingDefElem(defel, pstate);
786 57 : localeEl = defel;
787 : }
788 560 : else if (strcmp(defel->defname, "builtin_locale") == 0)
789 : {
790 8 : if (builtinlocaleEl)
791 0 : errorConflictingDefElem(defel, pstate);
792 8 : builtinlocaleEl = defel;
793 : }
794 552 : else if (strcmp(defel->defname, "lc_collate") == 0)
795 : {
796 14 : if (collateEl)
797 0 : errorConflictingDefElem(defel, pstate);
798 14 : collateEl = defel;
799 : }
800 538 : else if (strcmp(defel->defname, "lc_ctype") == 0)
801 : {
802 14 : if (ctypeEl)
803 0 : errorConflictingDefElem(defel, pstate);
804 14 : ctypeEl = defel;
805 : }
806 524 : else if (strcmp(defel->defname, "icu_locale") == 0)
807 : {
808 5 : if (iculocaleEl)
809 0 : errorConflictingDefElem(defel, pstate);
810 5 : iculocaleEl = defel;
811 : }
812 519 : else if (strcmp(defel->defname, "icu_rules") == 0)
813 : {
814 1 : if (icurulesEl)
815 0 : errorConflictingDefElem(defel, pstate);
816 1 : icurulesEl = defel;
817 : }
818 518 : else if (strcmp(defel->defname, "locale_provider") == 0)
819 : {
820 62 : if (locproviderEl)
821 0 : errorConflictingDefElem(defel, pstate);
822 62 : locproviderEl = defel;
823 : }
824 456 : else if (strcmp(defel->defname, "is_template") == 0)
825 : {
826 57 : if (istemplateEl)
827 0 : errorConflictingDefElem(defel, pstate);
828 57 : istemplateEl = defel;
829 : }
830 399 : else if (strcmp(defel->defname, "allow_connections") == 0)
831 : {
832 56 : if (allowconnectionsEl)
833 0 : errorConflictingDefElem(defel, pstate);
834 56 : allowconnectionsEl = defel;
835 : }
836 343 : else if (strcmp(defel->defname, "connection_limit") == 0)
837 : {
838 0 : if (connlimitEl)
839 0 : errorConflictingDefElem(defel, pstate);
840 0 : connlimitEl = defel;
841 : }
842 343 : else if (strcmp(defel->defname, "collation_version") == 0)
843 : {
844 32 : if (collversionEl)
845 0 : errorConflictingDefElem(defel, pstate);
846 32 : collversionEl = defel;
847 : }
848 311 : else if (strcmp(defel->defname, "location") == 0)
849 : {
850 0 : ereport(WARNING,
851 : (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
852 : errmsg("LOCATION is not supported anymore"),
853 : errhint("Consider using tablespaces instead."),
854 : parser_errposition(pstate, defel->location)));
855 : }
856 311 : else if (strcmp(defel->defname, "oid") == 0)
857 : {
858 147 : dboid = defGetObjectId(defel);
859 :
860 : /*
861 : * We don't normally permit new databases to be created with
862 : * system-assigned OIDs. pg_upgrade tries to preserve database
863 : * OIDs, so we can't allow any database to be created with an OID
864 : * that might be in use in a freshly-initialized cluster created
865 : * by some future version. We assume all such OIDs will be from
866 : * the system-managed OID range.
867 : *
868 : * As an exception, however, we permit any OID to be assigned when
869 : * allow_system_table_mods=on (so that initdb can assign system
870 : * OIDs to template0 and postgres) or when performing a binary
871 : * upgrade (so that pg_upgrade can preserve whatever OIDs it finds
872 : * in the source cluster).
873 : */
874 147 : if (dboid < FirstNormalObjectId &&
875 130 : !allowSystemTableMods && !IsBinaryUpgrade)
876 0 : ereport(ERROR,
877 : (errcode(ERRCODE_INVALID_PARAMETER_VALUE)),
878 : errmsg("OIDs less than %u are reserved for system objects", FirstNormalObjectId));
879 : }
880 164 : else if (strcmp(defel->defname, "strategy") == 0)
881 : {
882 164 : if (strategyEl)
883 0 : errorConflictingDefElem(defel, pstate);
884 164 : strategyEl = defel;
885 : }
886 : else
887 0 : ereport(ERROR,
888 : (errcode(ERRCODE_SYNTAX_ERROR),
889 : errmsg("option \"%s\" not recognized", defel->defname),
890 : parser_errposition(pstate, defel->location)));
891 : }
892 :
893 451 : if (ownerEl && ownerEl->arg)
894 5 : dbowner = defGetString(ownerEl);
895 451 : if (templateEl && templateEl->arg)
896 198 : dbtemplate = defGetString(templateEl);
897 451 : if (encodingEl && encodingEl->arg)
898 : {
899 : const char *encoding_name;
900 :
901 61 : if (IsA(encodingEl->arg, Integer))
902 : {
903 0 : encoding = defGetInt32(encodingEl);
904 0 : encoding_name = pg_encoding_to_char(encoding);
905 0 : if (strcmp(encoding_name, "") == 0 ||
906 0 : pg_valid_server_encoding(encoding_name) < 0)
907 0 : ereport(ERROR,
908 : (errcode(ERRCODE_UNDEFINED_OBJECT),
909 : errmsg("%d is not a valid encoding code",
910 : encoding),
911 : parser_errposition(pstate, encodingEl->location)));
912 : }
913 : else
914 : {
915 61 : encoding_name = defGetString(encodingEl);
916 61 : encoding = pg_valid_server_encoding(encoding_name);
917 61 : if (encoding < 0)
918 0 : ereport(ERROR,
919 : (errcode(ERRCODE_UNDEFINED_OBJECT),
920 : errmsg("%s is not a valid encoding name",
921 : encoding_name),
922 : parser_errposition(pstate, encodingEl->location)));
923 : }
924 : }
925 451 : if (localeEl && localeEl->arg)
926 : {
927 57 : dbcollate = defGetString(localeEl);
928 57 : dbctype = defGetString(localeEl);
929 57 : dblocale = defGetString(localeEl);
930 : }
931 451 : if (builtinlocaleEl && builtinlocaleEl->arg)
932 8 : dblocale = defGetString(builtinlocaleEl);
933 451 : if (collateEl && collateEl->arg)
934 14 : dbcollate = defGetString(collateEl);
935 451 : if (ctypeEl && ctypeEl->arg)
936 14 : dbctype = defGetString(ctypeEl);
937 451 : if (iculocaleEl && iculocaleEl->arg)
938 5 : dblocale = defGetString(iculocaleEl);
939 451 : if (icurulesEl && icurulesEl->arg)
940 1 : dbicurules = defGetString(icurulesEl);
941 451 : if (locproviderEl && locproviderEl->arg)
942 : {
943 62 : char *locproviderstr = defGetString(locproviderEl);
944 :
945 62 : if (pg_strcasecmp(locproviderstr, "builtin") == 0)
946 17 : dblocprovider = COLLPROVIDER_BUILTIN;
947 45 : else if (pg_strcasecmp(locproviderstr, "icu") == 0)
948 8 : dblocprovider = COLLPROVIDER_ICU;
949 37 : else if (pg_strcasecmp(locproviderstr, "libc") == 0)
950 36 : dblocprovider = COLLPROVIDER_LIBC;
951 : else
952 1 : ereport(ERROR,
953 : (errcode(ERRCODE_INVALID_OBJECT_DEFINITION),
954 : errmsg("unrecognized locale provider: %s",
955 : locproviderstr)));
956 : }
957 450 : if (istemplateEl && istemplateEl->arg)
958 57 : dbistemplate = defGetBoolean(istemplateEl);
959 450 : if (allowconnectionsEl && allowconnectionsEl->arg)
960 56 : dballowconnections = defGetBoolean(allowconnectionsEl);
961 450 : if (connlimitEl && connlimitEl->arg)
962 : {
963 0 : dbconnlimit = defGetInt32(connlimitEl);
964 0 : if (dbconnlimit < DATCONNLIMIT_UNLIMITED)
965 0 : ereport(ERROR,
966 : (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
967 : errmsg("invalid connection limit: %d", dbconnlimit)));
968 : }
969 450 : if (collversionEl)
970 32 : dbcollversion = defGetString(collversionEl);
971 :
972 : /* obtain OID of proposed owner */
973 450 : if (dbowner)
974 5 : datdba = get_role_oid(dbowner, false);
975 : else
976 445 : datdba = GetUserId();
977 :
978 : /*
979 : * To create a database, must have createdb privilege and must be able to
980 : * become the target role (this does not imply that the target role itself
981 : * must have createdb privilege). The latter provision guards against
982 : * "giveaway" attacks. Note that a superuser will always have both of
983 : * these privileges a fortiori.
984 : */
985 450 : if (!have_createdb_privilege())
986 4 : ereport(ERROR,
987 : (errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
988 : errmsg("permission denied to create database")));
989 :
990 446 : check_can_set_role(GetUserId(), datdba);
991 :
992 : /*
993 : * Lookup database (template) to be cloned, and obtain share lock on it.
994 : * ShareLock allows two CREATE DATABASEs to work from the same template
995 : * concurrently, while ensuring no one is busy dropping it in parallel
996 : * (which would be Very Bad since we'd likely get an incomplete copy
997 : * without knowing it). This also prevents any new connections from being
998 : * made to the source until we finish copying it, so we can be sure it
999 : * won't change underneath us.
1000 : */
1001 446 : if (!dbtemplate)
1002 249 : dbtemplate = "template1"; /* Default template database name */
1003 :
1004 446 : if (!get_db_info(dbtemplate, ShareLock,
1005 : &src_dboid, &src_owner, &src_encoding,
1006 : &src_istemplate, &src_allowconn, &src_hasloginevt,
1007 : &src_frozenxid, &src_minmxid, &src_deftablespace,
1008 : &src_collate, &src_ctype, &src_locale, &src_icurules, &src_locprovider,
1009 : &src_collversion))
1010 0 : ereport(ERROR,
1011 : (errcode(ERRCODE_UNDEFINED_DATABASE),
1012 : errmsg("template database \"%s\" does not exist",
1013 : dbtemplate)));
1014 :
1015 : /*
1016 : * If the source database was in the process of being dropped, we can't
1017 : * use it as a template.
1018 : */
1019 446 : if (database_is_invalid_oid(src_dboid))
1020 1 : ereport(ERROR,
1021 : errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
1022 : errmsg("cannot use invalid database \"%s\" as template", dbtemplate),
1023 : errhint("Use DROP DATABASE to drop invalid databases."));
1024 :
1025 : /*
1026 : * Permission check: to copy a DB that's not marked datistemplate, you
1027 : * must be superuser or the owner thereof.
1028 : */
1029 445 : if (!src_istemplate)
1030 : {
1031 14 : if (!object_ownercheck(DatabaseRelationId, src_dboid, GetUserId()))
1032 0 : ereport(ERROR,
1033 : (errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
1034 : errmsg("permission denied to copy database \"%s\"",
1035 : dbtemplate)));
1036 : }
1037 :
1038 : /* Validate the database creation strategy. */
1039 445 : if (strategyEl && strategyEl->arg)
1040 : {
1041 : char *strategy;
1042 :
1043 164 : strategy = defGetString(strategyEl);
1044 164 : if (pg_strcasecmp(strategy, "wal_log") == 0)
1045 11 : dbstrategy = CREATEDB_WAL_LOG;
1046 153 : else if (pg_strcasecmp(strategy, "file_copy") == 0)
1047 : {
1048 152 : if (DataChecksumsInProgressOn())
1049 0 : ereport(ERROR,
1050 : errcode(ERRCODE_INVALID_PARAMETER_VALUE),
1051 : errmsg("create database strategy \"%s\" not allowed when data checksums are being enabled",
1052 : strategy));
1053 152 : dbstrategy = CREATEDB_FILE_COPY;
1054 : }
1055 : else
1056 1 : ereport(ERROR,
1057 : (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
1058 : errmsg("invalid create database strategy \"%s\"", strategy),
1059 : errhint("Valid strategies are \"wal_log\" and \"file_copy\".")));
1060 : }
1061 :
1062 : /* If encoding or locales are defaulted, use source's setting */
1063 444 : if (encoding < 0)
1064 383 : encoding = src_encoding;
1065 444 : if (dbcollate == NULL)
1066 376 : dbcollate = src_collate;
1067 444 : if (dbctype == NULL)
1068 376 : dbctype = src_ctype;
1069 444 : if (dblocprovider == '\0')
1070 383 : dblocprovider = src_locprovider;
1071 444 : if (dblocale == NULL && dblocprovider == src_locprovider)
1072 379 : dblocale = src_locale;
1073 444 : if (dbicurules == NULL)
1074 443 : dbicurules = src_icurules;
1075 :
1076 : /* Some encodings are client only */
1077 444 : if (!PG_VALID_BE_ENCODING(encoding))
1078 0 : ereport(ERROR,
1079 : (errcode(ERRCODE_WRONG_OBJECT_TYPE),
1080 : errmsg("invalid server encoding %d", encoding)));
1081 :
1082 : /* Check that the chosen locales are valid, and get canonical spellings */
1083 444 : if (!check_locale(LC_COLLATE, dbcollate, &canonname))
1084 : {
1085 1 : if (dblocprovider == COLLPROVIDER_BUILTIN)
1086 0 : ereport(ERROR,
1087 : (errcode(ERRCODE_WRONG_OBJECT_TYPE),
1088 : errmsg("invalid LC_COLLATE locale name: \"%s\"", dbcollate),
1089 : errhint("If the locale name is specific to the builtin provider, use BUILTIN_LOCALE.")));
1090 1 : else if (dblocprovider == COLLPROVIDER_ICU)
1091 0 : ereport(ERROR,
1092 : (errcode(ERRCODE_WRONG_OBJECT_TYPE),
1093 : errmsg("invalid LC_COLLATE locale name: \"%s\"", dbcollate),
1094 : errhint("If the locale name is specific to the ICU provider, use ICU_LOCALE.")));
1095 : else
1096 1 : ereport(ERROR,
1097 : (errcode(ERRCODE_WRONG_OBJECT_TYPE),
1098 : errmsg("invalid LC_COLLATE locale name: \"%s\"", dbcollate)));
1099 : }
1100 443 : dbcollate = canonname;
1101 443 : if (!check_locale(LC_CTYPE, dbctype, &canonname))
1102 : {
1103 1 : if (dblocprovider == COLLPROVIDER_BUILTIN)
1104 0 : ereport(ERROR,
1105 : (errcode(ERRCODE_WRONG_OBJECT_TYPE),
1106 : errmsg("invalid LC_CTYPE locale name: \"%s\"", dbctype),
1107 : errhint("If the locale name is specific to the builtin provider, use BUILTIN_LOCALE.")));
1108 1 : else if (dblocprovider == COLLPROVIDER_ICU)
1109 0 : ereport(ERROR,
1110 : (errcode(ERRCODE_WRONG_OBJECT_TYPE),
1111 : errmsg("invalid LC_CTYPE locale name: \"%s\"", dbctype),
1112 : errhint("If the locale name is specific to the ICU provider, use ICU_LOCALE.")));
1113 : else
1114 1 : ereport(ERROR,
1115 : (errcode(ERRCODE_WRONG_OBJECT_TYPE),
1116 : errmsg("invalid LC_CTYPE locale name: \"%s\"", dbctype)));
1117 : }
1118 :
1119 442 : dbctype = canonname;
1120 :
1121 442 : check_encoding_locale_matches(encoding, dbcollate, dbctype);
1122 :
1123 : /* validate provider-specific parameters */
1124 442 : if (dblocprovider != COLLPROVIDER_BUILTIN)
1125 : {
1126 409 : if (builtinlocaleEl)
1127 0 : ereport(ERROR,
1128 : (errcode(ERRCODE_INVALID_OBJECT_DEFINITION),
1129 : errmsg("BUILTIN_LOCALE cannot be specified unless locale provider is builtin")));
1130 : }
1131 :
1132 442 : if (dblocprovider != COLLPROVIDER_ICU)
1133 : {
1134 427 : if (iculocaleEl)
1135 1 : ereport(ERROR,
1136 : (errcode(ERRCODE_INVALID_OBJECT_DEFINITION),
1137 : errmsg("ICU locale cannot be specified unless locale provider is ICU")));
1138 :
1139 426 : if (dbicurules)
1140 1 : ereport(ERROR,
1141 : (errcode(ERRCODE_INVALID_OBJECT_DEFINITION),
1142 : errmsg("ICU rules cannot be specified unless locale provider is ICU")));
1143 : }
1144 :
1145 : /* validate and canonicalize locale for the provider */
1146 440 : if (dblocprovider == COLLPROVIDER_BUILTIN)
1147 : {
1148 : /*
1149 : * This would happen if template0 uses the libc provider but the new
1150 : * database uses builtin.
1151 : */
1152 31 : if (!dblocale)
1153 1 : ereport(ERROR,
1154 : (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
1155 : errmsg("LOCALE or BUILTIN_LOCALE must be specified")));
1156 :
1157 30 : dblocale = builtin_validate_locale(encoding, dblocale);
1158 : }
1159 409 : else if (dblocprovider == COLLPROVIDER_ICU)
1160 : {
1161 15 : if (!(is_encoding_supported_by_icu(encoding)))
1162 1 : ereport(ERROR,
1163 : (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
1164 : errmsg("encoding \"%s\" is not supported with ICU provider",
1165 : pg_encoding_to_char(encoding))));
1166 :
1167 : /*
1168 : * This would happen if template0 uses the libc provider but the new
1169 : * database uses icu.
1170 : */
1171 14 : if (!dblocale)
1172 1 : ereport(ERROR,
1173 : (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
1174 : errmsg("LOCALE or ICU_LOCALE must be specified")));
1175 :
1176 : /*
1177 : * During binary upgrade, or when the locale came from the template
1178 : * database, preserve locale string. Otherwise, canonicalize to a
1179 : * language tag.
1180 : */
1181 13 : if (!IsBinaryUpgrade && dblocale != src_locale)
1182 : {
1183 7 : char *langtag = icu_language_tag(dblocale,
1184 : icu_validation_level);
1185 :
1186 7 : if (langtag && strcmp(dblocale, langtag) != 0)
1187 : {
1188 3 : ereport(NOTICE,
1189 : (errmsg("using standard form \"%s\" for ICU locale \"%s\"",
1190 : langtag, dblocale)));
1191 :
1192 3 : dblocale = langtag;
1193 : }
1194 : }
1195 :
1196 13 : icu_validate_locale(dblocale);
1197 : }
1198 :
1199 : /* for libc, locale comes from datcollate and datctype */
1200 435 : if (dblocprovider == COLLPROVIDER_LIBC)
1201 394 : dblocale = NULL;
1202 :
1203 : /*
1204 : * Check that the new encoding and locale settings match the source
1205 : * database. We insist on this because we simply copy the source data ---
1206 : * any non-ASCII data would be wrongly encoded, and any indexes sorted
1207 : * according to the source locale would be wrong.
1208 : *
1209 : * However, we assume that template0 doesn't contain any non-ASCII data
1210 : * nor any indexes that depend on collation or ctype, so template0 can be
1211 : * used as template for creating a database with any encoding or locale.
1212 : */
1213 435 : if (strcmp(dbtemplate, "template0") != 0)
1214 : {
1215 264 : if (encoding != src_encoding)
1216 0 : ereport(ERROR,
1217 : (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
1218 : errmsg("new encoding (%s) is incompatible with the encoding of the template database (%s)",
1219 : pg_encoding_to_char(encoding),
1220 : pg_encoding_to_char(src_encoding)),
1221 : errhint("Use the same encoding as in the template database, or use template0 as template.")));
1222 :
1223 264 : if (strcmp(dbcollate, src_collate) != 0)
1224 1 : ereport(ERROR,
1225 : (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
1226 : errmsg("new collation (%s) is incompatible with the collation of the template database (%s)",
1227 : dbcollate, src_collate),
1228 : errhint("Use the same collation as in the template database, or use template0 as template.")));
1229 :
1230 263 : if (strcmp(dbctype, src_ctype) != 0)
1231 0 : ereport(ERROR,
1232 : (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
1233 : errmsg("new LC_CTYPE (%s) is incompatible with the LC_CTYPE of the template database (%s)",
1234 : dbctype, src_ctype),
1235 : errhint("Use the same LC_CTYPE as in the template database, or use template0 as template.")));
1236 :
1237 263 : if (dblocprovider != src_locprovider)
1238 0 : ereport(ERROR,
1239 : (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
1240 : errmsg("new locale provider (%s) does not match locale provider of the template database (%s)",
1241 : collprovider_name(dblocprovider), collprovider_name(src_locprovider)),
1242 : errhint("Use the same locale provider as in the template database, or use template0 as template.")));
1243 :
1244 263 : if (dblocprovider == COLLPROVIDER_ICU)
1245 : {
1246 : char *val1;
1247 : char *val2;
1248 :
1249 : Assert(dblocale);
1250 : Assert(src_locale);
1251 6 : if (strcmp(dblocale, src_locale) != 0)
1252 0 : ereport(ERROR,
1253 : (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
1254 : errmsg("new ICU locale (%s) is incompatible with the ICU locale of the template database (%s)",
1255 : dblocale, src_locale),
1256 : errhint("Use the same ICU locale as in the template database, or use template0 as template.")));
1257 :
1258 6 : val1 = dbicurules;
1259 6 : if (!val1)
1260 6 : val1 = "";
1261 6 : val2 = src_icurules;
1262 6 : if (!val2)
1263 6 : val2 = "";
1264 6 : if (strcmp(val1, val2) != 0)
1265 0 : ereport(ERROR,
1266 : (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
1267 : errmsg("new ICU collation rules (%s) are incompatible with the ICU collation rules of the template database (%s)",
1268 : val1, val2),
1269 : errhint("Use the same ICU collation rules as in the template database, or use template0 as template.")));
1270 : }
1271 : }
1272 :
1273 : /*
1274 : * If we got a collation version for the template database, check that it
1275 : * matches the actual OS collation version. Otherwise error; the user
1276 : * needs to fix the template database first. Don't complain if a
1277 : * collation version was specified explicitly as a statement option; that
1278 : * is used by pg_upgrade to reproduce the old state exactly.
1279 : *
1280 : * (If the template database has no collation version, then either the
1281 : * platform/provider does not support collation versioning, or it's
1282 : * template0, for which we stipulate that it does not contain
1283 : * collation-using objects.)
1284 : */
1285 434 : if (src_collversion && !collversionEl)
1286 : {
1287 : char *actual_versionstr;
1288 : const char *locale;
1289 :
1290 179 : if (dblocprovider == COLLPROVIDER_LIBC)
1291 167 : locale = dbcollate;
1292 : else
1293 12 : locale = dblocale;
1294 :
1295 179 : actual_versionstr = get_collation_actual_version(dblocprovider, locale);
1296 179 : if (!actual_versionstr)
1297 0 : ereport(ERROR,
1298 : (errmsg("template database \"%s\" has a collation version, but no actual collation version could be determined",
1299 : dbtemplate)));
1300 :
1301 179 : if (strcmp(actual_versionstr, src_collversion) != 0)
1302 0 : ereport(ERROR,
1303 : (errmsg("template database \"%s\" has a collation version mismatch",
1304 : dbtemplate),
1305 : errdetail("The template database was created using collation version %s, "
1306 : "but the operating system provides version %s.",
1307 : src_collversion, actual_versionstr),
1308 : errhint("Rebuild all objects in the template database that use the default collation and run "
1309 : "ALTER DATABASE %s REFRESH COLLATION VERSION, "
1310 : "or build PostgreSQL with the right library version.",
1311 : quote_identifier(dbtemplate))));
1312 : }
1313 :
1314 434 : if (dbcollversion == NULL)
1315 402 : dbcollversion = src_collversion;
1316 :
1317 : /*
1318 : * Normally, we copy the collation version from the template database.
1319 : * This last resort only applies if the template database does not have a
1320 : * collation version, which is normally only the case for template0.
1321 : */
1322 434 : if (dbcollversion == NULL)
1323 : {
1324 : const char *locale;
1325 :
1326 223 : if (dblocprovider == COLLPROVIDER_LIBC)
1327 201 : locale = dbcollate;
1328 : else
1329 22 : locale = dblocale;
1330 :
1331 223 : dbcollversion = get_collation_actual_version(dblocprovider, locale);
1332 : }
1333 :
1334 : /* Resolve default tablespace for new database */
1335 434 : if (tablespacenameEl && tablespacenameEl->arg)
1336 17 : {
1337 : char *tablespacename;
1338 : AclResult aclresult;
1339 :
1340 17 : tablespacename = defGetString(tablespacenameEl);
1341 17 : dst_deftablespace = get_tablespace_oid(tablespacename, false);
1342 : /* check permissions */
1343 17 : aclresult = object_aclcheck(TableSpaceRelationId, dst_deftablespace, GetUserId(),
1344 : ACL_CREATE);
1345 17 : if (aclresult != ACLCHECK_OK)
1346 0 : aclcheck_error(aclresult, OBJECT_TABLESPACE,
1347 : tablespacename);
1348 :
1349 : /* pg_global must never be the default tablespace */
1350 17 : if (dst_deftablespace == GLOBALTABLESPACE_OID)
1351 0 : ereport(ERROR,
1352 : (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
1353 : errmsg("pg_global cannot be used as default tablespace")));
1354 :
1355 : /*
1356 : * If we are trying to change the default tablespace of the template,
1357 : * we require that the template not have any files in the new default
1358 : * tablespace. This is necessary because otherwise the copied
1359 : * database would contain pg_class rows that refer to its default
1360 : * tablespace both explicitly (by OID) and implicitly (as zero), which
1361 : * would cause problems. For example another CREATE DATABASE using
1362 : * the copied database as template, and trying to change its default
1363 : * tablespace again, would yield outright incorrect results (it would
1364 : * improperly move tables to the new default tablespace that should
1365 : * stay in the same tablespace).
1366 : */
1367 17 : if (dst_deftablespace != src_deftablespace)
1368 : {
1369 : char *srcpath;
1370 : struct stat st;
1371 :
1372 17 : srcpath = GetDatabasePath(src_dboid, dst_deftablespace);
1373 :
1374 17 : if (stat(srcpath, &st) == 0 &&
1375 0 : S_ISDIR(st.st_mode) &&
1376 0 : !directory_is_empty(srcpath))
1377 0 : ereport(ERROR,
1378 : (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
1379 : errmsg("cannot assign new default tablespace \"%s\"",
1380 : tablespacename),
1381 : errdetail("There is a conflict because database \"%s\" already has some tables in this tablespace.",
1382 : dbtemplate)));
1383 17 : pfree(srcpath);
1384 : }
1385 : }
1386 : else
1387 : {
1388 : /* Use template database's default tablespace */
1389 417 : dst_deftablespace = src_deftablespace;
1390 : /* Note there is no additional permission check in this path */
1391 : }
1392 :
1393 : /*
1394 : * If built with appropriate switch, whine when regression-testing
1395 : * conventions for database names are violated. But don't complain during
1396 : * initdb.
1397 : */
1398 : #ifdef ENFORCE_REGRESSION_TEST_NAME_RESTRICTIONS
1399 : if (IsUnderPostmaster && strstr(dbname, "regression") == NULL)
1400 : elog(WARNING, "databases created by regression test cases should have names including \"regression\"");
1401 : #endif
1402 :
1403 : /*
1404 : * Check for db name conflict. This is just to give a more friendly error
1405 : * message than "unique index violation". There's a race condition but
1406 : * we're willing to accept the less friendly message in that case.
1407 : */
1408 434 : if (OidIsValid(get_database_oid(dbname, true)))
1409 1 : ereport(ERROR,
1410 : (errcode(ERRCODE_DUPLICATE_DATABASE),
1411 : errmsg("database \"%s\" already exists", dbname)));
1412 :
1413 : /*
1414 : * The source DB can't have any active backends, except this one
1415 : * (exception is to allow CREATE DB while connected to template1).
1416 : * Otherwise we might copy inconsistent data.
1417 : *
1418 : * This should be last among the basic error checks, because it involves
1419 : * potential waiting; we may as well throw an error first if we're gonna
1420 : * throw one.
1421 : */
1422 433 : if (CountOtherDBBackends(src_dboid, ¬herbackends, &npreparedxacts))
1423 1 : ereport(ERROR,
1424 : (errcode(ERRCODE_OBJECT_IN_USE),
1425 : errmsg("source database \"%s\" is being accessed by other users",
1426 : dbtemplate),
1427 : errdetail_busy_db(notherbackends, npreparedxacts)));
1428 :
1429 : /*
1430 : * Select an OID for the new database, checking that it doesn't have a
1431 : * filename conflict with anything already existing in the tablespace
1432 : * directories.
1433 : */
1434 432 : pg_database_rel = table_open(DatabaseRelationId, RowExclusiveLock);
1435 :
1436 : /*
1437 : * If database OID is configured, check if the OID is already in use or
1438 : * data directory already exists.
1439 : */
1440 432 : if (OidIsValid(dboid))
1441 : {
1442 147 : char *existing_dbname = get_database_name(dboid);
1443 :
1444 147 : if (existing_dbname != NULL)
1445 0 : ereport(ERROR,
1446 : (errcode(ERRCODE_INVALID_PARAMETER_VALUE)),
1447 : errmsg("database OID %u is already in use by database \"%s\"",
1448 : dboid, existing_dbname));
1449 :
1450 147 : if (check_db_file_conflict(dboid))
1451 0 : ereport(ERROR,
1452 : (errcode(ERRCODE_INVALID_PARAMETER_VALUE)),
1453 : errmsg("data directory with the specified OID %u already exists", dboid));
1454 : }
1455 : else
1456 : {
1457 : /* Select an OID for the new database if is not explicitly configured. */
1458 : do
1459 : {
1460 285 : dboid = GetNewOidWithIndex(pg_database_rel, DatabaseOidIndexId,
1461 : Anum_pg_database_oid);
1462 285 : } while (check_db_file_conflict(dboid));
1463 : }
1464 :
1465 : /*
1466 : * Insert a new tuple into pg_database. This establishes our ownership of
1467 : * the new database name (anyone else trying to insert the same name will
1468 : * block on the unique index, and fail after we commit).
1469 : */
1470 :
1471 : Assert((dblocprovider != COLLPROVIDER_LIBC && dblocale) ||
1472 : (dblocprovider == COLLPROVIDER_LIBC && !dblocale));
1473 :
1474 : /* Form tuple */
1475 432 : new_record[Anum_pg_database_oid - 1] = ObjectIdGetDatum(dboid);
1476 432 : new_record[Anum_pg_database_datname - 1] =
1477 432 : DirectFunctionCall1(namein, CStringGetDatum(dbname));
1478 432 : new_record[Anum_pg_database_datdba - 1] = ObjectIdGetDatum(datdba);
1479 432 : new_record[Anum_pg_database_encoding - 1] = Int32GetDatum(encoding);
1480 432 : new_record[Anum_pg_database_datlocprovider - 1] = CharGetDatum(dblocprovider);
1481 432 : new_record[Anum_pg_database_datistemplate - 1] = BoolGetDatum(dbistemplate);
1482 432 : new_record[Anum_pg_database_datallowconn - 1] = BoolGetDatum(dballowconnections);
1483 432 : new_record[Anum_pg_database_dathasloginevt - 1] = BoolGetDatum(src_hasloginevt);
1484 432 : new_record[Anum_pg_database_datconnlimit - 1] = Int32GetDatum(dbconnlimit);
1485 432 : new_record[Anum_pg_database_datfrozenxid - 1] = TransactionIdGetDatum(src_frozenxid);
1486 432 : new_record[Anum_pg_database_datminmxid - 1] = TransactionIdGetDatum(src_minmxid);
1487 432 : new_record[Anum_pg_database_dattablespace - 1] = ObjectIdGetDatum(dst_deftablespace);
1488 432 : new_record[Anum_pg_database_datcollate - 1] = CStringGetTextDatum(dbcollate);
1489 432 : new_record[Anum_pg_database_datctype - 1] = CStringGetTextDatum(dbctype);
1490 432 : if (dblocale)
1491 40 : new_record[Anum_pg_database_datlocale - 1] = CStringGetTextDatum(dblocale);
1492 : else
1493 392 : new_record_nulls[Anum_pg_database_datlocale - 1] = true;
1494 432 : if (dbicurules)
1495 0 : new_record[Anum_pg_database_daticurules - 1] = CStringGetTextDatum(dbicurules);
1496 : else
1497 432 : new_record_nulls[Anum_pg_database_daticurules - 1] = true;
1498 432 : if (dbcollversion)
1499 372 : new_record[Anum_pg_database_datcollversion - 1] = CStringGetTextDatum(dbcollversion);
1500 : else
1501 60 : new_record_nulls[Anum_pg_database_datcollversion - 1] = true;
1502 :
1503 : /*
1504 : * We deliberately set datacl to default (NULL), rather than copying it
1505 : * from the template database. Copying it would be a bad idea when the
1506 : * owner is not the same as the template's owner.
1507 : */
1508 432 : new_record_nulls[Anum_pg_database_datacl - 1] = true;
1509 :
1510 432 : tuple = heap_form_tuple(RelationGetDescr(pg_database_rel),
1511 : new_record, new_record_nulls);
1512 :
1513 432 : CatalogTupleInsert(pg_database_rel, tuple);
1514 :
1515 : /*
1516 : * Now generate additional catalog entries associated with the new DB
1517 : */
1518 :
1519 : /* Register owner dependency */
1520 432 : recordDependencyOnOwner(DatabaseRelationId, dboid, datdba);
1521 :
1522 : /* Create pg_shdepend entries for objects within database */
1523 432 : copyTemplateDependencies(src_dboid, dboid);
1524 :
1525 : /* Post creation hook for new database */
1526 432 : InvokeObjectPostCreateHook(DatabaseRelationId, dboid, 0);
1527 :
1528 : /*
1529 : * If we're going to be reading data for the to-be-created database into
1530 : * shared_buffers, take a lock on it. Nobody should know that this
1531 : * database exists yet, but it's good to maintain the invariant that an
1532 : * AccessExclusiveLock on the database is sufficient to drop all of its
1533 : * buffers without worrying about more being read later.
1534 : *
1535 : * Note that we need to do this before entering the
1536 : * PG_ENSURE_ERROR_CLEANUP block below, because createdb_failure_callback
1537 : * expects this lock to be held already.
1538 : */
1539 432 : if (dbstrategy == CREATEDB_WAL_LOG)
1540 280 : LockSharedObject(DatabaseRelationId, dboid, 0, AccessShareLock);
1541 :
1542 : /*
1543 : * Once we start copying subdirectories, we need to be able to clean 'em
1544 : * up if we fail. Use an ENSURE block to make sure this happens. (This
1545 : * is not a 100% solution, because of the possibility of failure during
1546 : * transaction commit after we leave this routine, but it should handle
1547 : * most scenarios.)
1548 : */
1549 432 : fparms.src_dboid = src_dboid;
1550 432 : fparms.dest_dboid = dboid;
1551 432 : fparms.strategy = dbstrategy;
1552 :
1553 432 : PG_ENSURE_ERROR_CLEANUP(createdb_failure_callback,
1554 : PointerGetDatum(&fparms));
1555 : {
1556 : /*
1557 : * If the user has asked to create a database with WAL_LOG strategy
1558 : * then call CreateDatabaseUsingWalLog, which will copy the database
1559 : * at the block level and it will WAL log each copied block.
1560 : * Otherwise, call CreateDatabaseUsingFileCopy that will copy the
1561 : * database file by file.
1562 : */
1563 432 : if (dbstrategy == CREATEDB_WAL_LOG)
1564 280 : CreateDatabaseUsingWalLog(src_dboid, dboid, src_deftablespace,
1565 : dst_deftablespace);
1566 : else
1567 152 : CreateDatabaseUsingFileCopy(src_dboid, dboid, src_deftablespace,
1568 : dst_deftablespace);
1569 :
1570 : /*
1571 : * Close pg_database, but keep lock till commit.
1572 : */
1573 430 : table_close(pg_database_rel, NoLock);
1574 :
1575 : /*
1576 : * Force synchronous commit, thus minimizing the window between
1577 : * creation of the database files and committal of the transaction. If
1578 : * we crash before committing, we'll have a DB that's taking up disk
1579 : * space but is not in pg_database, which is not good.
1580 : */
1581 430 : ForceSyncCommit();
1582 : }
1583 432 : PG_END_ENSURE_ERROR_CLEANUP(createdb_failure_callback,
1584 : PointerGetDatum(&fparms));
1585 :
1586 430 : return dboid;
1587 : }
1588 :
1589 : /*
1590 : * Check whether chosen encoding matches chosen locale settings. This
1591 : * restriction is necessary because libc's locale-specific code usually
1592 : * fails when presented with data in an encoding it's not expecting. We
1593 : * allow mismatch in four cases:
1594 : *
1595 : * 1. locale encoding = SQL_ASCII, which means that the locale is C/POSIX
1596 : * which works with any encoding.
1597 : *
1598 : * 2. locale encoding = -1, which means that we couldn't determine the
1599 : * locale's encoding and have to trust the user to get it right.
1600 : *
1601 : * 3. selected encoding is UTF8 and platform is win32. This is because
1602 : * UTF8 is a pseudo codepage that is supported in all locales since it's
1603 : * converted to UTF16 before being used.
1604 : *
1605 : * 4. selected encoding is SQL_ASCII, but only if you're a superuser. This
1606 : * is risky but we have historically allowed it --- notably, the
1607 : * regression tests require it.
1608 : *
1609 : * Note: if you change this policy, fix initdb to match.
1610 : */
1611 : void
1612 458 : check_encoding_locale_matches(int encoding, const char *collate, const char *ctype)
1613 : {
1614 458 : int ctype_encoding = pg_get_encoding_from_locale(ctype, true);
1615 458 : int collate_encoding = pg_get_encoding_from_locale(collate, true);
1616 :
1617 462 : if (!(ctype_encoding == encoding ||
1618 4 : ctype_encoding == PG_SQL_ASCII ||
1619 : ctype_encoding == -1 ||
1620 : #ifdef WIN32
1621 : encoding == PG_UTF8 ||
1622 : #endif
1623 4 : (encoding == PG_SQL_ASCII && superuser())))
1624 0 : ereport(ERROR,
1625 : (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
1626 : errmsg("encoding \"%s\" does not match locale \"%s\"",
1627 : pg_encoding_to_char(encoding),
1628 : ctype),
1629 : errdetail("The chosen LC_CTYPE setting requires encoding \"%s\".",
1630 : pg_encoding_to_char(ctype_encoding))));
1631 :
1632 462 : if (!(collate_encoding == encoding ||
1633 4 : collate_encoding == PG_SQL_ASCII ||
1634 : collate_encoding == -1 ||
1635 : #ifdef WIN32
1636 : encoding == PG_UTF8 ||
1637 : #endif
1638 4 : (encoding == PG_SQL_ASCII && superuser())))
1639 0 : ereport(ERROR,
1640 : (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
1641 : errmsg("encoding \"%s\" does not match locale \"%s\"",
1642 : pg_encoding_to_char(encoding),
1643 : collate),
1644 : errdetail("The chosen LC_COLLATE setting requires encoding \"%s\".",
1645 : pg_encoding_to_char(collate_encoding))));
1646 458 : }
1647 :
1648 : /* Error cleanup callback for createdb */
1649 : static void
1650 2 : createdb_failure_callback(int code, Datum arg)
1651 : {
1652 2 : createdb_failure_params *fparms = (createdb_failure_params *) DatumGetPointer(arg);
1653 :
1654 : /*
1655 : * If we were copying database at block levels then drop pages for the
1656 : * destination database that are in the shared buffer cache. And tell
1657 : * checkpointer to forget any pending fsync and unlink requests for files
1658 : * in the database. The reasoning behind doing this is same as explained
1659 : * in dropdb function. But unlike dropdb we don't need to call
1660 : * pgstat_drop_database because this database is still not created so
1661 : * there should not be any stat for this.
1662 : */
1663 2 : if (fparms->strategy == CREATEDB_WAL_LOG)
1664 : {
1665 2 : DropDatabaseBuffers(fparms->dest_dboid);
1666 2 : ForgetDatabaseSyncRequests(fparms->dest_dboid);
1667 :
1668 : /* Release lock on the target database. */
1669 2 : UnlockSharedObject(DatabaseRelationId, fparms->dest_dboid, 0,
1670 : AccessShareLock);
1671 : }
1672 :
1673 : /*
1674 : * Release lock on source database before doing recursive remove. This is
1675 : * not essential but it seems desirable to release the lock as soon as
1676 : * possible.
1677 : */
1678 2 : UnlockSharedObject(DatabaseRelationId, fparms->src_dboid, 0, ShareLock);
1679 :
1680 : /* Throw away any successfully copied subdirectories */
1681 2 : remove_dbtablespaces(fparms->dest_dboid);
1682 2 : }
1683 :
1684 :
1685 : /*
1686 : * DROP DATABASE
1687 : */
1688 : void
1689 82 : dropdb(const char *dbname, bool missing_ok, bool force)
1690 : {
1691 : Oid db_id;
1692 : bool db_istemplate;
1693 : Relation pgdbrel;
1694 : HeapTuple tup;
1695 : ScanKeyData scankey;
1696 : void *inplace_state;
1697 : Form_pg_database datform;
1698 : int notherbackends;
1699 : int npreparedxacts;
1700 : int nslots,
1701 : nslots_active;
1702 : int nsubscriptions;
1703 :
1704 : /*
1705 : * Look up the target database's OID, and get exclusive lock on it. We
1706 : * need this to ensure that no new backend starts up in the target
1707 : * database while we are deleting it (see postinit.c), and that no one is
1708 : * using it as a CREATE DATABASE template or trying to delete it for
1709 : * themselves.
1710 : */
1711 82 : pgdbrel = table_open(DatabaseRelationId, RowExclusiveLock);
1712 :
1713 82 : if (!get_db_info(dbname, AccessExclusiveLock, &db_id, NULL, NULL,
1714 : &db_istemplate, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL))
1715 : {
1716 21 : if (!missing_ok)
1717 : {
1718 10 : ereport(ERROR,
1719 : (errcode(ERRCODE_UNDEFINED_DATABASE),
1720 : errmsg("database \"%s\" does not exist", dbname)));
1721 : }
1722 : else
1723 : {
1724 : /* Close pg_database, release the lock, since we changed nothing */
1725 11 : table_close(pgdbrel, RowExclusiveLock);
1726 11 : ereport(NOTICE,
1727 : (errmsg("database \"%s\" does not exist, skipping",
1728 : dbname)));
1729 11 : return;
1730 : }
1731 : }
1732 :
1733 : /*
1734 : * Permission checks
1735 : */
1736 61 : if (!object_ownercheck(DatabaseRelationId, db_id, GetUserId()))
1737 0 : aclcheck_error(ACLCHECK_NOT_OWNER, OBJECT_DATABASE,
1738 : dbname);
1739 :
1740 : /* DROP hook for the database being removed */
1741 61 : InvokeObjectDropHook(DatabaseRelationId, db_id, 0);
1742 :
1743 : /*
1744 : * Disallow dropping a DB that is marked istemplate. This is just to
1745 : * prevent people from accidentally dropping template0 or template1; they
1746 : * can do so if they're really determined ...
1747 : */
1748 61 : if (db_istemplate)
1749 0 : ereport(ERROR,
1750 : (errcode(ERRCODE_WRONG_OBJECT_TYPE),
1751 : errmsg("cannot drop a template database")));
1752 :
1753 : /* Obviously can't drop my own database */
1754 61 : if (db_id == MyDatabaseId)
1755 0 : ereport(ERROR,
1756 : (errcode(ERRCODE_OBJECT_IN_USE),
1757 : errmsg("cannot drop the currently open database")));
1758 :
1759 : /*
1760 : * Check whether there are active logical slots that refer to the
1761 : * to-be-dropped database. The database lock we are holding prevents the
1762 : * creation of new slots using the database or existing slots becoming
1763 : * active.
1764 : */
1765 61 : (void) ReplicationSlotsCountDBSlots(db_id, &nslots, &nslots_active);
1766 61 : if (nslots_active)
1767 : {
1768 1 : ereport(ERROR,
1769 : (errcode(ERRCODE_OBJECT_IN_USE),
1770 : errmsg("database \"%s\" is used by an active logical replication slot",
1771 : dbname),
1772 : errdetail_plural("There is %d active slot.",
1773 : "There are %d active slots.",
1774 : nslots_active, nslots_active)));
1775 : }
1776 :
1777 : /*
1778 : * Check if there are subscriptions defined in the target database.
1779 : *
1780 : * We can't drop them automatically because they might be holding
1781 : * resources in other databases/instances.
1782 : */
1783 60 : if ((nsubscriptions = CountDBSubscriptions(db_id)) > 0)
1784 0 : ereport(ERROR,
1785 : (errcode(ERRCODE_OBJECT_IN_USE),
1786 : errmsg("database \"%s\" is being used by logical replication subscription",
1787 : dbname),
1788 : errdetail_plural("There is %d subscription.",
1789 : "There are %d subscriptions.",
1790 : nsubscriptions, nsubscriptions)));
1791 :
1792 :
1793 : /*
1794 : * Attempt to terminate all existing connections to the target database if
1795 : * the user has requested to do so.
1796 : */
1797 60 : if (force)
1798 1 : TerminateOtherDBBackends(db_id);
1799 :
1800 : /*
1801 : * Check for other backends in the target database. (Because we hold the
1802 : * database lock, no new ones can start after this.)
1803 : *
1804 : * As in CREATE DATABASE, check this after other error conditions.
1805 : */
1806 60 : if (CountOtherDBBackends(db_id, ¬herbackends, &npreparedxacts))
1807 0 : ereport(ERROR,
1808 : (errcode(ERRCODE_OBJECT_IN_USE),
1809 : errmsg("database \"%s\" is being accessed by other users",
1810 : dbname),
1811 : errdetail_busy_db(notherbackends, npreparedxacts)));
1812 :
1813 : /*
1814 : * Delete any comments or security labels associated with the database.
1815 : */
1816 60 : DeleteSharedComments(db_id, DatabaseRelationId);
1817 60 : DeleteSharedSecurityLabel(db_id, DatabaseRelationId);
1818 :
1819 : /*
1820 : * Remove settings associated with this database
1821 : */
1822 60 : DropSetting(db_id, InvalidOid);
1823 :
1824 : /*
1825 : * Remove shared dependency references for the database.
1826 : */
1827 60 : dropDatabaseDependencies(db_id);
1828 :
1829 : /*
1830 : * Tell the cumulative stats system to forget it immediately, too.
1831 : */
1832 60 : pgstat_drop_database(db_id);
1833 :
1834 : /*
1835 : * Except for the deletion of the catalog row, subsequent actions are not
1836 : * transactional (consider DropDatabaseBuffers() discarding modified
1837 : * buffers). But we might crash or get interrupted below. To prevent
1838 : * accesses to a database with invalid contents, mark the database as
1839 : * invalid using an in-place update.
1840 : *
1841 : * We need to flush the WAL before continuing, to guarantee the
1842 : * modification is durable before performing irreversible filesystem
1843 : * operations.
1844 : */
1845 60 : ScanKeyInit(&scankey,
1846 : Anum_pg_database_datname,
1847 : BTEqualStrategyNumber, F_NAMEEQ,
1848 : CStringGetDatum(dbname));
1849 60 : systable_inplace_update_begin(pgdbrel, DatabaseNameIndexId, true,
1850 : NULL, 1, &scankey, &tup, &inplace_state);
1851 60 : if (!HeapTupleIsValid(tup))
1852 0 : elog(ERROR, "cache lookup failed for database %u", db_id);
1853 60 : datform = (Form_pg_database) GETSTRUCT(tup);
1854 60 : datform->datconnlimit = DATCONNLIMIT_INVALID_DB;
1855 60 : systable_inplace_update_finish(inplace_state, tup);
1856 60 : XLogFlush(XactLastRecEnd);
1857 :
1858 : /*
1859 : * Also delete the tuple - transactionally. If this transaction commits,
1860 : * the row will be gone, but if we fail, dropdb() can be invoked again.
1861 : */
1862 60 : CatalogTupleDelete(pgdbrel, &tup->t_self);
1863 60 : heap_freetuple(tup);
1864 :
1865 : /*
1866 : * Drop db-specific replication slots.
1867 : */
1868 60 : ReplicationSlotsDropDBSlots(db_id);
1869 :
1870 : /*
1871 : * Drop pages for this database that are in the shared buffer cache. This
1872 : * is important to ensure that no remaining backend tries to write out a
1873 : * dirty buffer to the dead database later...
1874 : */
1875 60 : DropDatabaseBuffers(db_id);
1876 :
1877 : /*
1878 : * Tell checkpointer to forget any pending fsync and unlink requests for
1879 : * files in the database; else the fsyncs will fail at next checkpoint, or
1880 : * worse, it will delete files that belong to a newly created database
1881 : * with the same OID.
1882 : */
1883 60 : ForgetDatabaseSyncRequests(db_id);
1884 :
1885 : /*
1886 : * Force a checkpoint to make sure the checkpointer has received the
1887 : * message sent by ForgetDatabaseSyncRequests.
1888 : */
1889 60 : RequestCheckpoint(CHECKPOINT_FAST | CHECKPOINT_FORCE | CHECKPOINT_WAIT);
1890 :
1891 : /* Close all smgr fds in all backends. */
1892 60 : WaitForProcSignalBarrier(EmitProcSignalBarrier(PROCSIGNAL_BARRIER_SMGRRELEASE));
1893 :
1894 : /*
1895 : * Remove all tablespace subdirs belonging to the database.
1896 : */
1897 60 : remove_dbtablespaces(db_id);
1898 :
1899 : /*
1900 : * Close pg_database, but keep lock till commit.
1901 : */
1902 59 : table_close(pgdbrel, NoLock);
1903 :
1904 : /*
1905 : * Force synchronous commit, thus minimizing the window between removal of
1906 : * the database files and committal of the transaction. If we crash before
1907 : * committing, we'll have a DB that's gone on disk but still there
1908 : * according to pg_database, which is not good.
1909 : */
1910 59 : ForceSyncCommit();
1911 : }
1912 :
1913 :
1914 : /*
1915 : * Rename database
1916 : */
1917 : ObjectAddress
1918 9 : RenameDatabase(const char *oldname, const char *newname)
1919 : {
1920 : Oid db_id;
1921 : HeapTuple newtup;
1922 : ItemPointerData otid;
1923 : Relation rel;
1924 : int notherbackends;
1925 : int npreparedxacts;
1926 : ObjectAddress address;
1927 :
1928 : /* Report error if name has \n or \r character. */
1929 9 : if (strpbrk(newname, "\n\r"))
1930 0 : ereport(ERROR,
1931 : (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
1932 : errmsg("database name \"%s\" contains a newline or carriage return character", newname)));
1933 :
1934 : /*
1935 : * Look up the target database's OID, and get exclusive lock on it. We
1936 : * need this for the same reasons as DROP DATABASE.
1937 : */
1938 9 : rel = table_open(DatabaseRelationId, RowExclusiveLock);
1939 :
1940 9 : if (!get_db_info(oldname, AccessExclusiveLock, &db_id, NULL, NULL, NULL,
1941 : NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL))
1942 0 : ereport(ERROR,
1943 : (errcode(ERRCODE_UNDEFINED_DATABASE),
1944 : errmsg("database \"%s\" does not exist", oldname)));
1945 :
1946 : /* must be owner */
1947 9 : if (!object_ownercheck(DatabaseRelationId, db_id, GetUserId()))
1948 0 : aclcheck_error(ACLCHECK_NOT_OWNER, OBJECT_DATABASE,
1949 : oldname);
1950 :
1951 : /* must have createdb rights */
1952 9 : if (!have_createdb_privilege())
1953 0 : ereport(ERROR,
1954 : (errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
1955 : errmsg("permission denied to rename database")));
1956 :
1957 : /*
1958 : * If built with appropriate switch, whine when regression-testing
1959 : * conventions for database names are violated.
1960 : */
1961 : #ifdef ENFORCE_REGRESSION_TEST_NAME_RESTRICTIONS
1962 : if (strstr(newname, "regression") == NULL)
1963 : elog(WARNING, "databases created by regression test cases should have names including \"regression\"");
1964 : #endif
1965 :
1966 : /*
1967 : * Make sure the new name doesn't exist. See notes for same error in
1968 : * CREATE DATABASE.
1969 : */
1970 9 : if (OidIsValid(get_database_oid(newname, true)))
1971 0 : ereport(ERROR,
1972 : (errcode(ERRCODE_DUPLICATE_DATABASE),
1973 : errmsg("database \"%s\" already exists", newname)));
1974 :
1975 : /*
1976 : * XXX Client applications probably store the current database somewhere,
1977 : * so renaming it could cause confusion. On the other hand, there may not
1978 : * be an actual problem besides a little confusion, so think about this
1979 : * and decide.
1980 : */
1981 9 : if (db_id == MyDatabaseId)
1982 0 : ereport(ERROR,
1983 : (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
1984 : errmsg("current database cannot be renamed")));
1985 :
1986 : /*
1987 : * Make sure the database does not have active sessions. This is the same
1988 : * concern as above, but applied to other sessions.
1989 : *
1990 : * As in CREATE DATABASE, check this after other error conditions.
1991 : */
1992 9 : if (CountOtherDBBackends(db_id, ¬herbackends, &npreparedxacts))
1993 0 : ereport(ERROR,
1994 : (errcode(ERRCODE_OBJECT_IN_USE),
1995 : errmsg("database \"%s\" is being accessed by other users",
1996 : oldname),
1997 : errdetail_busy_db(notherbackends, npreparedxacts)));
1998 :
1999 : /* rename */
2000 9 : newtup = SearchSysCacheLockedCopy1(DATABASEOID, ObjectIdGetDatum(db_id));
2001 9 : if (!HeapTupleIsValid(newtup))
2002 0 : elog(ERROR, "cache lookup failed for database %u", db_id);
2003 9 : otid = newtup->t_self;
2004 9 : namestrcpy(&(((Form_pg_database) GETSTRUCT(newtup))->datname), newname);
2005 9 : CatalogTupleUpdate(rel, &otid, newtup);
2006 9 : UnlockTuple(rel, &otid, InplaceUpdateTupleLock);
2007 :
2008 9 : InvokeObjectPostAlterHook(DatabaseRelationId, db_id, 0);
2009 :
2010 9 : ObjectAddressSet(address, DatabaseRelationId, db_id);
2011 :
2012 : /*
2013 : * Close pg_database, but keep lock till commit.
2014 : */
2015 9 : table_close(rel, NoLock);
2016 :
2017 9 : return address;
2018 : }
2019 :
2020 :
2021 : /*
2022 : * ALTER DATABASE SET TABLESPACE
2023 : */
2024 : static void
2025 14 : movedb(const char *dbname, const char *tblspcname)
2026 : {
2027 : Oid db_id;
2028 : Relation pgdbrel;
2029 : int notherbackends;
2030 : int npreparedxacts;
2031 : HeapTuple oldtuple,
2032 : newtuple;
2033 : Oid src_tblspcoid,
2034 : dst_tblspcoid;
2035 : ScanKeyData scankey;
2036 : SysScanDesc sysscan;
2037 : AclResult aclresult;
2038 : char *src_dbpath;
2039 : char *dst_dbpath;
2040 : DIR *dstdir;
2041 : struct dirent *xlde;
2042 : movedb_failure_params fparms;
2043 :
2044 : /*
2045 : * Look up the target database's OID, and get exclusive lock on it. We
2046 : * need this to ensure that no new backend starts up in the database while
2047 : * we are moving it, and that no one is using it as a CREATE DATABASE
2048 : * template or trying to delete it.
2049 : */
2050 14 : pgdbrel = table_open(DatabaseRelationId, RowExclusiveLock);
2051 :
2052 14 : if (!get_db_info(dbname, AccessExclusiveLock, &db_id, NULL, NULL, NULL,
2053 : NULL, NULL, NULL, NULL, &src_tblspcoid, NULL, NULL, NULL, NULL, NULL, NULL))
2054 0 : ereport(ERROR,
2055 : (errcode(ERRCODE_UNDEFINED_DATABASE),
2056 : errmsg("database \"%s\" does not exist", dbname)));
2057 :
2058 : /*
2059 : * We actually need a session lock, so that the lock will persist across
2060 : * the commit/restart below. (We could almost get away with letting the
2061 : * lock be released at commit, except that someone could try to move
2062 : * relations of the DB back into the old directory while we rmtree() it.)
2063 : */
2064 14 : LockSharedObjectForSession(DatabaseRelationId, db_id, 0,
2065 : AccessExclusiveLock);
2066 :
2067 : /*
2068 : * Permission checks
2069 : */
2070 14 : if (!object_ownercheck(DatabaseRelationId, db_id, GetUserId()))
2071 0 : aclcheck_error(ACLCHECK_NOT_OWNER, OBJECT_DATABASE,
2072 : dbname);
2073 :
2074 : /*
2075 : * Obviously can't move the tables of my own database
2076 : */
2077 14 : if (db_id == MyDatabaseId)
2078 0 : ereport(ERROR,
2079 : (errcode(ERRCODE_OBJECT_IN_USE),
2080 : errmsg("cannot change the tablespace of the currently open database")));
2081 :
2082 : /*
2083 : * Get tablespace's oid
2084 : */
2085 14 : dst_tblspcoid = get_tablespace_oid(tblspcname, false);
2086 :
2087 : /*
2088 : * Permission checks
2089 : */
2090 14 : aclresult = object_aclcheck(TableSpaceRelationId, dst_tblspcoid, GetUserId(),
2091 : ACL_CREATE);
2092 14 : if (aclresult != ACLCHECK_OK)
2093 0 : aclcheck_error(aclresult, OBJECT_TABLESPACE,
2094 : tblspcname);
2095 :
2096 : /*
2097 : * pg_global must never be the default tablespace
2098 : */
2099 14 : if (dst_tblspcoid == GLOBALTABLESPACE_OID)
2100 0 : ereport(ERROR,
2101 : (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
2102 : errmsg("pg_global cannot be used as default tablespace")));
2103 :
2104 : /*
2105 : * No-op if same tablespace
2106 : */
2107 14 : if (src_tblspcoid == dst_tblspcoid)
2108 : {
2109 0 : table_close(pgdbrel, NoLock);
2110 0 : UnlockSharedObjectForSession(DatabaseRelationId, db_id, 0,
2111 : AccessExclusiveLock);
2112 0 : return;
2113 : }
2114 :
2115 : /*
2116 : * Check for other backends in the target database. (Because we hold the
2117 : * database lock, no new ones can start after this.)
2118 : *
2119 : * As in CREATE DATABASE, check this after other error conditions.
2120 : */
2121 14 : if (CountOtherDBBackends(db_id, ¬herbackends, &npreparedxacts))
2122 0 : ereport(ERROR,
2123 : (errcode(ERRCODE_OBJECT_IN_USE),
2124 : errmsg("database \"%s\" is being accessed by other users",
2125 : dbname),
2126 : errdetail_busy_db(notherbackends, npreparedxacts)));
2127 :
2128 : /*
2129 : * Get old and new database paths
2130 : */
2131 14 : src_dbpath = GetDatabasePath(db_id, src_tblspcoid);
2132 14 : dst_dbpath = GetDatabasePath(db_id, dst_tblspcoid);
2133 :
2134 : /*
2135 : * Force a checkpoint before proceeding. This will force all dirty
2136 : * buffers, including those of unlogged tables, out to disk, to ensure
2137 : * source database is up-to-date on disk for the copy.
2138 : * FlushDatabaseBuffers() would suffice for that, but we also want to
2139 : * process any pending unlink requests. Otherwise, the check for existing
2140 : * files in the target directory might fail unnecessarily, not to mention
2141 : * that the copy might fail due to source files getting deleted under it.
2142 : * On Windows, this also ensures that background procs don't hold any open
2143 : * files, which would cause rmdir() to fail.
2144 : */
2145 14 : RequestCheckpoint(CHECKPOINT_FAST | CHECKPOINT_FORCE | CHECKPOINT_WAIT
2146 : | CHECKPOINT_FLUSH_UNLOGGED);
2147 :
2148 : /* Close all smgr fds in all backends. */
2149 14 : WaitForProcSignalBarrier(EmitProcSignalBarrier(PROCSIGNAL_BARRIER_SMGRRELEASE));
2150 :
2151 : /*
2152 : * Now drop all buffers holding data of the target database; they should
2153 : * no longer be dirty so DropDatabaseBuffers is safe.
2154 : *
2155 : * It might seem that we could just let these buffers age out of shared
2156 : * buffers naturally, since they should not get referenced anymore. The
2157 : * problem with that is that if the user later moves the database back to
2158 : * its original tablespace, any still-surviving buffers would appear to
2159 : * contain valid data again --- but they'd be missing any changes made in
2160 : * the database while it was in the new tablespace. In any case, freeing
2161 : * buffers that should never be used again seems worth the cycles.
2162 : *
2163 : * Note: it'd be sufficient to get rid of buffers matching db_id and
2164 : * src_tblspcoid, but bufmgr.c presently provides no API for that.
2165 : */
2166 14 : DropDatabaseBuffers(db_id);
2167 :
2168 : /*
2169 : * Check for existence of files in the target directory, i.e., objects of
2170 : * this database that are already in the target tablespace. We can't
2171 : * allow the move in such a case, because we would need to change those
2172 : * relations' pg_class.reltablespace entries to zero, and we don't have
2173 : * access to the DB's pg_class to do so.
2174 : */
2175 14 : dstdir = AllocateDir(dst_dbpath);
2176 14 : if (dstdir != NULL)
2177 : {
2178 0 : while ((xlde = ReadDir(dstdir, dst_dbpath)) != NULL)
2179 : {
2180 0 : if (strcmp(xlde->d_name, ".") == 0 ||
2181 0 : strcmp(xlde->d_name, "..") == 0)
2182 0 : continue;
2183 :
2184 0 : ereport(ERROR,
2185 : (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
2186 : errmsg("some relations of database \"%s\" are already in tablespace \"%s\"",
2187 : dbname, tblspcname),
2188 : errhint("You must move them back to the database's default tablespace before using this command.")));
2189 : }
2190 :
2191 0 : FreeDir(dstdir);
2192 :
2193 : /*
2194 : * The directory exists but is empty. We must remove it before using
2195 : * the copydir function.
2196 : */
2197 0 : if (rmdir(dst_dbpath) != 0)
2198 0 : elog(ERROR, "could not remove directory \"%s\": %m",
2199 : dst_dbpath);
2200 : }
2201 :
2202 : /*
2203 : * Use an ENSURE block to make sure we remove the debris if the copy fails
2204 : * (eg, due to out-of-disk-space). This is not a 100% solution, because
2205 : * of the possibility of failure during transaction commit, but it should
2206 : * handle most scenarios.
2207 : */
2208 14 : fparms.dest_dboid = db_id;
2209 14 : fparms.dest_tsoid = dst_tblspcoid;
2210 14 : PG_ENSURE_ERROR_CLEANUP(movedb_failure_callback,
2211 : PointerGetDatum(&fparms));
2212 : {
2213 14 : Datum new_record[Natts_pg_database] = {0};
2214 14 : bool new_record_nulls[Natts_pg_database] = {0};
2215 14 : bool new_record_repl[Natts_pg_database] = {0};
2216 :
2217 : /*
2218 : * Copy files from the old tablespace to the new one
2219 : */
2220 14 : copydir(src_dbpath, dst_dbpath, false);
2221 :
2222 : /*
2223 : * Record the filesystem change in XLOG
2224 : */
2225 : {
2226 : xl_dbase_create_file_copy_rec xlrec;
2227 :
2228 14 : xlrec.db_id = db_id;
2229 14 : xlrec.tablespace_id = dst_tblspcoid;
2230 14 : xlrec.src_db_id = db_id;
2231 14 : xlrec.src_tablespace_id = src_tblspcoid;
2232 :
2233 14 : XLogBeginInsert();
2234 14 : XLogRegisterData(&xlrec,
2235 : sizeof(xl_dbase_create_file_copy_rec));
2236 :
2237 14 : (void) XLogInsert(RM_DBASE_ID,
2238 : XLOG_DBASE_CREATE_FILE_COPY | XLR_SPECIAL_REL_UPDATE);
2239 : }
2240 :
2241 : /*
2242 : * Update the database's pg_database tuple
2243 : */
2244 14 : ScanKeyInit(&scankey,
2245 : Anum_pg_database_datname,
2246 : BTEqualStrategyNumber, F_NAMEEQ,
2247 : CStringGetDatum(dbname));
2248 14 : sysscan = systable_beginscan(pgdbrel, DatabaseNameIndexId, true,
2249 : NULL, 1, &scankey);
2250 14 : oldtuple = systable_getnext(sysscan);
2251 14 : if (!HeapTupleIsValid(oldtuple)) /* shouldn't happen... */
2252 0 : ereport(ERROR,
2253 : (errcode(ERRCODE_UNDEFINED_DATABASE),
2254 : errmsg("database \"%s\" does not exist", dbname)));
2255 14 : LockTuple(pgdbrel, &oldtuple->t_self, InplaceUpdateTupleLock);
2256 :
2257 14 : new_record[Anum_pg_database_dattablespace - 1] = ObjectIdGetDatum(dst_tblspcoid);
2258 14 : new_record_repl[Anum_pg_database_dattablespace - 1] = true;
2259 :
2260 14 : newtuple = heap_modify_tuple(oldtuple, RelationGetDescr(pgdbrel),
2261 : new_record,
2262 : new_record_nulls, new_record_repl);
2263 14 : CatalogTupleUpdate(pgdbrel, &oldtuple->t_self, newtuple);
2264 14 : UnlockTuple(pgdbrel, &oldtuple->t_self, InplaceUpdateTupleLock);
2265 :
2266 14 : InvokeObjectPostAlterHook(DatabaseRelationId, db_id, 0);
2267 :
2268 14 : systable_endscan(sysscan);
2269 :
2270 : /*
2271 : * Force another checkpoint here. As in CREATE DATABASE, this is to
2272 : * ensure that we don't have to replay a committed
2273 : * XLOG_DBASE_CREATE_FILE_COPY operation, which would cause us to lose
2274 : * any unlogged operations done in the new DB tablespace before the
2275 : * next checkpoint.
2276 : */
2277 14 : RequestCheckpoint(CHECKPOINT_FAST | CHECKPOINT_FORCE | CHECKPOINT_WAIT);
2278 :
2279 : /*
2280 : * Force synchronous commit, thus minimizing the window between
2281 : * copying the database files and committal of the transaction. If we
2282 : * crash before committing, we'll leave an orphaned set of files on
2283 : * disk, which is not fatal but not good either.
2284 : */
2285 14 : ForceSyncCommit();
2286 :
2287 : /*
2288 : * Close pg_database, but keep lock till commit.
2289 : */
2290 14 : table_close(pgdbrel, NoLock);
2291 : }
2292 14 : PG_END_ENSURE_ERROR_CLEANUP(movedb_failure_callback,
2293 : PointerGetDatum(&fparms));
2294 :
2295 : /*
2296 : * Commit the transaction so that the pg_database update is committed. If
2297 : * we crash while removing files, the database won't be corrupt, we'll
2298 : * just leave some orphaned files in the old directory.
2299 : *
2300 : * (This is OK because we know we aren't inside a transaction block.)
2301 : *
2302 : * XXX would it be safe/better to do this inside the ensure block? Not
2303 : * convinced it's a good idea; consider elog just after the transaction
2304 : * really commits.
2305 : */
2306 14 : PopActiveSnapshot();
2307 14 : CommitTransactionCommand();
2308 :
2309 : /* Start new transaction for the remaining work; don't need a snapshot */
2310 14 : StartTransactionCommand();
2311 :
2312 : /*
2313 : * Remove files from the old tablespace
2314 : */
2315 14 : if (!rmtree(src_dbpath, true))
2316 0 : ereport(WARNING,
2317 : (errmsg("some useless files may be left behind in old database directory \"%s\"",
2318 : src_dbpath)));
2319 :
2320 : /*
2321 : * Record the filesystem change in XLOG
2322 : */
2323 : {
2324 : xl_dbase_drop_rec xlrec;
2325 :
2326 14 : xlrec.db_id = db_id;
2327 14 : xlrec.ntablespaces = 1;
2328 :
2329 14 : XLogBeginInsert();
2330 14 : XLogRegisterData(&xlrec, sizeof(xl_dbase_drop_rec));
2331 14 : XLogRegisterData(&src_tblspcoid, sizeof(Oid));
2332 :
2333 14 : (void) XLogInsert(RM_DBASE_ID,
2334 : XLOG_DBASE_DROP | XLR_SPECIAL_REL_UPDATE);
2335 : }
2336 :
2337 : /* Now it's safe to release the database lock */
2338 14 : UnlockSharedObjectForSession(DatabaseRelationId, db_id, 0,
2339 : AccessExclusiveLock);
2340 :
2341 14 : pfree(src_dbpath);
2342 14 : pfree(dst_dbpath);
2343 : }
2344 :
2345 : /* Error cleanup callback for movedb */
2346 : static void
2347 0 : movedb_failure_callback(int code, Datum arg)
2348 : {
2349 0 : movedb_failure_params *fparms = (movedb_failure_params *) DatumGetPointer(arg);
2350 : char *dstpath;
2351 :
2352 : /* Get rid of anything we managed to copy to the target directory */
2353 0 : dstpath = GetDatabasePath(fparms->dest_dboid, fparms->dest_tsoid);
2354 :
2355 0 : (void) rmtree(dstpath, true);
2356 :
2357 0 : pfree(dstpath);
2358 0 : }
2359 :
2360 : /*
2361 : * Process options and call dropdb function.
2362 : */
2363 : void
2364 82 : DropDatabase(ParseState *pstate, DropdbStmt *stmt)
2365 : {
2366 82 : bool force = false;
2367 : ListCell *lc;
2368 :
2369 99 : foreach(lc, stmt->options)
2370 : {
2371 17 : DefElem *opt = (DefElem *) lfirst(lc);
2372 :
2373 17 : if (strcmp(opt->defname, "force") == 0)
2374 17 : force = true;
2375 : else
2376 0 : ereport(ERROR,
2377 : (errcode(ERRCODE_SYNTAX_ERROR),
2378 : errmsg("unrecognized %s option \"%s\"",
2379 : "DROP DATABASE", opt->defname),
2380 : parser_errposition(pstate, opt->location)));
2381 : }
2382 :
2383 82 : dropdb(stmt->dbname, stmt->missing_ok, force);
2384 70 : }
2385 :
2386 : /*
2387 : * ALTER DATABASE name ...
2388 : */
2389 : Oid
2390 56 : AlterDatabase(ParseState *pstate, AlterDatabaseStmt *stmt, bool isTopLevel)
2391 : {
2392 : Relation rel;
2393 : Oid dboid;
2394 : HeapTuple tuple,
2395 : newtuple;
2396 : Form_pg_database datform;
2397 : ScanKeyData scankey;
2398 : SysScanDesc scan;
2399 : ListCell *option;
2400 56 : bool dbistemplate = false;
2401 56 : bool dballowconnections = true;
2402 56 : int dbconnlimit = DATCONNLIMIT_UNLIMITED;
2403 56 : DefElem *distemplate = NULL;
2404 56 : DefElem *dallowconnections = NULL;
2405 56 : DefElem *dconnlimit = NULL;
2406 56 : DefElem *dtablespace = NULL;
2407 56 : Datum new_record[Natts_pg_database] = {0};
2408 56 : bool new_record_nulls[Natts_pg_database] = {0};
2409 56 : bool new_record_repl[Natts_pg_database] = {0};
2410 :
2411 : /* Extract options from the statement node tree */
2412 112 : foreach(option, stmt->options)
2413 : {
2414 56 : DefElem *defel = (DefElem *) lfirst(option);
2415 :
2416 56 : if (strcmp(defel->defname, "is_template") == 0)
2417 : {
2418 12 : if (distemplate)
2419 0 : errorConflictingDefElem(defel, pstate);
2420 12 : distemplate = defel;
2421 : }
2422 44 : else if (strcmp(defel->defname, "allow_connections") == 0)
2423 : {
2424 21 : if (dallowconnections)
2425 0 : errorConflictingDefElem(defel, pstate);
2426 21 : dallowconnections = defel;
2427 : }
2428 23 : else if (strcmp(defel->defname, "connection_limit") == 0)
2429 : {
2430 9 : if (dconnlimit)
2431 0 : errorConflictingDefElem(defel, pstate);
2432 9 : dconnlimit = defel;
2433 : }
2434 14 : else if (strcmp(defel->defname, "tablespace") == 0)
2435 : {
2436 14 : if (dtablespace)
2437 0 : errorConflictingDefElem(defel, pstate);
2438 14 : dtablespace = defel;
2439 : }
2440 : else
2441 0 : ereport(ERROR,
2442 : (errcode(ERRCODE_SYNTAX_ERROR),
2443 : errmsg("option \"%s\" not recognized", defel->defname),
2444 : parser_errposition(pstate, defel->location)));
2445 : }
2446 :
2447 56 : if (dtablespace)
2448 : {
2449 : /*
2450 : * While the SET TABLESPACE syntax doesn't allow any other options,
2451 : * somebody could write "WITH TABLESPACE ...". Forbid any other
2452 : * options from being specified in that case.
2453 : */
2454 14 : if (list_length(stmt->options) != 1)
2455 0 : ereport(ERROR,
2456 : (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
2457 : errmsg("option \"%s\" cannot be specified with other options",
2458 : dtablespace->defname),
2459 : parser_errposition(pstate, dtablespace->location)));
2460 : /* this case isn't allowed within a transaction block */
2461 14 : PreventInTransactionBlock(isTopLevel, "ALTER DATABASE SET TABLESPACE");
2462 14 : movedb(stmt->dbname, defGetString(dtablespace));
2463 14 : return InvalidOid;
2464 : }
2465 :
2466 42 : if (distemplate && distemplate->arg)
2467 12 : dbistemplate = defGetBoolean(distemplate);
2468 42 : if (dallowconnections && dallowconnections->arg)
2469 21 : dballowconnections = defGetBoolean(dallowconnections);
2470 42 : if (dconnlimit && dconnlimit->arg)
2471 : {
2472 9 : dbconnlimit = defGetInt32(dconnlimit);
2473 9 : if (dbconnlimit < DATCONNLIMIT_UNLIMITED)
2474 0 : ereport(ERROR,
2475 : (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
2476 : errmsg("invalid connection limit: %d", dbconnlimit)));
2477 : }
2478 :
2479 : /*
2480 : * Get the old tuple. We don't need a lock on the database per se,
2481 : * because we're not going to do anything that would mess up incoming
2482 : * connections.
2483 : */
2484 42 : rel = table_open(DatabaseRelationId, RowExclusiveLock);
2485 42 : ScanKeyInit(&scankey,
2486 : Anum_pg_database_datname,
2487 : BTEqualStrategyNumber, F_NAMEEQ,
2488 42 : CStringGetDatum(stmt->dbname));
2489 42 : scan = systable_beginscan(rel, DatabaseNameIndexId, true,
2490 : NULL, 1, &scankey);
2491 42 : tuple = systable_getnext(scan);
2492 42 : if (!HeapTupleIsValid(tuple))
2493 0 : ereport(ERROR,
2494 : (errcode(ERRCODE_UNDEFINED_DATABASE),
2495 : errmsg("database \"%s\" does not exist", stmt->dbname)));
2496 42 : LockTuple(rel, &tuple->t_self, InplaceUpdateTupleLock);
2497 :
2498 42 : datform = (Form_pg_database) GETSTRUCT(tuple);
2499 42 : dboid = datform->oid;
2500 :
2501 42 : if (database_is_invalid_form(datform))
2502 : {
2503 1 : ereport(FATAL,
2504 : errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
2505 : errmsg("cannot alter invalid database \"%s\"", stmt->dbname),
2506 : errhint("Use DROP DATABASE to drop invalid databases."));
2507 : }
2508 :
2509 41 : if (!object_ownercheck(DatabaseRelationId, dboid, GetUserId()))
2510 0 : aclcheck_error(ACLCHECK_NOT_OWNER, OBJECT_DATABASE,
2511 0 : stmt->dbname);
2512 :
2513 : /*
2514 : * In order to avoid getting locked out and having to go through
2515 : * standalone mode, we refuse to disallow connections to the database
2516 : * we're currently connected to. Lockout can still happen with concurrent
2517 : * sessions but the likeliness of that is not high enough to worry about.
2518 : */
2519 41 : if (!dballowconnections && dboid == MyDatabaseId)
2520 0 : ereport(ERROR,
2521 : (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
2522 : errmsg("cannot disallow connections for current database")));
2523 :
2524 : /*
2525 : * Build an updated tuple, perusing the information just obtained
2526 : */
2527 41 : if (distemplate)
2528 : {
2529 12 : new_record[Anum_pg_database_datistemplate - 1] = BoolGetDatum(dbistemplate);
2530 12 : new_record_repl[Anum_pg_database_datistemplate - 1] = true;
2531 : }
2532 41 : if (dallowconnections)
2533 : {
2534 21 : new_record[Anum_pg_database_datallowconn - 1] = BoolGetDatum(dballowconnections);
2535 21 : new_record_repl[Anum_pg_database_datallowconn - 1] = true;
2536 : }
2537 41 : if (dconnlimit)
2538 : {
2539 8 : new_record[Anum_pg_database_datconnlimit - 1] = Int32GetDatum(dbconnlimit);
2540 8 : new_record_repl[Anum_pg_database_datconnlimit - 1] = true;
2541 : }
2542 :
2543 41 : newtuple = heap_modify_tuple(tuple, RelationGetDescr(rel), new_record,
2544 : new_record_nulls, new_record_repl);
2545 41 : CatalogTupleUpdate(rel, &tuple->t_self, newtuple);
2546 41 : UnlockTuple(rel, &tuple->t_self, InplaceUpdateTupleLock);
2547 :
2548 41 : InvokeObjectPostAlterHook(DatabaseRelationId, dboid, 0);
2549 :
2550 41 : systable_endscan(scan);
2551 :
2552 : /* Close pg_database, but keep lock till commit */
2553 41 : table_close(rel, NoLock);
2554 :
2555 41 : return dboid;
2556 : }
2557 :
2558 :
2559 : /*
2560 : * ALTER DATABASE name REFRESH COLLATION VERSION
2561 : */
2562 : ObjectAddress
2563 4 : AlterDatabaseRefreshColl(AlterDatabaseRefreshCollStmt *stmt)
2564 : {
2565 : Relation rel;
2566 : ScanKeyData scankey;
2567 : SysScanDesc scan;
2568 : Oid db_id;
2569 : HeapTuple tuple;
2570 : Form_pg_database datForm;
2571 : ObjectAddress address;
2572 : Datum datum;
2573 : bool isnull;
2574 : char *oldversion;
2575 : char *newversion;
2576 :
2577 4 : rel = table_open(DatabaseRelationId, RowExclusiveLock);
2578 4 : ScanKeyInit(&scankey,
2579 : Anum_pg_database_datname,
2580 : BTEqualStrategyNumber, F_NAMEEQ,
2581 4 : CStringGetDatum(stmt->dbname));
2582 4 : scan = systable_beginscan(rel, DatabaseNameIndexId, true,
2583 : NULL, 1, &scankey);
2584 4 : tuple = systable_getnext(scan);
2585 4 : if (!HeapTupleIsValid(tuple))
2586 0 : ereport(ERROR,
2587 : (errcode(ERRCODE_UNDEFINED_DATABASE),
2588 : errmsg("database \"%s\" does not exist", stmt->dbname)));
2589 :
2590 4 : datForm = (Form_pg_database) GETSTRUCT(tuple);
2591 4 : db_id = datForm->oid;
2592 :
2593 4 : if (!object_ownercheck(DatabaseRelationId, db_id, GetUserId()))
2594 0 : aclcheck_error(ACLCHECK_NOT_OWNER, OBJECT_DATABASE,
2595 0 : stmt->dbname);
2596 4 : LockTuple(rel, &tuple->t_self, InplaceUpdateTupleLock);
2597 :
2598 4 : datum = heap_getattr(tuple, Anum_pg_database_datcollversion, RelationGetDescr(rel), &isnull);
2599 4 : oldversion = isnull ? NULL : TextDatumGetCString(datum);
2600 :
2601 4 : if (datForm->datlocprovider == COLLPROVIDER_LIBC)
2602 : {
2603 3 : datum = heap_getattr(tuple, Anum_pg_database_datcollate, RelationGetDescr(rel), &isnull);
2604 3 : if (isnull)
2605 0 : elog(ERROR, "unexpected null in pg_database");
2606 : }
2607 : else
2608 : {
2609 1 : datum = heap_getattr(tuple, Anum_pg_database_datlocale, RelationGetDescr(rel), &isnull);
2610 1 : if (isnull)
2611 0 : elog(ERROR, "unexpected null in pg_database");
2612 : }
2613 :
2614 4 : newversion = get_collation_actual_version(datForm->datlocprovider,
2615 4 : TextDatumGetCString(datum));
2616 :
2617 : /* cannot change from NULL to non-NULL or vice versa */
2618 4 : if ((!oldversion && newversion) || (oldversion && !newversion))
2619 0 : elog(ERROR, "invalid collation version change");
2620 4 : else if (oldversion && newversion && strcmp(newversion, oldversion) != 0)
2621 0 : {
2622 0 : bool nulls[Natts_pg_database] = {0};
2623 0 : bool replaces[Natts_pg_database] = {0};
2624 0 : Datum values[Natts_pg_database] = {0};
2625 : HeapTuple newtuple;
2626 :
2627 0 : ereport(NOTICE,
2628 : (errmsg("changing version from %s to %s",
2629 : oldversion, newversion)));
2630 :
2631 0 : values[Anum_pg_database_datcollversion - 1] = CStringGetTextDatum(newversion);
2632 0 : replaces[Anum_pg_database_datcollversion - 1] = true;
2633 :
2634 0 : newtuple = heap_modify_tuple(tuple, RelationGetDescr(rel),
2635 : values, nulls, replaces);
2636 0 : CatalogTupleUpdate(rel, &tuple->t_self, newtuple);
2637 0 : heap_freetuple(newtuple);
2638 : }
2639 : else
2640 4 : ereport(NOTICE,
2641 : (errmsg("version has not changed")));
2642 4 : UnlockTuple(rel, &tuple->t_self, InplaceUpdateTupleLock);
2643 :
2644 4 : InvokeObjectPostAlterHook(DatabaseRelationId, db_id, 0);
2645 :
2646 4 : ObjectAddressSet(address, DatabaseRelationId, db_id);
2647 :
2648 4 : systable_endscan(scan);
2649 :
2650 4 : table_close(rel, NoLock);
2651 :
2652 4 : return address;
2653 : }
2654 :
2655 :
2656 : /*
2657 : * ALTER DATABASE name SET ...
2658 : */
2659 : Oid
2660 667 : AlterDatabaseSet(AlterDatabaseSetStmt *stmt)
2661 : {
2662 667 : Oid datid = get_database_oid(stmt->dbname, false);
2663 :
2664 : /*
2665 : * Obtain a lock on the database and make sure it didn't go away in the
2666 : * meantime.
2667 : */
2668 667 : shdepLockAndCheckObject(DatabaseRelationId, datid);
2669 :
2670 667 : if (!object_ownercheck(DatabaseRelationId, datid, GetUserId()))
2671 0 : aclcheck_error(ACLCHECK_NOT_OWNER, OBJECT_DATABASE,
2672 0 : stmt->dbname);
2673 :
2674 667 : AlterSetting(datid, InvalidOid, stmt->setstmt);
2675 :
2676 665 : UnlockSharedObject(DatabaseRelationId, datid, 0, AccessShareLock);
2677 :
2678 665 : return datid;
2679 : }
2680 :
2681 :
2682 : /*
2683 : * ALTER DATABASE name OWNER TO newowner
2684 : */
2685 : ObjectAddress
2686 49 : AlterDatabaseOwner(const char *dbname, Oid newOwnerId)
2687 : {
2688 : Oid db_id;
2689 : HeapTuple tuple;
2690 : Relation rel;
2691 : ScanKeyData scankey;
2692 : SysScanDesc scan;
2693 : Form_pg_database datForm;
2694 : ObjectAddress address;
2695 :
2696 : /*
2697 : * Get the old tuple. We don't need a lock on the database per se,
2698 : * because we're not going to do anything that would mess up incoming
2699 : * connections.
2700 : */
2701 49 : rel = table_open(DatabaseRelationId, RowExclusiveLock);
2702 49 : ScanKeyInit(&scankey,
2703 : Anum_pg_database_datname,
2704 : BTEqualStrategyNumber, F_NAMEEQ,
2705 : CStringGetDatum(dbname));
2706 49 : scan = systable_beginscan(rel, DatabaseNameIndexId, true,
2707 : NULL, 1, &scankey);
2708 49 : tuple = systable_getnext(scan);
2709 49 : if (!HeapTupleIsValid(tuple))
2710 0 : ereport(ERROR,
2711 : (errcode(ERRCODE_UNDEFINED_DATABASE),
2712 : errmsg("database \"%s\" does not exist", dbname)));
2713 :
2714 49 : datForm = (Form_pg_database) GETSTRUCT(tuple);
2715 49 : db_id = datForm->oid;
2716 :
2717 : /*
2718 : * If the new owner is the same as the existing owner, consider the
2719 : * command to have succeeded. This is to be consistent with other
2720 : * objects.
2721 : */
2722 49 : if (datForm->datdba != newOwnerId)
2723 : {
2724 : Datum repl_val[Natts_pg_database];
2725 17 : bool repl_null[Natts_pg_database] = {0};
2726 17 : bool repl_repl[Natts_pg_database] = {0};
2727 : Acl *newAcl;
2728 : Datum aclDatum;
2729 : bool isNull;
2730 : HeapTuple newtuple;
2731 :
2732 : /* Otherwise, must be owner of the existing object */
2733 17 : if (!object_ownercheck(DatabaseRelationId, db_id, GetUserId()))
2734 0 : aclcheck_error(ACLCHECK_NOT_OWNER, OBJECT_DATABASE,
2735 : dbname);
2736 :
2737 : /* Must be able to become new owner */
2738 17 : check_can_set_role(GetUserId(), newOwnerId);
2739 :
2740 : /*
2741 : * must have createdb rights
2742 : *
2743 : * NOTE: This is different from other alter-owner checks in that the
2744 : * current user is checked for createdb privileges instead of the
2745 : * destination owner. This is consistent with the CREATE case for
2746 : * databases. Because superusers will always have this right, we need
2747 : * no special case for them.
2748 : */
2749 17 : if (!have_createdb_privilege())
2750 0 : ereport(ERROR,
2751 : (errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
2752 : errmsg("permission denied to change owner of database")));
2753 :
2754 17 : LockTuple(rel, &tuple->t_self, InplaceUpdateTupleLock);
2755 :
2756 17 : repl_repl[Anum_pg_database_datdba - 1] = true;
2757 17 : repl_val[Anum_pg_database_datdba - 1] = ObjectIdGetDatum(newOwnerId);
2758 :
2759 : /*
2760 : * Determine the modified ACL for the new owner. This is only
2761 : * necessary when the ACL is non-null.
2762 : */
2763 17 : aclDatum = heap_getattr(tuple,
2764 : Anum_pg_database_datacl,
2765 : RelationGetDescr(rel),
2766 : &isNull);
2767 17 : if (!isNull)
2768 : {
2769 0 : newAcl = aclnewowner(DatumGetAclP(aclDatum),
2770 : datForm->datdba, newOwnerId);
2771 0 : repl_repl[Anum_pg_database_datacl - 1] = true;
2772 0 : repl_val[Anum_pg_database_datacl - 1] = PointerGetDatum(newAcl);
2773 : }
2774 :
2775 17 : newtuple = heap_modify_tuple(tuple, RelationGetDescr(rel), repl_val, repl_null, repl_repl);
2776 17 : CatalogTupleUpdate(rel, &newtuple->t_self, newtuple);
2777 17 : UnlockTuple(rel, &tuple->t_self, InplaceUpdateTupleLock);
2778 :
2779 17 : heap_freetuple(newtuple);
2780 :
2781 : /* Update owner dependency reference */
2782 17 : changeDependencyOnOwner(DatabaseRelationId, db_id, newOwnerId);
2783 : }
2784 :
2785 49 : InvokeObjectPostAlterHook(DatabaseRelationId, db_id, 0);
2786 :
2787 49 : ObjectAddressSet(address, DatabaseRelationId, db_id);
2788 :
2789 49 : systable_endscan(scan);
2790 :
2791 : /* Close pg_database, but keep lock till commit */
2792 49 : table_close(rel, NoLock);
2793 :
2794 49 : return address;
2795 : }
2796 :
2797 :
2798 : Datum
2799 55 : pg_database_collation_actual_version(PG_FUNCTION_ARGS)
2800 : {
2801 55 : Oid dbid = PG_GETARG_OID(0);
2802 : HeapTuple tp;
2803 : char datlocprovider;
2804 : Datum datum;
2805 : char *version;
2806 :
2807 55 : tp = SearchSysCache1(DATABASEOID, ObjectIdGetDatum(dbid));
2808 55 : if (!HeapTupleIsValid(tp))
2809 0 : ereport(ERROR,
2810 : (errcode(ERRCODE_UNDEFINED_OBJECT),
2811 : errmsg("database with OID %u does not exist", dbid)));
2812 :
2813 55 : datlocprovider = ((Form_pg_database) GETSTRUCT(tp))->datlocprovider;
2814 :
2815 55 : if (datlocprovider == COLLPROVIDER_LIBC)
2816 48 : datum = SysCacheGetAttrNotNull(DATABASEOID, tp, Anum_pg_database_datcollate);
2817 : else
2818 7 : datum = SysCacheGetAttrNotNull(DATABASEOID, tp, Anum_pg_database_datlocale);
2819 :
2820 55 : version = get_collation_actual_version(datlocprovider,
2821 55 : TextDatumGetCString(datum));
2822 :
2823 55 : ReleaseSysCache(tp);
2824 :
2825 55 : if (version)
2826 41 : PG_RETURN_TEXT_P(cstring_to_text(version));
2827 : else
2828 14 : PG_RETURN_NULL();
2829 : }
2830 :
2831 :
2832 : /*
2833 : * Helper functions
2834 : */
2835 :
2836 : /*
2837 : * Look up info about the database named "name". If the database exists,
2838 : * obtain the specified lock type on it, fill in any of the remaining
2839 : * parameters that aren't NULL, and return true. If no such database,
2840 : * return false.
2841 : */
2842 : static bool
2843 551 : get_db_info(const char *name, LOCKMODE lockmode,
2844 : Oid *dbIdP, Oid *ownerIdP,
2845 : int *encodingP, bool *dbIsTemplateP, bool *dbAllowConnP, bool *dbHasLoginEvtP,
2846 : TransactionId *dbFrozenXidP, MultiXactId *dbMinMultiP,
2847 : Oid *dbTablespace, char **dbCollate, char **dbCtype, char **dbLocale,
2848 : char **dbIcurules,
2849 : char *dbLocProvider,
2850 : char **dbCollversion)
2851 : {
2852 551 : bool result = false;
2853 : Relation relation;
2854 :
2855 : Assert(name);
2856 :
2857 : /* Caller may wish to grab a better lock on pg_database beforehand... */
2858 551 : relation = table_open(DatabaseRelationId, AccessShareLock);
2859 :
2860 : /*
2861 : * Loop covers the rare case where the database is renamed before we can
2862 : * lock it. We try again just in case we can find a new one of the same
2863 : * name.
2864 : */
2865 : for (;;)
2866 0 : {
2867 : ScanKeyData scanKey;
2868 : SysScanDesc scan;
2869 : HeapTuple tuple;
2870 : Oid dbOid;
2871 :
2872 : /*
2873 : * there's no syscache for database-indexed-by-name, so must do it the
2874 : * hard way
2875 : */
2876 551 : ScanKeyInit(&scanKey,
2877 : Anum_pg_database_datname,
2878 : BTEqualStrategyNumber, F_NAMEEQ,
2879 : CStringGetDatum(name));
2880 :
2881 551 : scan = systable_beginscan(relation, DatabaseNameIndexId, true,
2882 : NULL, 1, &scanKey);
2883 :
2884 551 : tuple = systable_getnext(scan);
2885 :
2886 551 : if (!HeapTupleIsValid(tuple))
2887 : {
2888 : /* definitely no database of that name */
2889 21 : systable_endscan(scan);
2890 21 : break;
2891 : }
2892 :
2893 530 : dbOid = ((Form_pg_database) GETSTRUCT(tuple))->oid;
2894 :
2895 530 : systable_endscan(scan);
2896 :
2897 : /*
2898 : * Now that we have a database OID, we can try to lock the DB.
2899 : */
2900 530 : if (lockmode != NoLock)
2901 530 : LockSharedObject(DatabaseRelationId, dbOid, 0, lockmode);
2902 :
2903 : /*
2904 : * And now, re-fetch the tuple by OID. If it's still there and still
2905 : * the same name, we win; else, drop the lock and loop back to try
2906 : * again.
2907 : */
2908 530 : tuple = SearchSysCache1(DATABASEOID, ObjectIdGetDatum(dbOid));
2909 530 : if (HeapTupleIsValid(tuple))
2910 : {
2911 530 : Form_pg_database dbform = (Form_pg_database) GETSTRUCT(tuple);
2912 :
2913 530 : if (strcmp(name, NameStr(dbform->datname)) == 0)
2914 : {
2915 : Datum datum;
2916 : bool isnull;
2917 :
2918 : /* oid of the database */
2919 530 : if (dbIdP)
2920 530 : *dbIdP = dbOid;
2921 : /* oid of the owner */
2922 530 : if (ownerIdP)
2923 446 : *ownerIdP = dbform->datdba;
2924 : /* character encoding */
2925 530 : if (encodingP)
2926 446 : *encodingP = dbform->encoding;
2927 : /* allowed as template? */
2928 530 : if (dbIsTemplateP)
2929 507 : *dbIsTemplateP = dbform->datistemplate;
2930 : /* Has on login event trigger? */
2931 530 : if (dbHasLoginEvtP)
2932 446 : *dbHasLoginEvtP = dbform->dathasloginevt;
2933 : /* allowing connections? */
2934 530 : if (dbAllowConnP)
2935 446 : *dbAllowConnP = dbform->datallowconn;
2936 : /* limit of frozen XIDs */
2937 530 : if (dbFrozenXidP)
2938 446 : *dbFrozenXidP = dbform->datfrozenxid;
2939 : /* minimum MultiXactId */
2940 530 : if (dbMinMultiP)
2941 446 : *dbMinMultiP = dbform->datminmxid;
2942 : /* default tablespace for this database */
2943 530 : if (dbTablespace)
2944 460 : *dbTablespace = dbform->dattablespace;
2945 : /* default locale settings for this database */
2946 530 : if (dbLocProvider)
2947 446 : *dbLocProvider = dbform->datlocprovider;
2948 530 : if (dbCollate)
2949 : {
2950 446 : datum = SysCacheGetAttrNotNull(DATABASEOID, tuple, Anum_pg_database_datcollate);
2951 446 : *dbCollate = TextDatumGetCString(datum);
2952 : }
2953 530 : if (dbCtype)
2954 : {
2955 446 : datum = SysCacheGetAttrNotNull(DATABASEOID, tuple, Anum_pg_database_datctype);
2956 446 : *dbCtype = TextDatumGetCString(datum);
2957 : }
2958 530 : if (dbLocale)
2959 : {
2960 446 : datum = SysCacheGetAttr(DATABASEOID, tuple, Anum_pg_database_datlocale, &isnull);
2961 446 : if (isnull)
2962 415 : *dbLocale = NULL;
2963 : else
2964 31 : *dbLocale = TextDatumGetCString(datum);
2965 : }
2966 530 : if (dbIcurules)
2967 : {
2968 446 : datum = SysCacheGetAttr(DATABASEOID, tuple, Anum_pg_database_daticurules, &isnull);
2969 446 : if (isnull)
2970 446 : *dbIcurules = NULL;
2971 : else
2972 0 : *dbIcurules = TextDatumGetCString(datum);
2973 : }
2974 530 : if (dbCollversion)
2975 : {
2976 446 : datum = SysCacheGetAttr(DATABASEOID, tuple, Anum_pg_database_datcollversion, &isnull);
2977 446 : if (isnull)
2978 262 : *dbCollversion = NULL;
2979 : else
2980 184 : *dbCollversion = TextDatumGetCString(datum);
2981 : }
2982 530 : ReleaseSysCache(tuple);
2983 530 : result = true;
2984 530 : break;
2985 : }
2986 : /* can only get here if it was just renamed */
2987 0 : ReleaseSysCache(tuple);
2988 : }
2989 :
2990 0 : if (lockmode != NoLock)
2991 0 : UnlockSharedObject(DatabaseRelationId, dbOid, 0, lockmode);
2992 : }
2993 :
2994 551 : table_close(relation, AccessShareLock);
2995 :
2996 551 : return result;
2997 : }
2998 :
2999 : /* Check if current user has createdb privileges */
3000 : bool
3001 496 : have_createdb_privilege(void)
3002 : {
3003 496 : bool result = false;
3004 : HeapTuple utup;
3005 :
3006 : /* Superusers can always do everything */
3007 496 : if (superuser())
3008 472 : return true;
3009 :
3010 24 : utup = SearchSysCache1(AUTHOID, ObjectIdGetDatum(GetUserId()));
3011 24 : if (HeapTupleIsValid(utup))
3012 : {
3013 24 : result = ((Form_pg_authid) GETSTRUCT(utup))->rolcreatedb;
3014 24 : ReleaseSysCache(utup);
3015 : }
3016 24 : return result;
3017 : }
3018 :
3019 : /*
3020 : * Remove tablespace directories
3021 : *
3022 : * We don't know what tablespaces db_id is using, so iterate through all
3023 : * tablespaces removing <tablespace>/db_id
3024 : */
3025 : static void
3026 62 : remove_dbtablespaces(Oid db_id)
3027 : {
3028 : Relation rel;
3029 : TableScanDesc scan;
3030 : HeapTuple tuple;
3031 62 : List *ltblspc = NIL;
3032 : ListCell *cell;
3033 : int ntblspc;
3034 : int i;
3035 : Oid *tablespace_ids;
3036 :
3037 62 : rel = table_open(TableSpaceRelationId, AccessShareLock);
3038 61 : scan = table_beginscan_catalog(rel, 0, NULL);
3039 226 : while ((tuple = heap_getnext(scan, ForwardScanDirection)) != NULL)
3040 : {
3041 165 : Form_pg_tablespace spcform = (Form_pg_tablespace) GETSTRUCT(tuple);
3042 165 : Oid dsttablespace = spcform->oid;
3043 : char *dstpath;
3044 : struct stat st;
3045 :
3046 : /* Don't mess with the global tablespace */
3047 165 : if (dsttablespace == GLOBALTABLESPACE_OID)
3048 104 : continue;
3049 :
3050 104 : dstpath = GetDatabasePath(db_id, dsttablespace);
3051 :
3052 104 : if (lstat(dstpath, &st) < 0 || !S_ISDIR(st.st_mode))
3053 : {
3054 : /* Assume we can ignore it */
3055 43 : pfree(dstpath);
3056 43 : continue;
3057 : }
3058 :
3059 61 : if (!rmtree(dstpath, true))
3060 0 : ereport(WARNING,
3061 : (errmsg("some useless files may be left behind in old database directory \"%s\"",
3062 : dstpath)));
3063 :
3064 61 : ltblspc = lappend_oid(ltblspc, dsttablespace);
3065 61 : pfree(dstpath);
3066 : }
3067 :
3068 61 : ntblspc = list_length(ltblspc);
3069 61 : if (ntblspc == 0)
3070 : {
3071 0 : table_endscan(scan);
3072 0 : table_close(rel, AccessShareLock);
3073 0 : return;
3074 : }
3075 :
3076 61 : tablespace_ids = (Oid *) palloc(ntblspc * sizeof(Oid));
3077 61 : i = 0;
3078 122 : foreach(cell, ltblspc)
3079 61 : tablespace_ids[i++] = lfirst_oid(cell);
3080 :
3081 : /* Record the filesystem change in XLOG */
3082 : {
3083 : xl_dbase_drop_rec xlrec;
3084 :
3085 61 : xlrec.db_id = db_id;
3086 61 : xlrec.ntablespaces = ntblspc;
3087 :
3088 61 : XLogBeginInsert();
3089 61 : XLogRegisterData(&xlrec, MinSizeOfDbaseDropRec);
3090 61 : XLogRegisterData(tablespace_ids, ntblspc * sizeof(Oid));
3091 :
3092 61 : (void) XLogInsert(RM_DBASE_ID,
3093 : XLOG_DBASE_DROP | XLR_SPECIAL_REL_UPDATE);
3094 : }
3095 :
3096 61 : list_free(ltblspc);
3097 61 : pfree(tablespace_ids);
3098 :
3099 61 : table_endscan(scan);
3100 61 : table_close(rel, AccessShareLock);
3101 : }
3102 :
3103 : /*
3104 : * Check for existing files that conflict with a proposed new DB OID;
3105 : * return true if there are any
3106 : *
3107 : * If there were a subdirectory in any tablespace matching the proposed new
3108 : * OID, we'd get a create failure due to the duplicate name ... and then we'd
3109 : * try to remove that already-existing subdirectory during the cleanup in
3110 : * remove_dbtablespaces. Nuking existing files seems like a bad idea, so
3111 : * instead we make this extra check before settling on the OID of the new
3112 : * database. This exactly parallels what GetNewRelFileNumber() does for table
3113 : * relfilenumber values.
3114 : */
3115 : static bool
3116 432 : check_db_file_conflict(Oid db_id)
3117 : {
3118 432 : bool result = false;
3119 : Relation rel;
3120 : TableScanDesc scan;
3121 : HeapTuple tuple;
3122 :
3123 432 : rel = table_open(TableSpaceRelationId, AccessShareLock);
3124 432 : scan = table_beginscan_catalog(rel, 0, NULL);
3125 1381 : while ((tuple = heap_getnext(scan, ForwardScanDirection)) != NULL)
3126 : {
3127 949 : Form_pg_tablespace spcform = (Form_pg_tablespace) GETSTRUCT(tuple);
3128 949 : Oid dsttablespace = spcform->oid;
3129 : char *dstpath;
3130 : struct stat st;
3131 :
3132 : /* Don't mess with the global tablespace */
3133 949 : if (dsttablespace == GLOBALTABLESPACE_OID)
3134 432 : continue;
3135 :
3136 517 : dstpath = GetDatabasePath(db_id, dsttablespace);
3137 :
3138 517 : if (lstat(dstpath, &st) == 0)
3139 : {
3140 : /* Found a conflicting file (or directory, whatever) */
3141 0 : pfree(dstpath);
3142 0 : result = true;
3143 0 : break;
3144 : }
3145 :
3146 517 : pfree(dstpath);
3147 : }
3148 :
3149 432 : table_endscan(scan);
3150 432 : table_close(rel, AccessShareLock);
3151 :
3152 432 : return result;
3153 : }
3154 :
3155 : /*
3156 : * Issue a suitable errdetail message for a busy database
3157 : */
3158 : static int
3159 1 : errdetail_busy_db(int notherbackends, int npreparedxacts)
3160 : {
3161 1 : if (notherbackends > 0 && npreparedxacts > 0)
3162 :
3163 : /*
3164 : * We don't deal with singular versus plural here, since gettext
3165 : * doesn't support multiple plurals in one string.
3166 : */
3167 0 : errdetail("There are %d other session(s) and %d prepared transaction(s) using the database.",
3168 : notherbackends, npreparedxacts);
3169 1 : else if (notherbackends > 0)
3170 1 : errdetail_plural("There is %d other session using the database.",
3171 : "There are %d other sessions using the database.",
3172 : notherbackends,
3173 : notherbackends);
3174 : else
3175 0 : errdetail_plural("There is %d prepared transaction using the database.",
3176 : "There are %d prepared transactions using the database.",
3177 : npreparedxacts,
3178 : npreparedxacts);
3179 1 : return 0; /* just to keep ereport macro happy */
3180 : }
3181 :
3182 : /*
3183 : * get_database_oid - given a database name, look up the OID
3184 : *
3185 : * If missing_ok is false, throw an error if database name not found. If
3186 : * true, just return InvalidOid.
3187 : */
3188 : Oid
3189 1754 : get_database_oid(const char *dbname, bool missing_ok)
3190 : {
3191 : Relation pg_database;
3192 : ScanKeyData entry[1];
3193 : SysScanDesc scan;
3194 : HeapTuple dbtuple;
3195 : Oid oid;
3196 :
3197 : /*
3198 : * There's no syscache for pg_database indexed by name, so we must look
3199 : * the hard way.
3200 : */
3201 1754 : pg_database = table_open(DatabaseRelationId, AccessShareLock);
3202 1754 : ScanKeyInit(&entry[0],
3203 : Anum_pg_database_datname,
3204 : BTEqualStrategyNumber, F_NAMEEQ,
3205 : CStringGetDatum(dbname));
3206 1754 : scan = systable_beginscan(pg_database, DatabaseNameIndexId, true,
3207 : NULL, 1, entry);
3208 :
3209 1754 : dbtuple = systable_getnext(scan);
3210 :
3211 : /* We assume that there can be at most one matching tuple */
3212 1754 : if (HeapTupleIsValid(dbtuple))
3213 1284 : oid = ((Form_pg_database) GETSTRUCT(dbtuple))->oid;
3214 : else
3215 470 : oid = InvalidOid;
3216 :
3217 1754 : systable_endscan(scan);
3218 1754 : table_close(pg_database, AccessShareLock);
3219 :
3220 1754 : if (!OidIsValid(oid) && !missing_ok)
3221 4 : ereport(ERROR,
3222 : (errcode(ERRCODE_UNDEFINED_DATABASE),
3223 : errmsg("database \"%s\" does not exist",
3224 : dbname)));
3225 :
3226 1750 : return oid;
3227 : }
3228 :
3229 :
3230 : /*
3231 : * While dropping a database the pg_database row is marked invalid, but the
3232 : * catalog contents still exist. Connections to such a database are not
3233 : * allowed.
3234 : */
3235 : bool
3236 28416 : database_is_invalid_form(Form_pg_database datform)
3237 : {
3238 28416 : return datform->datconnlimit == DATCONNLIMIT_INVALID_DB;
3239 : }
3240 :
3241 :
3242 : /*
3243 : * Convenience wrapper around database_is_invalid_form()
3244 : */
3245 : bool
3246 446 : database_is_invalid_oid(Oid dboid)
3247 : {
3248 : HeapTuple dbtup;
3249 : Form_pg_database dbform;
3250 : bool invalid;
3251 :
3252 446 : dbtup = SearchSysCache1(DATABASEOID, ObjectIdGetDatum(dboid));
3253 446 : if (!HeapTupleIsValid(dbtup))
3254 0 : elog(ERROR, "cache lookup failed for database %u", dboid);
3255 446 : dbform = (Form_pg_database) GETSTRUCT(dbtup);
3256 :
3257 446 : invalid = database_is_invalid_form(dbform);
3258 :
3259 446 : ReleaseSysCache(dbtup);
3260 :
3261 446 : return invalid;
3262 : }
3263 :
3264 :
3265 : /*
3266 : * recovery_create_dbdir()
3267 : *
3268 : * During recovery, there's a case where we validly need to recover a missing
3269 : * tablespace directory so that recovery can continue. This happens when
3270 : * recovery wants to create a database but the holding tablespace has been
3271 : * removed before the server stopped. Since we expect that the directory will
3272 : * be gone before reaching recovery consistency, and we have no knowledge about
3273 : * the tablespace other than its OID here, we create a real directory under
3274 : * pg_tblspc here instead of restoring the symlink.
3275 : *
3276 : * If only_tblspc is true, then the requested directory must be in pg_tblspc/
3277 : */
3278 : static void
3279 25 : recovery_create_dbdir(char *path, bool only_tblspc)
3280 : {
3281 : struct stat st;
3282 :
3283 : Assert(RecoveryInProgress());
3284 :
3285 25 : if (stat(path, &st) == 0)
3286 25 : return;
3287 :
3288 0 : if (only_tblspc && strstr(path, PG_TBLSPC_DIR_SLASH) == NULL)
3289 0 : elog(PANIC, "requested to created invalid directory: %s", path);
3290 :
3291 0 : if (reachedConsistency && !allow_in_place_tablespaces)
3292 0 : ereport(PANIC,
3293 : errmsg("missing directory \"%s\"", path));
3294 :
3295 0 : elog(reachedConsistency ? WARNING : DEBUG1,
3296 : "creating missing directory: %s", path);
3297 :
3298 0 : if (pg_mkdir_p(path, pg_dir_create_mode) != 0)
3299 0 : ereport(PANIC,
3300 : errmsg("could not create missing directory \"%s\": %m", path));
3301 : }
3302 :
3303 :
3304 : /*
3305 : * DATABASE resource manager's routines
3306 : */
3307 : void
3308 46 : dbase_redo(XLogReaderState *record)
3309 : {
3310 46 : uint8 info = XLogRecGetInfo(record) & ~XLR_INFO_MASK;
3311 :
3312 : /* Backup blocks are not used in dbase records */
3313 : Assert(!XLogRecHasAnyBlockRefs(record));
3314 :
3315 46 : if (info == XLOG_DBASE_CREATE_FILE_COPY)
3316 : {
3317 5 : xl_dbase_create_file_copy_rec *xlrec =
3318 5 : (xl_dbase_create_file_copy_rec *) XLogRecGetData(record);
3319 : char *src_path;
3320 : char *dst_path;
3321 : char *parent_path;
3322 : struct stat st;
3323 :
3324 5 : src_path = GetDatabasePath(xlrec->src_db_id, xlrec->src_tablespace_id);
3325 5 : dst_path = GetDatabasePath(xlrec->db_id, xlrec->tablespace_id);
3326 :
3327 : /*
3328 : * Our theory for replaying a CREATE is to forcibly drop the target
3329 : * subdirectory if present, then re-copy the source data. This may be
3330 : * more work than needed, but it is simple to implement.
3331 : */
3332 5 : if (stat(dst_path, &st) == 0 && S_ISDIR(st.st_mode))
3333 : {
3334 0 : if (!rmtree(dst_path, true))
3335 : /* If this failed, copydir() below is going to error. */
3336 0 : ereport(WARNING,
3337 : (errmsg("some useless files may be left behind in old database directory \"%s\"",
3338 : dst_path)));
3339 : }
3340 :
3341 : /*
3342 : * If the parent of the target path doesn't exist, create it now. This
3343 : * enables us to create the target underneath later.
3344 : */
3345 5 : parent_path = pstrdup(dst_path);
3346 5 : get_parent_directory(parent_path);
3347 5 : if (stat(parent_path, &st) < 0)
3348 : {
3349 0 : if (errno != ENOENT)
3350 0 : ereport(FATAL,
3351 : errmsg("could not stat directory \"%s\": %m",
3352 : dst_path));
3353 :
3354 : /* create the parent directory if needed and valid */
3355 0 : recovery_create_dbdir(parent_path, true);
3356 : }
3357 5 : pfree(parent_path);
3358 :
3359 : /*
3360 : * There's a case where the copy source directory is missing for the
3361 : * same reason above. Create the empty source directory so that
3362 : * copydir below doesn't fail. The directory will be dropped soon by
3363 : * recovery.
3364 : */
3365 5 : if (stat(src_path, &st) < 0 && errno == ENOENT)
3366 0 : recovery_create_dbdir(src_path, false);
3367 :
3368 : /*
3369 : * Force dirty buffers out to disk, to ensure source database is
3370 : * up-to-date for the copy.
3371 : */
3372 5 : FlushDatabaseBuffers(xlrec->src_db_id);
3373 :
3374 : /* Close all smgr fds in all backends. */
3375 5 : WaitForProcSignalBarrier(EmitProcSignalBarrier(PROCSIGNAL_BARRIER_SMGRRELEASE));
3376 :
3377 : /*
3378 : * Copy this subdirectory to the new location
3379 : *
3380 : * We don't need to copy subdirectories
3381 : */
3382 5 : copydir(src_path, dst_path, false);
3383 :
3384 5 : pfree(src_path);
3385 5 : pfree(dst_path);
3386 : }
3387 41 : else if (info == XLOG_DBASE_CREATE_WAL_LOG)
3388 : {
3389 25 : xl_dbase_create_wal_log_rec *xlrec =
3390 25 : (xl_dbase_create_wal_log_rec *) XLogRecGetData(record);
3391 : char *dbpath;
3392 : char *parent_path;
3393 :
3394 25 : dbpath = GetDatabasePath(xlrec->db_id, xlrec->tablespace_id);
3395 :
3396 : /* create the parent directory if needed and valid */
3397 25 : parent_path = pstrdup(dbpath);
3398 25 : get_parent_directory(parent_path);
3399 25 : recovery_create_dbdir(parent_path, true);
3400 25 : pfree(parent_path);
3401 :
3402 : /* Create the database directory with the version file. */
3403 25 : CreateDirAndVersionFile(dbpath, xlrec->db_id, xlrec->tablespace_id,
3404 : true);
3405 25 : pfree(dbpath);
3406 : }
3407 16 : else if (info == XLOG_DBASE_DROP)
3408 : {
3409 16 : xl_dbase_drop_rec *xlrec = (xl_dbase_drop_rec *) XLogRecGetData(record);
3410 : char *dst_path;
3411 : int i;
3412 :
3413 16 : if (InHotStandby)
3414 : {
3415 : /*
3416 : * Lock database while we resolve conflicts to ensure that
3417 : * InitPostgres() cannot fully re-execute concurrently. This
3418 : * avoids backends re-connecting automatically to same database,
3419 : * which can happen in some cases.
3420 : *
3421 : * This will lock out walsenders trying to connect to db-specific
3422 : * slots for logical decoding too, so it's safe for us to drop
3423 : * slots.
3424 : */
3425 16 : LockSharedObjectForSession(DatabaseRelationId, xlrec->db_id, 0, AccessExclusiveLock);
3426 16 : ResolveRecoveryConflictWithDatabase(xlrec->db_id);
3427 : }
3428 :
3429 : /* Drop any database-specific replication slots */
3430 16 : ReplicationSlotsDropDBSlots(xlrec->db_id);
3431 :
3432 : /* Drop pages for this database that are in the shared buffer cache */
3433 16 : DropDatabaseBuffers(xlrec->db_id);
3434 :
3435 : /* Also, clean out any fsync requests that might be pending in md.c */
3436 16 : ForgetDatabaseSyncRequests(xlrec->db_id);
3437 :
3438 : /* Clean out the xlog relcache too */
3439 16 : XLogDropDatabase(xlrec->db_id);
3440 :
3441 : /* Close all smgr fds in all backends. */
3442 16 : WaitForProcSignalBarrier(EmitProcSignalBarrier(PROCSIGNAL_BARRIER_SMGRRELEASE));
3443 :
3444 32 : for (i = 0; i < xlrec->ntablespaces; i++)
3445 : {
3446 16 : dst_path = GetDatabasePath(xlrec->db_id, xlrec->tablespace_ids[i]);
3447 :
3448 : /* And remove the physical files */
3449 16 : if (!rmtree(dst_path, true))
3450 0 : ereport(WARNING,
3451 : (errmsg("some useless files may be left behind in old database directory \"%s\"",
3452 : dst_path)));
3453 16 : pfree(dst_path);
3454 : }
3455 :
3456 16 : if (InHotStandby)
3457 : {
3458 : /*
3459 : * Release locks prior to commit. XXX There is a race condition
3460 : * here that may allow backends to reconnect, but the window for
3461 : * this is small because the gap between here and commit is mostly
3462 : * fairly small and it is unlikely that people will be dropping
3463 : * databases that we are trying to connect to anyway.
3464 : */
3465 16 : UnlockSharedObjectForSession(DatabaseRelationId, xlrec->db_id, 0, AccessExclusiveLock);
3466 : }
3467 : }
3468 : else
3469 0 : elog(PANIC, "dbase_redo: unknown op code %u", info);
3470 46 : }
|