Line data Source code
1 : /*-------------------------------------------------------------------------
2 : *
3 : * dbcommands.c
4 : * Database management commands (create/drop database).
5 : *
6 : * Note: database creation/destruction commands use exclusive locks on
7 : * the database objects (as expressed by LockSharedObject()) to avoid
8 : * stepping on each others' toes. Formerly we used table-level locks
9 : * on pg_database, but that's too coarse-grained.
10 : *
11 : * Portions Copyright (c) 1996-2026, PostgreSQL Global Development Group
12 : * Portions Copyright (c) 1994, Regents of the University of California
13 : *
14 : *
15 : * IDENTIFICATION
16 : * src/backend/commands/dbcommands.c
17 : *
18 : *-------------------------------------------------------------------------
19 : */
20 : #include "postgres.h"
21 :
22 : #include <fcntl.h>
23 : #include <unistd.h>
24 : #include <sys/stat.h>
25 :
26 : #include "access/genam.h"
27 : #include "access/heapam.h"
28 : #include "access/htup_details.h"
29 : #include "access/multixact.h"
30 : #include "access/tableam.h"
31 : #include "access/xact.h"
32 : #include "access/xloginsert.h"
33 : #include "access/xlogrecovery.h"
34 : #include "access/xlogutils.h"
35 : #include "catalog/catalog.h"
36 : #include "catalog/dependency.h"
37 : #include "catalog/indexing.h"
38 : #include "catalog/objectaccess.h"
39 : #include "catalog/pg_authid.h"
40 : #include "catalog/pg_collation.h"
41 : #include "catalog/pg_database.h"
42 : #include "catalog/pg_db_role_setting.h"
43 : #include "catalog/pg_subscription.h"
44 : #include "catalog/pg_tablespace.h"
45 : #include "commands/comment.h"
46 : #include "commands/dbcommands.h"
47 : #include "commands/dbcommands_xlog.h"
48 : #include "commands/defrem.h"
49 : #include "commands/seclabel.h"
50 : #include "commands/tablespace.h"
51 : #include "common/file_perm.h"
52 : #include "mb/pg_wchar.h"
53 : #include "miscadmin.h"
54 : #include "pgstat.h"
55 : #include "postmaster/bgwriter.h"
56 : #include "replication/slot.h"
57 : #include "storage/copydir.h"
58 : #include "storage/fd.h"
59 : #include "storage/ipc.h"
60 : #include "storage/lmgr.h"
61 : #include "storage/md.h"
62 : #include "storage/procarray.h"
63 : #include "storage/procsignal.h"
64 : #include "storage/smgr.h"
65 : #include "utils/acl.h"
66 : #include "utils/builtins.h"
67 : #include "utils/fmgroids.h"
68 : #include "utils/lsyscache.h"
69 : #include "utils/pg_locale.h"
70 : #include "utils/relmapper.h"
71 : #include "utils/snapmgr.h"
72 : #include "utils/syscache.h"
73 :
/*
 * Strategies for populating a newly created database.
 *
 * CREATEDB_WAL_LOG
 *		Copy the source database block by block, WAL-logging every block
 *		that is copied.
 *
 * CREATEDB_FILE_COPY
 *		Copy the source database with a file system level copy, logging a
 *		single WAL record per tablespace copied.  To make that safe, a
 *		checkpoint is triggered both before and after the operation.
 */
typedef enum CreateDBStrategy
{
	CREATEDB_WAL_LOG = 0,
	CREATEDB_FILE_COPY = 1,
} CreateDBStrategy;
89 :
90 : typedef struct
91 : {
92 : Oid src_dboid; /* source (template) DB */
93 : Oid dest_dboid; /* DB we are trying to create */
94 : CreateDBStrategy strategy; /* create db strategy */
95 : } createdb_failure_params;
96 :
97 : typedef struct
98 : {
99 : Oid dest_dboid; /* DB we are trying to move */
100 : Oid dest_tsoid; /* tablespace we are trying to move to */
101 : } movedb_failure_params;
102 :
103 : /*
104 : * Information about a relation to be copied when creating a database.
105 : */
106 : typedef struct CreateDBRelInfo
107 : {
108 : RelFileLocator rlocator; /* physical relation identifier */
109 : Oid reloid; /* relation oid */
110 : bool permanent; /* relation is permanent or unlogged */
111 : } CreateDBRelInfo;
112 :
113 :
114 : /* non-export function prototypes */
115 : static void createdb_failure_callback(int code, Datum arg);
116 : static void movedb(const char *dbname, const char *tblspcname);
117 : static void movedb_failure_callback(int code, Datum arg);
118 : static bool get_db_info(const char *name, LOCKMODE lockmode,
119 : Oid *dbIdP, Oid *ownerIdP,
120 : int *encodingP, bool *dbIsTemplateP, bool *dbAllowConnP, bool *dbHasLoginEvtP,
121 : TransactionId *dbFrozenXidP, MultiXactId *dbMinMultiP,
122 : Oid *dbTablespace, char **dbCollate, char **dbCtype, char **dbLocale,
123 : char **dbIcurules,
124 : char *dbLocProvider,
125 : char **dbCollversion);
126 : static void remove_dbtablespaces(Oid db_id);
127 : static bool check_db_file_conflict(Oid db_id);
128 : static int errdetail_busy_db(int notherbackends, int npreparedxacts);
129 : static void CreateDatabaseUsingWalLog(Oid src_dboid, Oid dst_dboid, Oid src_tsid,
130 : Oid dst_tsid);
131 : static List *ScanSourceDatabasePgClass(Oid tbid, Oid dbid, char *srcpath);
132 : static List *ScanSourceDatabasePgClassPage(Page page, Buffer buf, Oid tbid,
133 : Oid dbid, char *srcpath,
134 : List *rlocatorlist, Snapshot snapshot);
135 : static CreateDBRelInfo *ScanSourceDatabasePgClassTuple(HeapTupleData *tuple,
136 : Oid tbid, Oid dbid,
137 : char *srcpath);
138 : static void CreateDirAndVersionFile(char *dbpath, Oid dbid, Oid tsid,
139 : bool isRedo);
140 : static void CreateDatabaseUsingFileCopy(Oid src_dboid, Oid dst_dboid,
141 : Oid src_tsid, Oid dst_tsid);
142 : static void recovery_create_dbdir(char *path, bool only_tblspc);
143 :
144 : /*
145 : * Create a new database using the WAL_LOG strategy.
146 : *
147 : * Each copied block is separately written to the write-ahead log.
148 : */
149 : static void
150 253 : CreateDatabaseUsingWalLog(Oid src_dboid, Oid dst_dboid,
151 : Oid src_tsid, Oid dst_tsid)
152 : {
153 : char *srcpath;
154 : char *dstpath;
155 253 : List *rlocatorlist = NULL;
156 : ListCell *cell;
157 : LockRelId srcrelid;
158 : LockRelId dstrelid;
159 : RelFileLocator srcrlocator;
160 : RelFileLocator dstrlocator;
161 : CreateDBRelInfo *relinfo;
162 :
163 : /* Get source and destination database paths. */
164 253 : srcpath = GetDatabasePath(src_dboid, src_tsid);
165 253 : dstpath = GetDatabasePath(dst_dboid, dst_tsid);
166 :
167 : /* Create database directory and write PG_VERSION file. */
168 253 : CreateDirAndVersionFile(dstpath, dst_dboid, dst_tsid, false);
169 :
170 : /* Copy relmap file from source database to the destination database. */
171 253 : RelationMapCopy(dst_dboid, dst_tsid, srcpath, dstpath);
172 :
173 : /* Get list of relfilelocators to copy from the source database. */
174 253 : rlocatorlist = ScanSourceDatabasePgClass(src_tsid, src_dboid, srcpath);
175 : Assert(rlocatorlist != NIL);
176 :
177 : /*
178 : * Database IDs will be the same for all relations so set them before
179 : * entering the loop.
180 : */
181 253 : srcrelid.dbId = src_dboid;
182 253 : dstrelid.dbId = dst_dboid;
183 :
184 : /* Loop over our list of relfilelocators and copy each one. */
185 56503 : foreach(cell, rlocatorlist)
186 : {
187 56252 : relinfo = lfirst(cell);
188 56252 : srcrlocator = relinfo->rlocator;
189 :
190 : /*
191 : * If the relation is from the source db's default tablespace then we
192 : * need to create it in the destination db's default tablespace.
193 : * Otherwise, we need to create in the same tablespace as it is in the
194 : * source database.
195 : */
196 56252 : if (srcrlocator.spcOid == src_tsid)
197 56252 : dstrlocator.spcOid = dst_tsid;
198 : else
199 0 : dstrlocator.spcOid = srcrlocator.spcOid;
200 :
201 56252 : dstrlocator.dbOid = dst_dboid;
202 56252 : dstrlocator.relNumber = srcrlocator.relNumber;
203 :
204 : /*
205 : * Acquire locks on source and target relations before copying.
206 : *
207 : * We typically do not read relation data into shared_buffers without
208 : * holding a relation lock. It's unclear what could go wrong if we
209 : * skipped it in this case, because nobody can be modifying either the
210 : * source or destination database at this point, and we have locks on
211 : * both databases, too, but let's take the conservative route.
212 : */
213 56252 : dstrelid.relId = srcrelid.relId = relinfo->reloid;
214 56252 : LockRelationId(&srcrelid, AccessShareLock);
215 56252 : LockRelationId(&dstrelid, AccessShareLock);
216 :
217 : /* Copy relation storage from source to the destination. */
218 56252 : CreateAndCopyRelationData(srcrlocator, dstrlocator, relinfo->permanent);
219 :
220 : /* Release the relation locks. */
221 56250 : UnlockRelationId(&srcrelid, AccessShareLock);
222 56250 : UnlockRelationId(&dstrelid, AccessShareLock);
223 : }
224 :
225 251 : pfree(srcpath);
226 251 : pfree(dstpath);
227 251 : list_free_deep(rlocatorlist);
228 251 : }
229 :
230 : /*
231 : * Scan the pg_class table in the source database to identify the relations
232 : * that need to be copied to the destination database.
233 : *
234 : * This is an exception to the usual rule that cross-database access is
235 : * not possible. We can make it work here because we know that there are no
236 : * connections to the source database and (since there can't be prepared
237 : * transactions touching that database) no in-doubt tuples either. This
238 : * means that we don't need to worry about pruning removing anything from
239 : * under us, and we don't need to be too picky about our snapshot either.
240 : * As long as it sees all previously-committed XIDs as committed and all
241 : * aborted XIDs as aborted, we should be fine: nothing else is possible
242 : * here.
243 : *
244 : * We can't rely on the relcache for anything here, because that only knows
245 : * about the database to which we are connected, and can't handle access to
246 : * other databases. That also means we can't rely on the heap scan
247 : * infrastructure, which would be a bad idea anyway since it might try
248 : * to do things like HOT pruning which we definitely can't do safely in
249 : * a database to which we're not even connected.
250 : */
251 : static List *
252 253 : ScanSourceDatabasePgClass(Oid tbid, Oid dbid, char *srcpath)
253 : {
254 : RelFileLocator rlocator;
255 : BlockNumber nblocks;
256 : BlockNumber blkno;
257 : Buffer buf;
258 : RelFileNumber relfilenumber;
259 : Page page;
260 253 : List *rlocatorlist = NIL;
261 : LockRelId relid;
262 : Snapshot snapshot;
263 : SMgrRelation smgr;
264 : BufferAccessStrategy bstrategy;
265 :
266 : /* Get pg_class relfilenumber. */
267 253 : relfilenumber = RelationMapOidToFilenumberForDatabase(srcpath,
268 : RelationRelationId);
269 :
270 : /* Don't read data into shared_buffers without holding a relation lock. */
271 253 : relid.dbId = dbid;
272 253 : relid.relId = RelationRelationId;
273 253 : LockRelationId(&relid, AccessShareLock);
274 :
275 : /* Prepare a RelFileLocator for the pg_class relation. */
276 253 : rlocator.spcOid = tbid;
277 253 : rlocator.dbOid = dbid;
278 253 : rlocator.relNumber = relfilenumber;
279 :
280 253 : smgr = smgropen(rlocator, INVALID_PROC_NUMBER);
281 253 : nblocks = smgrnblocks(smgr, MAIN_FORKNUM);
282 253 : smgrclose(smgr);
283 :
284 : /* Use a buffer access strategy since this is a bulk read operation. */
285 253 : bstrategy = GetAccessStrategy(BAS_BULKREAD);
286 :
287 : /*
288 : * As explained in the function header comments, we need a snapshot that
289 : * will see all committed transactions as committed, and our transaction
290 : * snapshot - or the active snapshot - might not be new enough for that,
291 : * but the return value of GetLatestSnapshot() should work fine.
292 : */
293 253 : snapshot = RegisterSnapshot(GetLatestSnapshot());
294 :
295 : /* Process the relation block by block. */
296 3795 : for (blkno = 0; blkno < nblocks; blkno++)
297 : {
298 3542 : CHECK_FOR_INTERRUPTS();
299 :
300 3542 : buf = ReadBufferWithoutRelcache(rlocator, MAIN_FORKNUM, blkno,
301 : RBM_NORMAL, bstrategy, true);
302 :
303 3542 : LockBuffer(buf, BUFFER_LOCK_SHARE);
304 3542 : page = BufferGetPage(buf);
305 3542 : if (PageIsNew(page) || PageIsEmpty(page))
306 : {
307 0 : UnlockReleaseBuffer(buf);
308 0 : continue;
309 : }
310 :
311 : /* Append relevant pg_class tuples for current page to rlocatorlist. */
312 3542 : rlocatorlist = ScanSourceDatabasePgClassPage(page, buf, tbid, dbid,
313 : srcpath, rlocatorlist,
314 : snapshot);
315 :
316 3542 : UnlockReleaseBuffer(buf);
317 : }
318 253 : UnregisterSnapshot(snapshot);
319 :
320 : /* Release relation lock. */
321 253 : UnlockRelationId(&relid, AccessShareLock);
322 :
323 253 : return rlocatorlist;
324 : }
325 :
326 : /*
327 : * Scan one page of the source database's pg_class relation and add relevant
328 : * entries to rlocatorlist. The return value is the updated list.
329 : */
330 : static List *
331 3542 : ScanSourceDatabasePgClassPage(Page page, Buffer buf, Oid tbid, Oid dbid,
332 : char *srcpath, List *rlocatorlist,
333 : Snapshot snapshot)
334 : {
335 3542 : BlockNumber blkno = BufferGetBlockNumber(buf);
336 : OffsetNumber offnum;
337 : OffsetNumber maxoff;
338 : HeapTupleData tuple;
339 :
340 3542 : maxoff = PageGetMaxOffsetNumber(page);
341 :
342 : /* Loop over offsets. */
343 3542 : for (offnum = FirstOffsetNumber;
344 182503 : offnum <= maxoff;
345 178961 : offnum = OffsetNumberNext(offnum))
346 : {
347 : ItemId itemid;
348 :
349 178961 : itemid = PageGetItemId(page, offnum);
350 :
351 : /* Nothing to do if slot is empty or already dead. */
352 178961 : if (!ItemIdIsUsed(itemid) || ItemIdIsDead(itemid) ||
353 128065 : ItemIdIsRedirected(itemid))
354 73413 : continue;
355 :
356 : Assert(ItemIdIsNormal(itemid));
357 105548 : ItemPointerSet(&(tuple.t_self), blkno, offnum);
358 :
359 : /* Initialize a HeapTupleData structure. */
360 105548 : tuple.t_data = (HeapTupleHeader) PageGetItem(page, itemid);
361 105548 : tuple.t_len = ItemIdGetLength(itemid);
362 105548 : tuple.t_tableOid = RelationRelationId;
363 :
364 : /* Skip tuples that are not visible to this snapshot. */
365 105548 : if (HeapTupleSatisfiesVisibility(&tuple, snapshot, buf))
366 : {
367 : CreateDBRelInfo *relinfo;
368 :
369 : /*
370 : * ScanSourceDatabasePgClassTuple is in charge of constructing a
371 : * CreateDBRelInfo object for this tuple, but can also decide that
372 : * this tuple isn't something we need to copy. If we do need to
373 : * copy the relation, add it to the list.
374 : */
375 105529 : relinfo = ScanSourceDatabasePgClassTuple(&tuple, tbid, dbid,
376 : srcpath);
377 105529 : if (relinfo != NULL)
378 56700 : rlocatorlist = lappend(rlocatorlist, relinfo);
379 : }
380 : }
381 :
382 3542 : return rlocatorlist;
383 : }
384 :
385 : /*
386 : * Decide whether a certain pg_class tuple represents something that
387 : * needs to be copied from the source database to the destination database,
388 : * and if so, construct a CreateDBRelInfo for it.
389 : *
390 : * Visibility checks are handled by the caller, so our job here is just
391 : * to assess the data stored in the tuple.
392 : */
393 : CreateDBRelInfo *
394 105529 : ScanSourceDatabasePgClassTuple(HeapTupleData *tuple, Oid tbid, Oid dbid,
395 : char *srcpath)
396 : {
397 : CreateDBRelInfo *relinfo;
398 : Form_pg_class classForm;
399 105529 : RelFileNumber relfilenumber = InvalidRelFileNumber;
400 :
401 105529 : classForm = (Form_pg_class) GETSTRUCT(tuple);
402 :
403 : /*
404 : * Return NULL if this object does not need to be copied.
405 : *
406 : * Shared objects don't need to be copied, because they are shared.
407 : * Objects without storage can't be copied, because there's nothing to
408 : * copy. Temporary relations don't need to be copied either, because they
409 : * are inaccessible outside of the session that created them, which must
410 : * be gone already, and couldn't connect to a different database if it
411 : * still existed. autovacuum will eventually remove the pg_class entries
412 : * as well.
413 : */
414 105529 : if (classForm->reltablespace == GLOBALTABLESPACE_OID ||
415 93891 : !RELKIND_HAS_STORAGE(classForm->relkind) ||
416 56700 : classForm->relpersistence == RELPERSISTENCE_TEMP)
417 48829 : return NULL;
418 :
419 : /*
420 : * If relfilenumber is valid then directly use it. Otherwise, consult the
421 : * relmap.
422 : */
423 56700 : if (RelFileNumberIsValid(classForm->relfilenode))
424 52399 : relfilenumber = classForm->relfilenode;
425 : else
426 4301 : relfilenumber = RelationMapOidToFilenumberForDatabase(srcpath,
427 : classForm->oid);
428 :
429 : /* We must have a valid relfilenumber. */
430 56700 : if (!RelFileNumberIsValid(relfilenumber))
431 0 : elog(ERROR, "relation with OID %u does not have a valid relfilenumber",
432 : classForm->oid);
433 :
434 : /* Prepare a rel info element and add it to the list. */
435 56700 : relinfo = palloc_object(CreateDBRelInfo);
436 56700 : if (OidIsValid(classForm->reltablespace))
437 0 : relinfo->rlocator.spcOid = classForm->reltablespace;
438 : else
439 56700 : relinfo->rlocator.spcOid = tbid;
440 :
441 56700 : relinfo->rlocator.dbOid = dbid;
442 56700 : relinfo->rlocator.relNumber = relfilenumber;
443 56700 : relinfo->reloid = classForm->oid;
444 :
445 : /* Temporary relations were rejected above. */
446 : Assert(classForm->relpersistence != RELPERSISTENCE_TEMP);
447 56700 : relinfo->permanent =
448 56700 : (classForm->relpersistence == RELPERSISTENCE_PERMANENT) ? true : false;
449 :
450 56700 : return relinfo;
451 : }
452 :
453 : /*
454 : * Create database directory and write out the PG_VERSION file in the database
455 : * path. If isRedo is true, it's okay for the database directory to exist
456 : * already.
457 : */
458 : static void
459 276 : CreateDirAndVersionFile(char *dbpath, Oid dbid, Oid tsid, bool isRedo)
460 : {
461 : int fd;
462 : int nbytes;
463 : char versionfile[MAXPGPATH];
464 : char buf[16];
465 :
466 : /*
467 : * Note that we don't have to copy version data from the source database;
468 : * there's only one legal value.
469 : */
470 276 : sprintf(buf, "%s\n", PG_MAJORVERSION);
471 276 : nbytes = strlen(PG_MAJORVERSION) + 1;
472 :
473 : /* Create database directory. */
474 276 : if (MakePGDirectory(dbpath) < 0)
475 : {
476 : /* Failure other than already exists or not in WAL replay? */
477 8 : if (errno != EEXIST || !isRedo)
478 0 : ereport(ERROR,
479 : (errcode_for_file_access(),
480 : errmsg("could not create directory \"%s\": %m", dbpath)));
481 : }
482 :
483 : /*
484 : * Create PG_VERSION file in the database path. If the file already
485 : * exists and we are in WAL replay then try again to open it in write
486 : * mode.
487 : */
488 276 : snprintf(versionfile, sizeof(versionfile), "%s/%s", dbpath, "PG_VERSION");
489 :
490 276 : fd = OpenTransientFile(versionfile, O_WRONLY | O_CREAT | O_EXCL | PG_BINARY);
491 276 : if (fd < 0 && errno == EEXIST && isRedo)
492 8 : fd = OpenTransientFile(versionfile, O_WRONLY | O_TRUNC | PG_BINARY);
493 :
494 276 : if (fd < 0)
495 0 : ereport(ERROR,
496 : (errcode_for_file_access(),
497 : errmsg("could not create file \"%s\": %m", versionfile)));
498 :
499 : /* Write PG_MAJORVERSION in the PG_VERSION file. */
500 276 : pgstat_report_wait_start(WAIT_EVENT_VERSION_FILE_WRITE);
501 276 : errno = 0;
502 276 : if ((int) write(fd, buf, nbytes) != nbytes)
503 : {
504 : /* If write didn't set errno, assume problem is no disk space. */
505 0 : if (errno == 0)
506 0 : errno = ENOSPC;
507 0 : ereport(ERROR,
508 : (errcode_for_file_access(),
509 : errmsg("could not write to file \"%s\": %m", versionfile)));
510 : }
511 276 : pgstat_report_wait_end();
512 :
513 276 : pgstat_report_wait_start(WAIT_EVENT_VERSION_FILE_SYNC);
514 276 : if (pg_fsync(fd) != 0)
515 0 : ereport(data_sync_elevel(ERROR),
516 : (errcode_for_file_access(),
517 : errmsg("could not fsync file \"%s\": %m", versionfile)));
518 276 : fsync_fname(dbpath, true);
519 276 : pgstat_report_wait_end();
520 :
521 : /* Close the version file. */
522 276 : CloseTransientFile(fd);
523 :
524 : /* If we are not in WAL replay then write the WAL. */
525 276 : if (!isRedo)
526 : {
527 : xl_dbase_create_wal_log_rec xlrec;
528 :
529 253 : START_CRIT_SECTION();
530 :
531 253 : xlrec.db_id = dbid;
532 253 : xlrec.tablespace_id = tsid;
533 :
534 253 : XLogBeginInsert();
535 253 : XLogRegisterData(&xlrec,
536 : sizeof(xl_dbase_create_wal_log_rec));
537 :
538 253 : (void) XLogInsert(RM_DBASE_ID, XLOG_DBASE_CREATE_WAL_LOG);
539 :
540 253 : END_CRIT_SECTION();
541 : }
542 276 : }
543 :
544 : /*
545 : * Create a new database using the FILE_COPY strategy.
546 : *
547 : * Copy each tablespace at the filesystem level, and log a single WAL record
548 : * for each tablespace copied. This requires a checkpoint before and after the
549 : * copy, which may be expensive, but it does greatly reduce WAL generation
550 : * if the copied database is large.
551 : */
552 : static void
553 138 : CreateDatabaseUsingFileCopy(Oid src_dboid, Oid dst_dboid, Oid src_tsid,
554 : Oid dst_tsid)
555 : {
556 : TableScanDesc scan;
557 : Relation rel;
558 : HeapTuple tuple;
559 :
560 : /*
561 : * Force a checkpoint before starting the copy. This will force all dirty
562 : * buffers, including those of unlogged tables, out to disk, to ensure
563 : * source database is up-to-date on disk for the copy.
564 : * FlushDatabaseBuffers() would suffice for that, but we also want to
565 : * process any pending unlink requests. Otherwise, if a checkpoint
566 : * happened while we're copying files, a file might be deleted just when
567 : * we're about to copy it, causing the lstat() call in copydir() to fail
568 : * with ENOENT.
569 : *
570 : * In binary upgrade mode, we can skip this checkpoint because pg_upgrade
571 : * is careful to ensure that template0 is fully written to disk prior to
572 : * any CREATE DATABASE commands.
573 : */
574 138 : if (!IsBinaryUpgrade)
575 108 : RequestCheckpoint(CHECKPOINT_FAST | CHECKPOINT_FORCE |
576 : CHECKPOINT_WAIT | CHECKPOINT_FLUSH_UNLOGGED);
577 :
578 : /*
579 : * Iterate through all tablespaces of the template database, and copy each
580 : * one to the new database.
581 : */
582 138 : rel = table_open(TableSpaceRelationId, AccessShareLock);
583 138 : scan = table_beginscan_catalog(rel, 0, NULL);
584 448 : while ((tuple = heap_getnext(scan, ForwardScanDirection)) != NULL)
585 : {
586 310 : Form_pg_tablespace spaceform = (Form_pg_tablespace) GETSTRUCT(tuple);
587 310 : Oid srctablespace = spaceform->oid;
588 : Oid dsttablespace;
589 : char *srcpath;
590 : char *dstpath;
591 : struct stat st;
592 :
593 : /* No need to copy global tablespace */
594 310 : if (srctablespace == GLOBALTABLESPACE_OID)
595 172 : continue;
596 :
597 172 : srcpath = GetDatabasePath(src_dboid, srctablespace);
598 :
599 310 : if (stat(srcpath, &st) < 0 || !S_ISDIR(st.st_mode) ||
600 138 : directory_is_empty(srcpath))
601 : {
602 : /* Assume we can ignore it */
603 34 : pfree(srcpath);
604 34 : continue;
605 : }
606 :
607 138 : if (srctablespace == src_tsid)
608 138 : dsttablespace = dst_tsid;
609 : else
610 0 : dsttablespace = srctablespace;
611 :
612 138 : dstpath = GetDatabasePath(dst_dboid, dsttablespace);
613 :
614 : /*
615 : * Copy this subdirectory to the new location
616 : *
617 : * We don't need to copy subdirectories
618 : */
619 138 : copydir(srcpath, dstpath, false);
620 :
621 : /* Record the filesystem change in XLOG */
622 : {
623 : xl_dbase_create_file_copy_rec xlrec;
624 :
625 138 : xlrec.db_id = dst_dboid;
626 138 : xlrec.tablespace_id = dsttablespace;
627 138 : xlrec.src_db_id = src_dboid;
628 138 : xlrec.src_tablespace_id = srctablespace;
629 :
630 138 : XLogBeginInsert();
631 138 : XLogRegisterData(&xlrec,
632 : sizeof(xl_dbase_create_file_copy_rec));
633 :
634 138 : (void) XLogInsert(RM_DBASE_ID,
635 : XLOG_DBASE_CREATE_FILE_COPY | XLR_SPECIAL_REL_UPDATE);
636 : }
637 138 : pfree(srcpath);
638 138 : pfree(dstpath);
639 : }
640 138 : table_endscan(scan);
641 138 : table_close(rel, AccessShareLock);
642 :
643 : /*
644 : * We force a checkpoint before committing. This effectively means that
645 : * committed XLOG_DBASE_CREATE_FILE_COPY operations will never need to be
646 : * replayed (at least not in ordinary crash recovery; we still have to
647 : * make the XLOG entry for the benefit of PITR operations). This avoids
648 : * two nasty scenarios:
649 : *
650 : * #1: At wal_level=minimal, we don't XLOG the contents of newly created
651 : * relfilenodes; therefore the drop-and-recreate-whole-directory behavior
652 : * of DBASE_CREATE replay would lose such files created in the new
653 : * database between our commit and the next checkpoint.
654 : *
655 : * #2: Since we have to recopy the source database during DBASE_CREATE
656 : * replay, we run the risk of copying changes in it that were committed
657 : * after the original CREATE DATABASE command but before the system crash
658 : * that led to the replay. This is at least unexpected and at worst could
659 : * lead to inconsistencies, eg duplicate table names.
660 : *
661 : * (Both of these were real bugs in releases 8.0 through 8.0.3.)
662 : *
663 : * In PITR replay, the first of these isn't an issue, and the second is
664 : * only a risk if the CREATE DATABASE and subsequent template database
665 : * change both occur while a base backup is being taken. There doesn't
666 : * seem to be much we can do about that except document it as a
667 : * limitation.
668 : *
669 : * In binary upgrade mode, we can skip this checkpoint because neither of
670 : * these problems applies: we don't ever replay the WAL generated during
671 : * pg_upgrade, and we don't support taking base backups during pg_upgrade
672 : * (not to mention that we don't concurrently modify template0, either).
673 : *
674 : * See CreateDatabaseUsingWalLog() for a less cheesy CREATE DATABASE
675 : * strategy that avoids these problems.
676 : */
677 138 : if (!IsBinaryUpgrade)
678 108 : RequestCheckpoint(CHECKPOINT_FAST | CHECKPOINT_FORCE |
679 : CHECKPOINT_WAIT);
680 138 : }
681 :
682 : /*
683 : * CREATE DATABASE
684 : */
685 : Oid
686 409 : createdb(ParseState *pstate, const CreatedbStmt *stmt)
687 : {
688 : Oid src_dboid;
689 : Oid src_owner;
690 409 : int src_encoding = -1;
691 409 : char *src_collate = NULL;
692 409 : char *src_ctype = NULL;
693 409 : char *src_locale = NULL;
694 409 : char *src_icurules = NULL;
695 409 : char src_locprovider = '\0';
696 409 : char *src_collversion = NULL;
697 : bool src_istemplate;
698 409 : bool src_hasloginevt = false;
699 : bool src_allowconn;
700 409 : TransactionId src_frozenxid = InvalidTransactionId;
701 409 : MultiXactId src_minmxid = InvalidMultiXactId;
702 : Oid src_deftablespace;
703 : volatile Oid dst_deftablespace;
704 : Relation pg_database_rel;
705 : HeapTuple tuple;
706 409 : Datum new_record[Natts_pg_database] = {0};
707 409 : bool new_record_nulls[Natts_pg_database] = {0};
708 409 : Oid dboid = InvalidOid;
709 : Oid datdba;
710 : ListCell *option;
711 409 : DefElem *tablespacenameEl = NULL;
712 409 : DefElem *ownerEl = NULL;
713 409 : DefElem *templateEl = NULL;
714 409 : DefElem *encodingEl = NULL;
715 409 : DefElem *localeEl = NULL;
716 409 : DefElem *builtinlocaleEl = NULL;
717 409 : DefElem *collateEl = NULL;
718 409 : DefElem *ctypeEl = NULL;
719 409 : DefElem *iculocaleEl = NULL;
720 409 : DefElem *icurulesEl = NULL;
721 409 : DefElem *locproviderEl = NULL;
722 409 : DefElem *istemplateEl = NULL;
723 409 : DefElem *allowconnectionsEl = NULL;
724 409 : DefElem *connlimitEl = NULL;
725 409 : DefElem *collversionEl = NULL;
726 409 : DefElem *strategyEl = NULL;
727 409 : char *dbname = stmt->dbname;
728 409 : char *dbowner = NULL;
729 409 : const char *dbtemplate = NULL;
730 409 : char *dbcollate = NULL;
731 409 : char *dbctype = NULL;
732 409 : const char *dblocale = NULL;
733 409 : char *dbicurules = NULL;
734 409 : char dblocprovider = '\0';
735 : char *canonname;
736 409 : int encoding = -1;
737 409 : bool dbistemplate = false;
738 409 : bool dballowconnections = true;
739 409 : int dbconnlimit = DATCONNLIMIT_UNLIMITED;
740 409 : char *dbcollversion = NULL;
741 : int notherbackends;
742 : int npreparedxacts;
743 409 : CreateDBStrategy dbstrategy = CREATEDB_WAL_LOG;
744 : createdb_failure_params fparms;
745 :
746 : /* Extract options from the statement node tree */
747 1222 : foreach(option, stmt->options)
748 : {
749 813 : DefElem *defel = (DefElem *) lfirst(option);
750 :
751 813 : if (strcmp(defel->defname, "tablespace") == 0)
752 : {
753 17 : if (tablespacenameEl)
754 0 : errorConflictingDefElem(defel, pstate);
755 17 : tablespacenameEl = defel;
756 : }
757 796 : else if (strcmp(defel->defname, "owner") == 0)
758 : {
759 1 : if (ownerEl)
760 0 : errorConflictingDefElem(defel, pstate);
761 1 : ownerEl = defel;
762 : }
763 795 : else if (strcmp(defel->defname, "template") == 0)
764 : {
765 185 : if (templateEl)
766 0 : errorConflictingDefElem(defel, pstate);
767 185 : templateEl = defel;
768 : }
769 610 : else if (strcmp(defel->defname, "encoding") == 0)
770 : {
771 53 : if (encodingEl)
772 0 : errorConflictingDefElem(defel, pstate);
773 53 : encodingEl = defel;
774 : }
775 557 : else if (strcmp(defel->defname, "locale") == 0)
776 : {
777 54 : if (localeEl)
778 0 : errorConflictingDefElem(defel, pstate);
779 54 : localeEl = defel;
780 : }
781 503 : else if (strcmp(defel->defname, "builtin_locale") == 0)
782 : {
783 8 : if (builtinlocaleEl)
784 0 : errorConflictingDefElem(defel, pstate);
785 8 : builtinlocaleEl = defel;
786 : }
787 495 : else if (strcmp(defel->defname, "lc_collate") == 0)
788 : {
789 8 : if (collateEl)
790 0 : errorConflictingDefElem(defel, pstate);
791 8 : collateEl = defel;
792 : }
793 487 : else if (strcmp(defel->defname, "lc_ctype") == 0)
794 : {
795 8 : if (ctypeEl)
796 0 : errorConflictingDefElem(defel, pstate);
797 8 : ctypeEl = defel;
798 : }
799 479 : else if (strcmp(defel->defname, "icu_locale") == 0)
800 : {
801 5 : if (iculocaleEl)
802 0 : errorConflictingDefElem(defel, pstate);
803 5 : iculocaleEl = defel;
804 : }
805 474 : else if (strcmp(defel->defname, "icu_rules") == 0)
806 : {
807 1 : if (icurulesEl)
808 0 : errorConflictingDefElem(defel, pstate);
809 1 : icurulesEl = defel;
810 : }
811 473 : else if (strcmp(defel->defname, "locale_provider") == 0)
812 : {
813 59 : if (locproviderEl)
814 0 : errorConflictingDefElem(defel, pstate);
815 59 : locproviderEl = defel;
816 : }
817 414 : else if (strcmp(defel->defname, "is_template") == 0)
818 : {
819 51 : if (istemplateEl)
820 0 : errorConflictingDefElem(defel, pstate);
821 51 : istemplateEl = defel;
822 : }
823 363 : else if (strcmp(defel->defname, "allow_connections") == 0)
824 : {
825 50 : if (allowconnectionsEl)
826 0 : errorConflictingDefElem(defel, pstate);
827 50 : allowconnectionsEl = defel;
828 : }
829 313 : else if (strcmp(defel->defname, "connection_limit") == 0)
830 : {
831 0 : if (connlimitEl)
832 0 : errorConflictingDefElem(defel, pstate);
833 0 : connlimitEl = defel;
834 : }
835 313 : else if (strcmp(defel->defname, "collation_version") == 0)
836 : {
837 30 : if (collversionEl)
838 0 : errorConflictingDefElem(defel, pstate);
839 30 : collversionEl = defel;
840 : }
841 283 : else if (strcmp(defel->defname, "location") == 0)
842 : {
843 0 : ereport(WARNING,
844 : (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
845 : errmsg("LOCATION is not supported anymore"),
846 : errhint("Consider using tablespaces instead."),
847 : parser_errposition(pstate, defel->location)));
848 : }
849 283 : else if (strcmp(defel->defname, "oid") == 0)
850 : {
851 133 : dboid = defGetObjectId(defel);
852 :
853 : /*
854 : * We don't normally permit new databases to be created with
855 : * system-assigned OIDs. pg_upgrade tries to preserve database
856 : * OIDs, so we can't allow any database to be created with an OID
857 : * that might be in use in a freshly-initialized cluster created
858 : * by some future version. We assume all such OIDs will be from
859 : * the system-managed OID range.
860 : *
861 : * As an exception, however, we permit any OID to be assigned when
862 : * allow_system_table_mods=on (so that initdb can assign system
863 : * OIDs to template0 and postgres) or when performing a binary
864 : * upgrade (so that pg_upgrade can preserve whatever OIDs it finds
865 : * in the source cluster).
866 : */
867 133 : if (dboid < FirstNormalObjectId &&
868 116 : !allowSystemTableMods && !IsBinaryUpgrade)
869 0 : ereport(ERROR,
870 : (errcode(ERRCODE_INVALID_PARAMETER_VALUE)),
871 : errmsg("OIDs less than %u are reserved for system objects", FirstNormalObjectId));
872 : }
873 150 : else if (strcmp(defel->defname, "strategy") == 0)
874 : {
875 150 : if (strategyEl)
876 0 : errorConflictingDefElem(defel, pstate);
877 150 : strategyEl = defel;
878 : }
879 : else
880 0 : ereport(ERROR,
881 : (errcode(ERRCODE_SYNTAX_ERROR),
882 : errmsg("option \"%s\" not recognized", defel->defname),
883 : parser_errposition(pstate, defel->location)));
884 : }
885 :
886 409 : if (ownerEl && ownerEl->arg)
887 1 : dbowner = defGetString(ownerEl);
888 409 : if (templateEl && templateEl->arg)
889 185 : dbtemplate = defGetString(templateEl);
890 409 : if (encodingEl && encodingEl->arg)
891 : {
892 : const char *encoding_name;
893 :
894 53 : if (IsA(encodingEl->arg, Integer))
895 : {
896 0 : encoding = defGetInt32(encodingEl);
897 0 : encoding_name = pg_encoding_to_char(encoding);
898 0 : if (strcmp(encoding_name, "") == 0 ||
899 0 : pg_valid_server_encoding(encoding_name) < 0)
900 0 : ereport(ERROR,
901 : (errcode(ERRCODE_UNDEFINED_OBJECT),
902 : errmsg("%d is not a valid encoding code",
903 : encoding),
904 : parser_errposition(pstate, encodingEl->location)));
905 : }
906 : else
907 : {
908 53 : encoding_name = defGetString(encodingEl);
909 53 : encoding = pg_valid_server_encoding(encoding_name);
910 53 : if (encoding < 0)
911 0 : ereport(ERROR,
912 : (errcode(ERRCODE_UNDEFINED_OBJECT),
913 : errmsg("%s is not a valid encoding name",
914 : encoding_name),
915 : parser_errposition(pstate, encodingEl->location)));
916 : }
917 : }
918 409 : if (localeEl && localeEl->arg)
919 : {
920 54 : dbcollate = defGetString(localeEl);
921 54 : dbctype = defGetString(localeEl);
922 54 : dblocale = defGetString(localeEl);
923 : }
924 409 : if (builtinlocaleEl && builtinlocaleEl->arg)
925 8 : dblocale = defGetString(builtinlocaleEl);
926 409 : if (collateEl && collateEl->arg)
927 8 : dbcollate = defGetString(collateEl);
928 409 : if (ctypeEl && ctypeEl->arg)
929 8 : dbctype = defGetString(ctypeEl);
930 409 : if (iculocaleEl && iculocaleEl->arg)
931 5 : dblocale = defGetString(iculocaleEl);
932 409 : if (icurulesEl && icurulesEl->arg)
933 1 : dbicurules = defGetString(icurulesEl);
934 409 : if (locproviderEl && locproviderEl->arg)
935 : {
936 59 : char *locproviderstr = defGetString(locproviderEl);
937 :
938 59 : if (pg_strcasecmp(locproviderstr, "builtin") == 0)
939 17 : dblocprovider = COLLPROVIDER_BUILTIN;
940 42 : else if (pg_strcasecmp(locproviderstr, "icu") == 0)
941 7 : dblocprovider = COLLPROVIDER_ICU;
942 35 : else if (pg_strcasecmp(locproviderstr, "libc") == 0)
943 34 : dblocprovider = COLLPROVIDER_LIBC;
944 : else
945 1 : ereport(ERROR,
946 : (errcode(ERRCODE_INVALID_OBJECT_DEFINITION),
947 : errmsg("unrecognized locale provider: %s",
948 : locproviderstr)));
949 : }
950 408 : if (istemplateEl && istemplateEl->arg)
951 51 : dbistemplate = defGetBoolean(istemplateEl);
952 408 : if (allowconnectionsEl && allowconnectionsEl->arg)
953 50 : dballowconnections = defGetBoolean(allowconnectionsEl);
954 408 : if (connlimitEl && connlimitEl->arg)
955 : {
956 0 : dbconnlimit = defGetInt32(connlimitEl);
957 0 : if (dbconnlimit < DATCONNLIMIT_UNLIMITED)
958 0 : ereport(ERROR,
959 : (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
960 : errmsg("invalid connection limit: %d", dbconnlimit)));
961 : }
962 408 : if (collversionEl)
963 30 : dbcollversion = defGetString(collversionEl);
964 :
965 : /* obtain OID of proposed owner */
966 408 : if (dbowner)
967 1 : datdba = get_role_oid(dbowner, false);
968 : else
969 407 : datdba = GetUserId();
970 :
971 : /*
972 : * To create a database, must have createdb privilege and must be able to
973 : * become the target role (this does not imply that the target role itself
974 : * must have createdb privilege). The latter provision guards against
975 : * "giveaway" attacks. Note that a superuser will always have both of
976 : * these privileges a fortiori.
977 : */
978 408 : if (!have_createdb_privilege())
979 3 : ereport(ERROR,
980 : (errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
981 : errmsg("permission denied to create database")));
982 :
983 405 : check_can_set_role(GetUserId(), datdba);
984 :
985 : /*
986 : * Lookup database (template) to be cloned, and obtain share lock on it.
987 : * ShareLock allows two CREATE DATABASEs to work from the same template
988 : * concurrently, while ensuring no one is busy dropping it in parallel
989 : * (which would be Very Bad since we'd likely get an incomplete copy
990 : * without knowing it). This also prevents any new connections from being
991 : * made to the source until we finish copying it, so we can be sure it
992 : * won't change underneath us.
993 : */
994 405 : if (!dbtemplate)
995 221 : dbtemplate = "template1"; /* Default template database name */
996 :
997 405 : if (!get_db_info(dbtemplate, ShareLock,
998 : &src_dboid, &src_owner, &src_encoding,
999 : &src_istemplate, &src_allowconn, &src_hasloginevt,
1000 : &src_frozenxid, &src_minmxid, &src_deftablespace,
1001 : &src_collate, &src_ctype, &src_locale, &src_icurules, &src_locprovider,
1002 : &src_collversion))
1003 0 : ereport(ERROR,
1004 : (errcode(ERRCODE_UNDEFINED_DATABASE),
1005 : errmsg("template database \"%s\" does not exist",
1006 : dbtemplate)));
1007 :
1008 : /*
1009 : * If the source database was in the process of being dropped, we can't
1010 : * use it as a template.
1011 : */
1012 405 : if (database_is_invalid_oid(src_dboid))
1013 1 : ereport(ERROR,
1014 : errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
1015 : errmsg("cannot use invalid database \"%s\" as template", dbtemplate),
1016 : errhint("Use DROP DATABASE to drop invalid databases."));
1017 :
1018 : /*
1019 : * Permission check: to copy a DB that's not marked datistemplate, you
1020 : * must be superuser or the owner thereof.
1021 : */
1022 404 : if (!src_istemplate)
1023 : {
1024 14 : if (!object_ownercheck(DatabaseRelationId, src_dboid, GetUserId()))
1025 0 : ereport(ERROR,
1026 : (errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
1027 : errmsg("permission denied to copy database \"%s\"",
1028 : dbtemplate)));
1029 : }
1030 :
1031 : /* Validate the database creation strategy. */
1032 404 : if (strategyEl && strategyEl->arg)
1033 : {
1034 : char *strategy;
1035 :
1036 150 : strategy = defGetString(strategyEl);
1037 150 : if (pg_strcasecmp(strategy, "wal_log") == 0)
1038 11 : dbstrategy = CREATEDB_WAL_LOG;
1039 139 : else if (pg_strcasecmp(strategy, "file_copy") == 0)
1040 138 : dbstrategy = CREATEDB_FILE_COPY;
1041 : else
1042 1 : ereport(ERROR,
1043 : (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
1044 : errmsg("invalid create database strategy \"%s\"", strategy),
1045 : errhint("Valid strategies are \"wal_log\" and \"file_copy\".")));
1046 : }
1047 :
1048 : /* If encoding or locales are defaulted, use source's setting */
1049 403 : if (encoding < 0)
1050 350 : encoding = src_encoding;
1051 403 : if (dbcollate == NULL)
1052 343 : dbcollate = src_collate;
1053 403 : if (dbctype == NULL)
1054 343 : dbctype = src_ctype;
1055 403 : if (dblocprovider == '\0')
1056 345 : dblocprovider = src_locprovider;
1057 403 : if (dblocale == NULL && dblocprovider == src_locprovider)
1058 341 : dblocale = src_locale;
1059 403 : if (dbicurules == NULL)
1060 402 : dbicurules = src_icurules;
1061 :
1062 : /* Some encodings are client only */
1063 403 : if (!PG_VALID_BE_ENCODING(encoding))
1064 0 : ereport(ERROR,
1065 : (errcode(ERRCODE_WRONG_OBJECT_TYPE),
1066 : errmsg("invalid server encoding %d", encoding)));
1067 :
1068 : /* Check that the chosen locales are valid, and get canonical spellings */
1069 403 : if (!check_locale(LC_COLLATE, dbcollate, &canonname))
1070 : {
1071 1 : if (dblocprovider == COLLPROVIDER_BUILTIN)
1072 0 : ereport(ERROR,
1073 : (errcode(ERRCODE_WRONG_OBJECT_TYPE),
1074 : errmsg("invalid LC_COLLATE locale name: \"%s\"", dbcollate),
1075 : errhint("If the locale name is specific to the builtin provider, use BUILTIN_LOCALE.")));
1076 1 : else if (dblocprovider == COLLPROVIDER_ICU)
1077 0 : ereport(ERROR,
1078 : (errcode(ERRCODE_WRONG_OBJECT_TYPE),
1079 : errmsg("invalid LC_COLLATE locale name: \"%s\"", dbcollate),
1080 : errhint("If the locale name is specific to the ICU provider, use ICU_LOCALE.")));
1081 : else
1082 1 : ereport(ERROR,
1083 : (errcode(ERRCODE_WRONG_OBJECT_TYPE),
1084 : errmsg("invalid LC_COLLATE locale name: \"%s\"", dbcollate)));
1085 : }
1086 402 : dbcollate = canonname;
1087 402 : if (!check_locale(LC_CTYPE, dbctype, &canonname))
1088 : {
1089 1 : if (dblocprovider == COLLPROVIDER_BUILTIN)
1090 0 : ereport(ERROR,
1091 : (errcode(ERRCODE_WRONG_OBJECT_TYPE),
1092 : errmsg("invalid LC_CTYPE locale name: \"%s\"", dbctype),
1093 : errhint("If the locale name is specific to the builtin provider, use BUILTIN_LOCALE.")));
1094 1 : else if (dblocprovider == COLLPROVIDER_ICU)
1095 0 : ereport(ERROR,
1096 : (errcode(ERRCODE_WRONG_OBJECT_TYPE),
1097 : errmsg("invalid LC_CTYPE locale name: \"%s\"", dbctype),
1098 : errhint("If the locale name is specific to the ICU provider, use ICU_LOCALE.")));
1099 : else
1100 1 : ereport(ERROR,
1101 : (errcode(ERRCODE_WRONG_OBJECT_TYPE),
1102 : errmsg("invalid LC_CTYPE locale name: \"%s\"", dbctype)));
1103 : }
1104 :
1105 401 : dbctype = canonname;
1106 :
1107 401 : check_encoding_locale_matches(encoding, dbcollate, dbctype);
1108 :
1109 : /* validate provider-specific parameters */
1110 401 : if (dblocprovider != COLLPROVIDER_BUILTIN)
1111 : {
1112 370 : if (builtinlocaleEl)
1113 0 : ereport(ERROR,
1114 : (errcode(ERRCODE_INVALID_OBJECT_DEFINITION),
1115 : errmsg("BUILTIN_LOCALE cannot be specified unless locale provider is builtin")));
1116 : }
1117 :
1118 401 : if (dblocprovider != COLLPROVIDER_ICU)
1119 : {
1120 387 : if (iculocaleEl)
1121 1 : ereport(ERROR,
1122 : (errcode(ERRCODE_INVALID_OBJECT_DEFINITION),
1123 : errmsg("ICU locale cannot be specified unless locale provider is ICU")));
1124 :
1125 386 : if (dbicurules)
1126 1 : ereport(ERROR,
1127 : (errcode(ERRCODE_INVALID_OBJECT_DEFINITION),
1128 : errmsg("ICU rules cannot be specified unless locale provider is ICU")));
1129 : }
1130 :
1131 : /* validate and canonicalize locale for the provider */
1132 399 : if (dblocprovider == COLLPROVIDER_BUILTIN)
1133 : {
1134 : /*
1135 : * This would happen if template0 uses the libc provider but the new
1136 : * database uses builtin.
1137 : */
1138 29 : if (!dblocale)
1139 1 : ereport(ERROR,
1140 : (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
1141 : errmsg("LOCALE or BUILTIN_LOCALE must be specified")));
1142 :
1143 28 : dblocale = builtin_validate_locale(encoding, dblocale);
1144 : }
1145 370 : else if (dblocprovider == COLLPROVIDER_ICU)
1146 : {
1147 14 : if (!(is_encoding_supported_by_icu(encoding)))
1148 1 : ereport(ERROR,
1149 : (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
1150 : errmsg("encoding \"%s\" is not supported with ICU provider",
1151 : pg_encoding_to_char(encoding))));
1152 :
1153 : /*
1154 : * This would happen if template0 uses the libc provider but the new
1155 : * database uses icu.
1156 : */
1157 13 : if (!dblocale)
1158 1 : ereport(ERROR,
1159 : (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
1160 : errmsg("LOCALE or ICU_LOCALE must be specified")));
1161 :
1162 : /*
1163 : * During binary upgrade, or when the locale came from the template
1164 : * database, preserve locale string. Otherwise, canonicalize to a
1165 : * language tag.
1166 : */
1167 12 : if (!IsBinaryUpgrade && dblocale != src_locale)
1168 : {
1169 6 : char *langtag = icu_language_tag(dblocale,
1170 : icu_validation_level);
1171 :
1172 6 : if (langtag && strcmp(dblocale, langtag) != 0)
1173 : {
1174 2 : ereport(NOTICE,
1175 : (errmsg("using standard form \"%s\" for ICU locale \"%s\"",
1176 : langtag, dblocale)));
1177 :
1178 2 : dblocale = langtag;
1179 : }
1180 : }
1181 :
1182 12 : icu_validate_locale(dblocale);
1183 : }
1184 :
1185 : /* for libc, locale comes from datcollate and datctype */
1186 394 : if (dblocprovider == COLLPROVIDER_LIBC)
1187 356 : dblocale = NULL;
1188 :
1189 : /*
1190 : * Check that the new encoding and locale settings match the source
1191 : * database. We insist on this because we simply copy the source data ---
1192 : * any non-ASCII data would be wrongly encoded, and any indexes sorted
1193 : * according to the source locale would be wrong.
1194 : *
1195 : * However, we assume that template0 doesn't contain any non-ASCII data
1196 : * nor any indexes that depend on collation or ctype, so template0 can be
1197 : * used as template for creating a database with any encoding or locale.
1198 : */
1199 394 : if (strcmp(dbtemplate, "template0") != 0)
1200 : {
1201 236 : if (encoding != src_encoding)
1202 0 : ereport(ERROR,
1203 : (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
1204 : errmsg("new encoding (%s) is incompatible with the encoding of the template database (%s)",
1205 : pg_encoding_to_char(encoding),
1206 : pg_encoding_to_char(src_encoding)),
1207 : errhint("Use the same encoding as in the template database, or use template0 as template.")));
1208 :
1209 236 : if (strcmp(dbcollate, src_collate) != 0)
1210 1 : ereport(ERROR,
1211 : (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
1212 : errmsg("new collation (%s) is incompatible with the collation of the template database (%s)",
1213 : dbcollate, src_collate),
1214 : errhint("Use the same collation as in the template database, or use template0 as template.")));
1215 :
1216 235 : if (strcmp(dbctype, src_ctype) != 0)
1217 0 : ereport(ERROR,
1218 : (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
1219 : errmsg("new LC_CTYPE (%s) is incompatible with the LC_CTYPE of the template database (%s)",
1220 : dbctype, src_ctype),
1221 : errhint("Use the same LC_CTYPE as in the template database, or use template0 as template.")));
1222 :
1223 235 : if (dblocprovider != src_locprovider)
1224 0 : ereport(ERROR,
1225 : (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
1226 : errmsg("new locale provider (%s) does not match locale provider of the template database (%s)",
1227 : collprovider_name(dblocprovider), collprovider_name(src_locprovider)),
1228 : errhint("Use the same locale provider as in the template database, or use template0 as template.")));
1229 :
1230 235 : if (dblocprovider == COLLPROVIDER_ICU)
1231 : {
1232 : char *val1;
1233 : char *val2;
1234 :
1235 : Assert(dblocale);
1236 : Assert(src_locale);
1237 6 : if (strcmp(dblocale, src_locale) != 0)
1238 0 : ereport(ERROR,
1239 : (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
1240 : errmsg("new ICU locale (%s) is incompatible with the ICU locale of the template database (%s)",
1241 : dblocale, src_locale),
1242 : errhint("Use the same ICU locale as in the template database, or use template0 as template.")));
1243 :
1244 6 : val1 = dbicurules;
1245 6 : if (!val1)
1246 6 : val1 = "";
1247 6 : val2 = src_icurules;
1248 6 : if (!val2)
1249 6 : val2 = "";
1250 6 : if (strcmp(val1, val2) != 0)
1251 0 : ereport(ERROR,
1252 : (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
1253 : errmsg("new ICU collation rules (%s) are incompatible with the ICU collation rules of the template database (%s)",
1254 : val1, val2),
1255 : errhint("Use the same ICU collation rules as in the template database, or use template0 as template.")));
1256 : }
1257 : }
1258 :
1259 : /*
1260 : * If we got a collation version for the template database, check that it
1261 : * matches the actual OS collation version. Otherwise error; the user
1262 : * needs to fix the template database first. Don't complain if a
1263 : * collation version was specified explicitly as a statement option; that
1264 : * is used by pg_upgrade to reproduce the old state exactly.
1265 : *
1266 : * (If the template database has no collation version, then either the
1267 : * platform/provider does not support collation versioning, or it's
1268 : * template0, for which we stipulate that it does not contain
1269 : * collation-using objects.)
1270 : */
1271 393 : if (src_collversion && !collversionEl)
1272 : {
1273 : char *actual_versionstr;
1274 : const char *locale;
1275 :
1276 155 : if (dblocprovider == COLLPROVIDER_LIBC)
1277 144 : locale = dbcollate;
1278 : else
1279 11 : locale = dblocale;
1280 :
1281 155 : actual_versionstr = get_collation_actual_version(dblocprovider, locale);
1282 155 : if (!actual_versionstr)
1283 0 : ereport(ERROR,
1284 : (errmsg("template database \"%s\" has a collation version, but no actual collation version could be determined",
1285 : dbtemplate)));
1286 :
1287 155 : if (strcmp(actual_versionstr, src_collversion) != 0)
1288 0 : ereport(ERROR,
1289 : (errmsg("template database \"%s\" has a collation version mismatch",
1290 : dbtemplate),
1291 : errdetail("The template database was created using collation version %s, "
1292 : "but the operating system provides version %s.",
1293 : src_collversion, actual_versionstr),
1294 : errhint("Rebuild all objects in the template database that use the default collation and run "
1295 : "ALTER DATABASE %s REFRESH COLLATION VERSION, "
1296 : "or build PostgreSQL with the right library version.",
1297 : quote_identifier(dbtemplate))));
1298 : }
1299 :
1300 393 : if (dbcollversion == NULL)
1301 363 : dbcollversion = src_collversion;
1302 :
1303 : /*
1304 : * Normally, we copy the collation version from the template database.
1305 : * This last resort only applies if the template database does not have a
1306 : * collation version, which is normally only the case for template0.
1307 : */
1308 393 : if (dbcollversion == NULL)
1309 : {
1310 : const char *locale;
1311 :
1312 208 : if (dblocprovider == COLLPROVIDER_LIBC)
1313 188 : locale = dbcollate;
1314 : else
1315 20 : locale = dblocale;
1316 :
1317 208 : dbcollversion = get_collation_actual_version(dblocprovider, locale);
1318 : }
1319 :
1320 : /* Resolve default tablespace for new database */
1321 393 : if (tablespacenameEl && tablespacenameEl->arg)
1322 17 : {
1323 : char *tablespacename;
1324 : AclResult aclresult;
1325 :
1326 17 : tablespacename = defGetString(tablespacenameEl);
1327 17 : dst_deftablespace = get_tablespace_oid(tablespacename, false);
1328 : /* check permissions */
1329 17 : aclresult = object_aclcheck(TableSpaceRelationId, dst_deftablespace, GetUserId(),
1330 : ACL_CREATE);
1331 17 : if (aclresult != ACLCHECK_OK)
1332 0 : aclcheck_error(aclresult, OBJECT_TABLESPACE,
1333 : tablespacename);
1334 :
1335 : /* pg_global must never be the default tablespace */
1336 17 : if (dst_deftablespace == GLOBALTABLESPACE_OID)
1337 0 : ereport(ERROR,
1338 : (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
1339 : errmsg("pg_global cannot be used as default tablespace")));
1340 :
1341 : /*
1342 : * If we are trying to change the default tablespace of the template,
1343 : * we require that the template not have any files in the new default
1344 : * tablespace. This is necessary because otherwise the copied
1345 : * database would contain pg_class rows that refer to its default
1346 : * tablespace both explicitly (by OID) and implicitly (as zero), which
1347 : * would cause problems. For example another CREATE DATABASE using
1348 : * the copied database as template, and trying to change its default
1349 : * tablespace again, would yield outright incorrect results (it would
1350 : * improperly move tables to the new default tablespace that should
1351 : * stay in the same tablespace).
1352 : */
1353 17 : if (dst_deftablespace != src_deftablespace)
1354 : {
1355 : char *srcpath;
1356 : struct stat st;
1357 :
1358 17 : srcpath = GetDatabasePath(src_dboid, dst_deftablespace);
1359 :
1360 17 : if (stat(srcpath, &st) == 0 &&
1361 0 : S_ISDIR(st.st_mode) &&
1362 0 : !directory_is_empty(srcpath))
1363 0 : ereport(ERROR,
1364 : (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
1365 : errmsg("cannot assign new default tablespace \"%s\"",
1366 : tablespacename),
1367 : errdetail("There is a conflict because database \"%s\" already has some tables in this tablespace.",
1368 : dbtemplate)));
1369 17 : pfree(srcpath);
1370 : }
1371 : }
1372 : else
1373 : {
1374 : /* Use template database's default tablespace */
1375 376 : dst_deftablespace = src_deftablespace;
1376 : /* Note there is no additional permission check in this path */
1377 : }
1378 :
1379 : /*
1380 : * If built with appropriate switch, whine when regression-testing
1381 : * conventions for database names are violated. But don't complain during
1382 : * initdb.
1383 : */
1384 : #ifdef ENFORCE_REGRESSION_TEST_NAME_RESTRICTIONS
1385 : if (IsUnderPostmaster && strstr(dbname, "regression") == NULL)
1386 : elog(WARNING, "databases created by regression test cases should have names including \"regression\"");
1387 : #endif
1388 :
1389 : /*
1390 : * Check for db name conflict. This is just to give a more friendly error
1391 : * message than "unique index violation". There's a race condition but
1392 : * we're willing to accept the less friendly message in that case.
1393 : */
1394 393 : if (OidIsValid(get_database_oid(dbname, true)))
1395 1 : ereport(ERROR,
1396 : (errcode(ERRCODE_DUPLICATE_DATABASE),
1397 : errmsg("database \"%s\" already exists", dbname)));
1398 :
1399 : /*
1400 : * The source DB can't have any active backends, except this one
1401 : * (exception is to allow CREATE DB while connected to template1).
1402 : * Otherwise we might copy inconsistent data.
1403 : *
1404 : * This should be last among the basic error checks, because it involves
1405 : * potential waiting; we may as well throw an error first if we're gonna
1406 : * throw one.
1407 : */
1408 392 : if (CountOtherDBBackends(src_dboid, &notherbackends, &npreparedxacts))
1409 1 : ereport(ERROR,
1410 : (errcode(ERRCODE_OBJECT_IN_USE),
1411 : errmsg("source database \"%s\" is being accessed by other users",
1412 : dbtemplate),
1413 : errdetail_busy_db(notherbackends, npreparedxacts)));
1414 :
1415 : /*
1416 : * Select an OID for the new database, checking that it doesn't have a
1417 : * filename conflict with anything already existing in the tablespace
1418 : * directories.
1419 : */
1420 391 : pg_database_rel = table_open(DatabaseRelationId, RowExclusiveLock);
1421 :
1422 : /*
1423 : * If database OID is configured, check if the OID is already in use or
1424 : * data directory already exists.
1425 : */
1426 391 : if (OidIsValid(dboid))
1427 : {
1428 133 : char *existing_dbname = get_database_name(dboid);
1429 :
1430 133 : if (existing_dbname != NULL)
1431 0 : ereport(ERROR,
1432 : (errcode(ERRCODE_INVALID_PARAMETER_VALUE)),
1433 : errmsg("database OID %u is already in use by database \"%s\"",
1434 : dboid, existing_dbname));
1435 :
1436 133 : if (check_db_file_conflict(dboid))
1437 0 : ereport(ERROR,
1438 : (errcode(ERRCODE_INVALID_PARAMETER_VALUE)),
1439 : errmsg("data directory with the specified OID %u already exists", dboid));
1440 : }
1441 : else
1442 : {
1443 : /* Select an OID for the new database if it is not explicitly configured. */
1444 : do
1445 : {
1446 258 : dboid = GetNewOidWithIndex(pg_database_rel, DatabaseOidIndexId,
1447 : Anum_pg_database_oid);
1448 258 : } while (check_db_file_conflict(dboid));
1449 : }
1450 :
1451 : /*
1452 : * Insert a new tuple into pg_database. This establishes our ownership of
1453 : * the new database name (anyone else trying to insert the same name will
1454 : * block on the unique index, and fail after we commit).
1455 : */
1456 :
1457 : Assert((dblocprovider != COLLPROVIDER_LIBC && dblocale) ||
1458 : (dblocprovider == COLLPROVIDER_LIBC && !dblocale));
1459 :
1460 : /* Form tuple */
1461 391 : new_record[Anum_pg_database_oid - 1] = ObjectIdGetDatum(dboid);
1462 391 : new_record[Anum_pg_database_datname - 1] =
1463 391 : DirectFunctionCall1(namein, CStringGetDatum(dbname));
1464 391 : new_record[Anum_pg_database_datdba - 1] = ObjectIdGetDatum(datdba);
1465 391 : new_record[Anum_pg_database_encoding - 1] = Int32GetDatum(encoding);
1466 391 : new_record[Anum_pg_database_datlocprovider - 1] = CharGetDatum(dblocprovider);
1467 391 : new_record[Anum_pg_database_datistemplate - 1] = BoolGetDatum(dbistemplate);
1468 391 : new_record[Anum_pg_database_datallowconn - 1] = BoolGetDatum(dballowconnections);
1469 391 : new_record[Anum_pg_database_dathasloginevt - 1] = BoolGetDatum(src_hasloginevt);
1470 391 : new_record[Anum_pg_database_datconnlimit - 1] = Int32GetDatum(dbconnlimit);
1471 391 : new_record[Anum_pg_database_datfrozenxid - 1] = TransactionIdGetDatum(src_frozenxid);
1472 391 : new_record[Anum_pg_database_datminmxid - 1] = TransactionIdGetDatum(src_minmxid);
1473 391 : new_record[Anum_pg_database_dattablespace - 1] = ObjectIdGetDatum(dst_deftablespace);
1474 391 : new_record[Anum_pg_database_datcollate - 1] = CStringGetTextDatum(dbcollate);
1475 391 : new_record[Anum_pg_database_datctype - 1] = CStringGetTextDatum(dbctype);
1476 391 : if (dblocale)
1477 37 : new_record[Anum_pg_database_datlocale - 1] = CStringGetTextDatum(dblocale);
1478 : else
1479 354 : new_record_nulls[Anum_pg_database_datlocale - 1] = true;
1480 391 : if (dbicurules)
1481 0 : new_record[Anum_pg_database_daticurules - 1] = CStringGetTextDatum(dbicurules);
1482 : else
1483 391 : new_record_nulls[Anum_pg_database_daticurules - 1] = true;
1484 391 : if (dbcollversion)
1485 333 : new_record[Anum_pg_database_datcollversion - 1] = CStringGetTextDatum(dbcollversion);
1486 : else
1487 58 : new_record_nulls[Anum_pg_database_datcollversion - 1] = true;
1488 :
1489 : /*
1490 : * We deliberately set datacl to default (NULL), rather than copying it
1491 : * from the template database. Copying it would be a bad idea when the
1492 : * owner is not the same as the template's owner.
1493 : */
1494 391 : new_record_nulls[Anum_pg_database_datacl - 1] = true;
1495 :
1496 391 : tuple = heap_form_tuple(RelationGetDescr(pg_database_rel),
1497 : new_record, new_record_nulls);
1498 :
1499 391 : CatalogTupleInsert(pg_database_rel, tuple);
1500 :
1501 : /*
1502 : * Now generate additional catalog entries associated with the new DB
1503 : */
1504 :
1505 : /* Register owner dependency */
1506 391 : recordDependencyOnOwner(DatabaseRelationId, dboid, datdba);
1507 :
1508 : /* Create pg_shdepend entries for objects within database */
1509 391 : copyTemplateDependencies(src_dboid, dboid);
1510 :
1511 : /* Post creation hook for new database */
1512 391 : InvokeObjectPostCreateHook(DatabaseRelationId, dboid, 0);
1513 :
1514 : /*
1515 : * If we're going to be reading data for the to-be-created database into
1516 : * shared_buffers, take a lock on it. Nobody should know that this
1517 : * database exists yet, but it's good to maintain the invariant that an
1518 : * AccessExclusiveLock on the database is sufficient to drop all of its
1519 : * buffers without worrying about more being read later.
1520 : *
1521 : * Note that we need to do this before entering the
1522 : * PG_ENSURE_ERROR_CLEANUP block below, because createdb_failure_callback
1523 : * expects this lock to be held already.
1524 : */
1525 391 : if (dbstrategy == CREATEDB_WAL_LOG)
1526 253 : LockSharedObject(DatabaseRelationId, dboid, 0, AccessShareLock);
1527 :
1528 : /*
1529 : * Once we start copying subdirectories, we need to be able to clean 'em
1530 : * up if we fail. Use an ENSURE block to make sure this happens. (This
1531 : * is not a 100% solution, because of the possibility of failure during
1532 : * transaction commit after we leave this routine, but it should handle
1533 : * most scenarios.)
1534 : */
1535 391 : fparms.src_dboid = src_dboid;
1536 391 : fparms.dest_dboid = dboid;
1537 391 : fparms.strategy = dbstrategy;
1538 :
1539 391 : PG_ENSURE_ERROR_CLEANUP(createdb_failure_callback,
1540 : PointerGetDatum(&fparms));
1541 : {
1542 : /*
1543 : * If the user has asked to create a database with WAL_LOG strategy
1544 : * then call CreateDatabaseUsingWalLog, which will copy the database
1545 : * at the block level and it will WAL log each copied block.
1546 : * Otherwise, call CreateDatabaseUsingFileCopy that will copy the
1547 : * database file by file.
1548 : */
1549 391 : if (dbstrategy == CREATEDB_WAL_LOG)
1550 253 : CreateDatabaseUsingWalLog(src_dboid, dboid, src_deftablespace,
1551 : dst_deftablespace);
1552 : else
1553 138 : CreateDatabaseUsingFileCopy(src_dboid, dboid, src_deftablespace,
1554 : dst_deftablespace);
1555 :
1556 : /*
1557 : * Close pg_database, but keep lock till commit.
1558 : */
1559 389 : table_close(pg_database_rel, NoLock);
1560 :
1561 : /*
1562 : * Force synchronous commit, thus minimizing the window between
1563 : * creation of the database files and committal of the transaction. If
1564 : * we crash before committing, we'll have a DB that's taking up disk
1565 : * space but is not in pg_database, which is not good.
1566 : */
1567 389 : ForceSyncCommit();
1568 : }
1569 391 : PG_END_ENSURE_ERROR_CLEANUP(createdb_failure_callback,
1570 : PointerGetDatum(&fparms));
1571 :
1572 389 : return dboid;
1573 : }
1574 :
1575 : /*
1576 : * Check whether chosen encoding matches chosen locale settings. This
1577 : * restriction is necessary because libc's locale-specific code usually
1578 : * fails when presented with data in an encoding it's not expecting. We
1579 : * allow mismatch in four cases:
1580 : *
1581 : * 1. locale encoding = SQL_ASCII, which means that the locale is C/POSIX
1582 : * which works with any encoding.
1583 : *
1584 : * 2. locale encoding = -1, which means that we couldn't determine the
1585 : * locale's encoding and have to trust the user to get it right.
1586 : *
1587 : * 3. selected encoding is UTF8 and platform is win32. This is because
1588 : * UTF8 is a pseudo codepage that is supported in all locales since it's
1589 : * converted to UTF16 before being used.
1590 : *
1591 : * 4. selected encoding is SQL_ASCII, but only if you're a superuser. This
1592 : * is risky but we have historically allowed it --- notably, the
1593 : * regression tests require it.
1594 : *
1595 : * Note: if you change this policy, fix initdb to match.
 *
 * encoding is the proposed encoding ID; collate and ctype are the
 * LC_COLLATE and LC_CTYPE locale names it is checked against (createdb
 * passes the new database's encoding, datcollate and datctype values).
 * The two locales are tested independently with the same rules, LC_CTYPE
 * first.
 *
 * Returns nothing. On a mismatch, reports ERROR with
 * ERRCODE_INVALID_PARAMETER_VALUE; the errdetail names the encoding that
 * the offending locale setting requires.
1596 : */
1597 : void
1598 415 : check_encoding_locale_matches(int encoding, const char *collate, const char *ctype)
1599 : {
/*
 * Look up the encoding implied by each locale name. A result of -1 means
 * the encoding could not be determined (case 2 in the header comment).
 */
1600 415 : int ctype_encoding = pg_get_encoding_from_locale(ctype, true);
1601 415 : int collate_encoding = pg_get_encoding_from_locale(collate, true);
1602 :
/*
 * LC_CTYPE check: complain unless the encodings agree or one of the
 * exceptions listed in the header comment applies. superuser() is only
 * consulted when the requested encoding is SQL_ASCII.
 */
1603 419 : if (!(ctype_encoding == encoding ||
1604 4 : ctype_encoding == PG_SQL_ASCII ||
1605 : ctype_encoding == -1 ||
1606 : #ifdef WIN32
1607 : encoding == PG_UTF8 ||
1608 : #endif
1609 4 : (encoding == PG_SQL_ASCII && superuser())))
1610 0 : ereport(ERROR,
1611 : (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
1612 : errmsg("encoding \"%s\" does not match locale \"%s\"",
1613 : pg_encoding_to_char(encoding),
1614 : ctype),
1615 : errdetail("The chosen LC_CTYPE setting requires encoding \"%s\".",
1616 : pg_encoding_to_char(ctype_encoding))));
1617 :
/* LC_COLLATE check: same rules as above, applied to the collate locale. */
1618 419 : if (!(collate_encoding == encoding ||
1619 4 : collate_encoding == PG_SQL_ASCII ||
1620 : collate_encoding == -1 ||
1621 : #ifdef WIN32
1622 : encoding == PG_UTF8 ||
1623 : #endif
1624 4 : (encoding == PG_SQL_ASCII && superuser())))
1625 0 : ereport(ERROR,
1626 : (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
1627 : errmsg("encoding \"%s\" does not match locale \"%s\"",
1628 : pg_encoding_to_char(encoding),
1629 : collate),
1630 : errdetail("The chosen LC_COLLATE setting requires encoding \"%s\".",
1631 : pg_encoding_to_char(collate_encoding))));
1632 415 : }
1633 :
1634 : /*
 * Error cleanup callback for createdb.
 *
 * createdb registers this with PG_ENSURE_ERROR_CLEANUP around the step that
 * copies the template database, so it undoes the partially completed copy
 * if that step fails.
 *
 * code: not used by this function.
 * arg:  pointer Datum referencing the createdb_failure_params that createdb
 *       filled in (src_dboid, dest_dboid, strategy).
 *
 * Cleanup order: for the WAL_LOG strategy, drop the destination database's
 * shared buffers, forget its pending sync requests and release the lock on
 * it; then, for either strategy, release the ShareLock on the source
 * database and remove whatever was copied for the destination.
 */
1635 : static void
1636 2 : createdb_failure_callback(int code, Datum arg)
1637 : {
1638 2 : createdb_failure_params *fparms = (createdb_failure_params *) DatumGetPointer(arg);
1639 :
1640 : /*
1641 : * If we were copying database at block levels then drop pages for the
1642 : * destination database that are in the shared buffer cache. And tell
1643 : * checkpointer to forget any pending fsync and unlink requests for files
1644 : * in the database. The reasoning behind doing this is same as explained
1645 : * in dropdb function. But unlike dropdb we don't need to call
1646 : * pgstat_drop_database because this database is still not created so
1647 : * there should not be any stat for this.
1648 : */
1649 2 : if (fparms->strategy == CREATEDB_WAL_LOG)
1650 : {
1651 2 : DropDatabaseBuffers(fparms->dest_dboid);
1652 2 : ForgetDatabaseSyncRequests(fparms->dest_dboid);
1653 :
1654 : /*
 * Release lock on the target database. This is the AccessShareLock
 * that createdb takes for the WAL_LOG strategy before entering the
 * PG_ENSURE_ERROR_CLEANUP block.
 */
1655 2 : UnlockSharedObject(DatabaseRelationId, fparms->dest_dboid, 0,
1656 : AccessShareLock);
1657 : }
1658 :
1659 : /*
1660 : * Release lock on source database before doing recursive remove. This is
1661 : * not essential but it seems desirable to release the lock as soon as
1662 : * possible.
1663 : */
1664 2 : UnlockSharedObject(DatabaseRelationId, fparms->src_dboid, 0, ShareLock);
1665 :
1666 : /* Throw away any successfully copied subdirectories */
1667 2 : remove_dbtablespaces(fparms->dest_dboid);
1668 2 : }
1669 :
1670 :
1671 : /*
1672 : * DROP DATABASE
1673 : */
1674 : void
1675 64 : dropdb(const char *dbname, bool missing_ok, bool force)
1676 : {
1677 : Oid db_id;
1678 : bool db_istemplate;
1679 : Relation pgdbrel;
1680 : HeapTuple tup;
1681 : ScanKeyData scankey;
1682 : void *inplace_state;
1683 : Form_pg_database datform;
1684 : int notherbackends;
1685 : int npreparedxacts;
1686 : int nslots,
1687 : nslots_active;
1688 : int nsubscriptions;
1689 :
1690 : /*
1691 : * Look up the target database's OID, and get exclusive lock on it. We
1692 : * need this to ensure that no new backend starts up in the target
1693 : * database while we are deleting it (see postinit.c), and that no one is
1694 : * using it as a CREATE DATABASE template or trying to delete it for
1695 : * themselves.
1696 : */
1697 64 : pgdbrel = table_open(DatabaseRelationId, RowExclusiveLock);
1698 :
1699 64 : if (!get_db_info(dbname, AccessExclusiveLock, &db_id, NULL, NULL,
1700 : &db_istemplate, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL))
1701 : {
1702 16 : if (!missing_ok)
1703 : {
1704 8 : ereport(ERROR,
1705 : (errcode(ERRCODE_UNDEFINED_DATABASE),
1706 : errmsg("database \"%s\" does not exist", dbname)));
1707 : }
1708 : else
1709 : {
1710 : /* Close pg_database, release the lock, since we changed nothing */
1711 8 : table_close(pgdbrel, RowExclusiveLock);
1712 8 : ereport(NOTICE,
1713 : (errmsg("database \"%s\" does not exist, skipping",
1714 : dbname)));
1715 8 : return;
1716 : }
1717 : }
1718 :
1719 : /*
1720 : * Permission checks
1721 : */
1722 48 : if (!object_ownercheck(DatabaseRelationId, db_id, GetUserId()))
1723 0 : aclcheck_error(ACLCHECK_NOT_OWNER, OBJECT_DATABASE,
1724 : dbname);
1725 :
1726 : /* DROP hook for the database being removed */
1727 48 : InvokeObjectDropHook(DatabaseRelationId, db_id, 0);
1728 :
1729 : /*
1730 : * Disallow dropping a DB that is marked istemplate. This is just to
1731 : * prevent people from accidentally dropping template0 or template1; they
1732 : * can do so if they're really determined ...
1733 : */
1734 48 : if (db_istemplate)
1735 0 : ereport(ERROR,
1736 : (errcode(ERRCODE_WRONG_OBJECT_TYPE),
1737 : errmsg("cannot drop a template database")));
1738 :
1739 : /* Obviously can't drop my own database */
1740 48 : if (db_id == MyDatabaseId)
1741 0 : ereport(ERROR,
1742 : (errcode(ERRCODE_OBJECT_IN_USE),
1743 : errmsg("cannot drop the currently open database")));
1744 :
1745 : /*
1746 : * Check whether there are active logical slots that refer to the
1747 : * to-be-dropped database. The database lock we are holding prevents the
1748 : * creation of new slots using the database or existing slots becoming
1749 : * active.
1750 : */
1751 48 : (void) ReplicationSlotsCountDBSlots(db_id, &nslots, &nslots_active);
1752 48 : if (nslots_active)
1753 : {
1754 1 : ereport(ERROR,
1755 : (errcode(ERRCODE_OBJECT_IN_USE),
1756 : errmsg("database \"%s\" is used by an active logical replication slot",
1757 : dbname),
1758 : errdetail_plural("There is %d active slot.",
1759 : "There are %d active slots.",
1760 : nslots_active, nslots_active)));
1761 : }
1762 :
1763 : /*
1764 : * Check if there are subscriptions defined in the target database.
1765 : *
1766 : * We can't drop them automatically because they might be holding
1767 : * resources in other databases/instances.
1768 : */
1769 47 : if ((nsubscriptions = CountDBSubscriptions(db_id)) > 0)
1770 0 : ereport(ERROR,
1771 : (errcode(ERRCODE_OBJECT_IN_USE),
1772 : errmsg("database \"%s\" is being used by logical replication subscription",
1773 : dbname),
1774 : errdetail_plural("There is %d subscription.",
1775 : "There are %d subscriptions.",
1776 : nsubscriptions, nsubscriptions)));
1777 :
1778 :
1779 : /*
1780 : * Attempt to terminate all existing connections to the target database if
1781 : * the user has requested to do so.
1782 : */
1783 47 : if (force)
1784 1 : TerminateOtherDBBackends(db_id);
1785 :
1786 : /*
1787 : * Check for other backends in the target database. (Because we hold the
1788 : * database lock, no new ones can start after this.)
1789 : *
1790 : * As in CREATE DATABASE, check this after other error conditions.
1791 : */
1792 47 : if (CountOtherDBBackends(db_id, ¬herbackends, &npreparedxacts))
1793 0 : ereport(ERROR,
1794 : (errcode(ERRCODE_OBJECT_IN_USE),
1795 : errmsg("database \"%s\" is being accessed by other users",
1796 : dbname),
1797 : errdetail_busy_db(notherbackends, npreparedxacts)));
1798 :
1799 : /*
1800 : * Delete any comments or security labels associated with the database.
1801 : */
1802 47 : DeleteSharedComments(db_id, DatabaseRelationId);
1803 47 : DeleteSharedSecurityLabel(db_id, DatabaseRelationId);
1804 :
1805 : /*
1806 : * Remove settings associated with this database
1807 : */
1808 47 : DropSetting(db_id, InvalidOid);
1809 :
1810 : /*
1811 : * Remove shared dependency references for the database.
1812 : */
1813 47 : dropDatabaseDependencies(db_id);
1814 :
1815 : /*
1816 : * Tell the cumulative stats system to forget it immediately, too.
1817 : */
1818 47 : pgstat_drop_database(db_id);
1819 :
1820 : /*
1821 : * Except for the deletion of the catalog row, subsequent actions are not
1822 : * transactional (consider DropDatabaseBuffers() discarding modified
1823 : * buffers). But we might crash or get interrupted below. To prevent
1824 : * accesses to a database with invalid contents, mark the database as
1825 : * invalid using an in-place update.
1826 : *
1827 : * We need to flush the WAL before continuing, to guarantee the
1828 : * modification is durable before performing irreversible filesystem
1829 : * operations.
1830 : */
1831 47 : ScanKeyInit(&scankey,
1832 : Anum_pg_database_datname,
1833 : BTEqualStrategyNumber, F_NAMEEQ,
1834 : CStringGetDatum(dbname));
1835 47 : systable_inplace_update_begin(pgdbrel, DatabaseNameIndexId, true,
1836 : NULL, 1, &scankey, &tup, &inplace_state);
1837 47 : if (!HeapTupleIsValid(tup))
1838 0 : elog(ERROR, "cache lookup failed for database %u", db_id);
1839 47 : datform = (Form_pg_database) GETSTRUCT(tup);
1840 47 : datform->datconnlimit = DATCONNLIMIT_INVALID_DB;
1841 47 : systable_inplace_update_finish(inplace_state, tup);
1842 47 : XLogFlush(XactLastRecEnd);
1843 :
1844 : /*
1845 : * Also delete the tuple - transactionally. If this transaction commits,
1846 : * the row will be gone, but if we fail, dropdb() can be invoked again.
1847 : */
1848 47 : CatalogTupleDelete(pgdbrel, &tup->t_self);
1849 47 : heap_freetuple(tup);
1850 :
1851 : /*
1852 : * Drop db-specific replication slots.
1853 : */
1854 47 : ReplicationSlotsDropDBSlots(db_id);
1855 :
1856 : /*
1857 : * Drop pages for this database that are in the shared buffer cache. This
1858 : * is important to ensure that no remaining backend tries to write out a
1859 : * dirty buffer to the dead database later...
1860 : */
1861 47 : DropDatabaseBuffers(db_id);
1862 :
1863 : /*
1864 : * Tell checkpointer to forget any pending fsync and unlink requests for
1865 : * files in the database; else the fsyncs will fail at next checkpoint, or
1866 : * worse, it will delete files that belong to a newly created database
1867 : * with the same OID.
1868 : */
1869 47 : ForgetDatabaseSyncRequests(db_id);
1870 :
1871 : /*
1872 : * Force a checkpoint to make sure the checkpointer has received the
1873 : * message sent by ForgetDatabaseSyncRequests.
1874 : */
1875 47 : RequestCheckpoint(CHECKPOINT_FAST | CHECKPOINT_FORCE | CHECKPOINT_WAIT);
1876 :
1877 : /* Close all smgr fds in all backends. */
1878 47 : WaitForProcSignalBarrier(EmitProcSignalBarrier(PROCSIGNAL_BARRIER_SMGRRELEASE));
1879 :
1880 : /*
1881 : * Remove all tablespace subdirs belonging to the database.
1882 : */
1883 47 : remove_dbtablespaces(db_id);
1884 :
1885 : /*
1886 : * Close pg_database, but keep lock till commit.
1887 : */
1888 47 : table_close(pgdbrel, NoLock);
1889 :
1890 : /*
1891 : * Force synchronous commit, thus minimizing the window between removal of
1892 : * the database files and committal of the transaction. If we crash before
1893 : * committing, we'll have a DB that's gone on disk but still there
1894 : * according to pg_database, which is not good.
1895 : */
1896 47 : ForceSyncCommit();
1897 : }
1898 :
1899 :
1900 : /*
1901 : * Rename database
1902 : */
1903 : ObjectAddress
1904 7 : RenameDatabase(const char *oldname, const char *newname)
1905 : {
1906 : Oid db_id;
1907 : HeapTuple newtup;
1908 : ItemPointerData otid;
1909 : Relation rel;
1910 : int notherbackends;
1911 : int npreparedxacts;
1912 : ObjectAddress address;
1913 :
1914 : /*
1915 : * Look up the target database's OID, and get exclusive lock on it. We
1916 : * need this for the same reasons as DROP DATABASE.
1917 : */
1918 7 : rel = table_open(DatabaseRelationId, RowExclusiveLock);
1919 :
1920 7 : if (!get_db_info(oldname, AccessExclusiveLock, &db_id, NULL, NULL, NULL,
1921 : NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL))
1922 0 : ereport(ERROR,
1923 : (errcode(ERRCODE_UNDEFINED_DATABASE),
1924 : errmsg("database \"%s\" does not exist", oldname)));
1925 :
1926 : /* must be owner */
1927 7 : if (!object_ownercheck(DatabaseRelationId, db_id, GetUserId()))
1928 0 : aclcheck_error(ACLCHECK_NOT_OWNER, OBJECT_DATABASE,
1929 : oldname);
1930 :
1931 : /* must have createdb rights */
1932 7 : if (!have_createdb_privilege())
1933 0 : ereport(ERROR,
1934 : (errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
1935 : errmsg("permission denied to rename database")));
1936 :
1937 : /*
1938 : * If built with appropriate switch, whine when regression-testing
1939 : * conventions for database names are violated.
1940 : */
1941 : #ifdef ENFORCE_REGRESSION_TEST_NAME_RESTRICTIONS
1942 : if (strstr(newname, "regression") == NULL)
1943 : elog(WARNING, "databases created by regression test cases should have names including \"regression\"");
1944 : #endif
1945 :
1946 : /*
1947 : * Make sure the new name doesn't exist. See notes for same error in
1948 : * CREATE DATABASE.
1949 : */
1950 7 : if (OidIsValid(get_database_oid(newname, true)))
1951 0 : ereport(ERROR,
1952 : (errcode(ERRCODE_DUPLICATE_DATABASE),
1953 : errmsg("database \"%s\" already exists", newname)));
1954 :
1955 : /*
1956 : * XXX Client applications probably store the current database somewhere,
1957 : * so renaming it could cause confusion. On the other hand, there may not
1958 : * be an actual problem besides a little confusion, so think about this
1959 : * and decide.
1960 : */
1961 7 : if (db_id == MyDatabaseId)
1962 0 : ereport(ERROR,
1963 : (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
1964 : errmsg("current database cannot be renamed")));
1965 :
1966 : /*
1967 : * Make sure the database does not have active sessions. This is the same
1968 : * concern as above, but applied to other sessions.
1969 : *
1970 : * As in CREATE DATABASE, check this after other error conditions.
1971 : */
1972 7 : if (CountOtherDBBackends(db_id, ¬herbackends, &npreparedxacts))
1973 0 : ereport(ERROR,
1974 : (errcode(ERRCODE_OBJECT_IN_USE),
1975 : errmsg("database \"%s\" is being accessed by other users",
1976 : oldname),
1977 : errdetail_busy_db(notherbackends, npreparedxacts)));
1978 :
1979 : /* rename */
1980 7 : newtup = SearchSysCacheLockedCopy1(DATABASEOID, ObjectIdGetDatum(db_id));
1981 7 : if (!HeapTupleIsValid(newtup))
1982 0 : elog(ERROR, "cache lookup failed for database %u", db_id);
1983 7 : otid = newtup->t_self;
1984 7 : namestrcpy(&(((Form_pg_database) GETSTRUCT(newtup))->datname), newname);
1985 7 : CatalogTupleUpdate(rel, &otid, newtup);
1986 7 : UnlockTuple(rel, &otid, InplaceUpdateTupleLock);
1987 :
1988 7 : InvokeObjectPostAlterHook(DatabaseRelationId, db_id, 0);
1989 :
1990 7 : ObjectAddressSet(address, DatabaseRelationId, db_id);
1991 :
1992 : /*
1993 : * Close pg_database, but keep lock till commit.
1994 : */
1995 7 : table_close(rel, NoLock);
1996 :
1997 7 : return address;
1998 : }
1999 :
2000 :
2001 : /*
2002 : * ALTER DATABASE SET TABLESPACE
2003 : */
2004 : static void
2005 12 : movedb(const char *dbname, const char *tblspcname)
2006 : {
2007 : Oid db_id;
2008 : Relation pgdbrel;
2009 : int notherbackends;
2010 : int npreparedxacts;
2011 : HeapTuple oldtuple,
2012 : newtuple;
2013 : Oid src_tblspcoid,
2014 : dst_tblspcoid;
2015 : ScanKeyData scankey;
2016 : SysScanDesc sysscan;
2017 : AclResult aclresult;
2018 : char *src_dbpath;
2019 : char *dst_dbpath;
2020 : DIR *dstdir;
2021 : struct dirent *xlde;
2022 : movedb_failure_params fparms;
2023 :
2024 : /*
2025 : * Look up the target database's OID, and get exclusive lock on it. We
2026 : * need this to ensure that no new backend starts up in the database while
2027 : * we are moving it, and that no one is using it as a CREATE DATABASE
2028 : * template or trying to delete it.
2029 : */
2030 12 : pgdbrel = table_open(DatabaseRelationId, RowExclusiveLock);
2031 :
2032 12 : if (!get_db_info(dbname, AccessExclusiveLock, &db_id, NULL, NULL, NULL,
2033 : NULL, NULL, NULL, NULL, &src_tblspcoid, NULL, NULL, NULL, NULL, NULL, NULL))
2034 0 : ereport(ERROR,
2035 : (errcode(ERRCODE_UNDEFINED_DATABASE),
2036 : errmsg("database \"%s\" does not exist", dbname)));
2037 :
2038 : /*
2039 : * We actually need a session lock, so that the lock will persist across
2040 : * the commit/restart below. (We could almost get away with letting the
2041 : * lock be released at commit, except that someone could try to move
2042 : * relations of the DB back into the old directory while we rmtree() it.)
2043 : */
2044 12 : LockSharedObjectForSession(DatabaseRelationId, db_id, 0,
2045 : AccessExclusiveLock);
2046 :
2047 : /*
2048 : * Permission checks
2049 : */
2050 12 : if (!object_ownercheck(DatabaseRelationId, db_id, GetUserId()))
2051 0 : aclcheck_error(ACLCHECK_NOT_OWNER, OBJECT_DATABASE,
2052 : dbname);
2053 :
2054 : /*
2055 : * Obviously can't move the tables of my own database
2056 : */
2057 12 : if (db_id == MyDatabaseId)
2058 0 : ereport(ERROR,
2059 : (errcode(ERRCODE_OBJECT_IN_USE),
2060 : errmsg("cannot change the tablespace of the currently open database")));
2061 :
2062 : /*
2063 : * Get tablespace's oid
2064 : */
2065 12 : dst_tblspcoid = get_tablespace_oid(tblspcname, false);
2066 :
2067 : /*
2068 : * Permission checks
2069 : */
2070 12 : aclresult = object_aclcheck(TableSpaceRelationId, dst_tblspcoid, GetUserId(),
2071 : ACL_CREATE);
2072 12 : if (aclresult != ACLCHECK_OK)
2073 0 : aclcheck_error(aclresult, OBJECT_TABLESPACE,
2074 : tblspcname);
2075 :
2076 : /*
2077 : * pg_global must never be the default tablespace
2078 : */
2079 12 : if (dst_tblspcoid == GLOBALTABLESPACE_OID)
2080 0 : ereport(ERROR,
2081 : (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
2082 : errmsg("pg_global cannot be used as default tablespace")));
2083 :
2084 : /*
2085 : * No-op if same tablespace
2086 : */
2087 12 : if (src_tblspcoid == dst_tblspcoid)
2088 : {
2089 0 : table_close(pgdbrel, NoLock);
2090 0 : UnlockSharedObjectForSession(DatabaseRelationId, db_id, 0,
2091 : AccessExclusiveLock);
2092 0 : return;
2093 : }
2094 :
2095 : /*
2096 : * Check for other backends in the target database. (Because we hold the
2097 : * database lock, no new ones can start after this.)
2098 : *
2099 : * As in CREATE DATABASE, check this after other error conditions.
2100 : */
2101 12 : if (CountOtherDBBackends(db_id, ¬herbackends, &npreparedxacts))
2102 0 : ereport(ERROR,
2103 : (errcode(ERRCODE_OBJECT_IN_USE),
2104 : errmsg("database \"%s\" is being accessed by other users",
2105 : dbname),
2106 : errdetail_busy_db(notherbackends, npreparedxacts)));
2107 :
2108 : /*
2109 : * Get old and new database paths
2110 : */
2111 12 : src_dbpath = GetDatabasePath(db_id, src_tblspcoid);
2112 12 : dst_dbpath = GetDatabasePath(db_id, dst_tblspcoid);
2113 :
2114 : /*
2115 : * Force a checkpoint before proceeding. This will force all dirty
2116 : * buffers, including those of unlogged tables, out to disk, to ensure
2117 : * source database is up-to-date on disk for the copy.
2118 : * FlushDatabaseBuffers() would suffice for that, but we also want to
2119 : * process any pending unlink requests. Otherwise, the check for existing
2120 : * files in the target directory might fail unnecessarily, not to mention
2121 : * that the copy might fail due to source files getting deleted under it.
2122 : * On Windows, this also ensures that background procs don't hold any open
2123 : * files, which would cause rmdir() to fail.
2124 : */
2125 12 : RequestCheckpoint(CHECKPOINT_FAST | CHECKPOINT_FORCE | CHECKPOINT_WAIT
2126 : | CHECKPOINT_FLUSH_UNLOGGED);
2127 :
2128 : /* Close all smgr fds in all backends. */
2129 12 : WaitForProcSignalBarrier(EmitProcSignalBarrier(PROCSIGNAL_BARRIER_SMGRRELEASE));
2130 :
2131 : /*
2132 : * Now drop all buffers holding data of the target database; they should
2133 : * no longer be dirty so DropDatabaseBuffers is safe.
2134 : *
2135 : * It might seem that we could just let these buffers age out of shared
2136 : * buffers naturally, since they should not get referenced anymore. The
2137 : * problem with that is that if the user later moves the database back to
2138 : * its original tablespace, any still-surviving buffers would appear to
2139 : * contain valid data again --- but they'd be missing any changes made in
2140 : * the database while it was in the new tablespace. In any case, freeing
2141 : * buffers that should never be used again seems worth the cycles.
2142 : *
2143 : * Note: it'd be sufficient to get rid of buffers matching db_id and
2144 : * src_tblspcoid, but bufmgr.c presently provides no API for that.
2145 : */
2146 12 : DropDatabaseBuffers(db_id);
2147 :
2148 : /*
2149 : * Check for existence of files in the target directory, i.e., objects of
2150 : * this database that are already in the target tablespace. We can't
2151 : * allow the move in such a case, because we would need to change those
2152 : * relations' pg_class.reltablespace entries to zero, and we don't have
2153 : * access to the DB's pg_class to do so.
2154 : */
2155 12 : dstdir = AllocateDir(dst_dbpath);
2156 12 : if (dstdir != NULL)
2157 : {
2158 0 : while ((xlde = ReadDir(dstdir, dst_dbpath)) != NULL)
2159 : {
2160 0 : if (strcmp(xlde->d_name, ".") == 0 ||
2161 0 : strcmp(xlde->d_name, "..") == 0)
2162 0 : continue;
2163 :
2164 0 : ereport(ERROR,
2165 : (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
2166 : errmsg("some relations of database \"%s\" are already in tablespace \"%s\"",
2167 : dbname, tblspcname),
2168 : errhint("You must move them back to the database's default tablespace before using this command.")));
2169 : }
2170 :
2171 0 : FreeDir(dstdir);
2172 :
2173 : /*
2174 : * The directory exists but is empty. We must remove it before using
2175 : * the copydir function.
2176 : */
2177 0 : if (rmdir(dst_dbpath) != 0)
2178 0 : elog(ERROR, "could not remove directory \"%s\": %m",
2179 : dst_dbpath);
2180 : }
2181 :
2182 : /*
2183 : * Use an ENSURE block to make sure we remove the debris if the copy fails
2184 : * (eg, due to out-of-disk-space). This is not a 100% solution, because
2185 : * of the possibility of failure during transaction commit, but it should
2186 : * handle most scenarios.
2187 : */
2188 12 : fparms.dest_dboid = db_id;
2189 12 : fparms.dest_tsoid = dst_tblspcoid;
2190 12 : PG_ENSURE_ERROR_CLEANUP(movedb_failure_callback,
2191 : PointerGetDatum(&fparms));
2192 : {
2193 12 : Datum new_record[Natts_pg_database] = {0};
2194 12 : bool new_record_nulls[Natts_pg_database] = {0};
2195 12 : bool new_record_repl[Natts_pg_database] = {0};
2196 :
2197 : /*
2198 : * Copy files from the old tablespace to the new one
2199 : */
2200 12 : copydir(src_dbpath, dst_dbpath, false);
2201 :
2202 : /*
2203 : * Record the filesystem change in XLOG
2204 : */
2205 : {
2206 : xl_dbase_create_file_copy_rec xlrec;
2207 :
2208 12 : xlrec.db_id = db_id;
2209 12 : xlrec.tablespace_id = dst_tblspcoid;
2210 12 : xlrec.src_db_id = db_id;
2211 12 : xlrec.src_tablespace_id = src_tblspcoid;
2212 :
2213 12 : XLogBeginInsert();
2214 12 : XLogRegisterData(&xlrec,
2215 : sizeof(xl_dbase_create_file_copy_rec));
2216 :
2217 12 : (void) XLogInsert(RM_DBASE_ID,
2218 : XLOG_DBASE_CREATE_FILE_COPY | XLR_SPECIAL_REL_UPDATE);
2219 : }
2220 :
2221 : /*
2222 : * Update the database's pg_database tuple
2223 : */
2224 12 : ScanKeyInit(&scankey,
2225 : Anum_pg_database_datname,
2226 : BTEqualStrategyNumber, F_NAMEEQ,
2227 : CStringGetDatum(dbname));
2228 12 : sysscan = systable_beginscan(pgdbrel, DatabaseNameIndexId, true,
2229 : NULL, 1, &scankey);
2230 12 : oldtuple = systable_getnext(sysscan);
2231 12 : if (!HeapTupleIsValid(oldtuple)) /* shouldn't happen... */
2232 0 : ereport(ERROR,
2233 : (errcode(ERRCODE_UNDEFINED_DATABASE),
2234 : errmsg("database \"%s\" does not exist", dbname)));
2235 12 : LockTuple(pgdbrel, &oldtuple->t_self, InplaceUpdateTupleLock);
2236 :
2237 12 : new_record[Anum_pg_database_dattablespace - 1] = ObjectIdGetDatum(dst_tblspcoid);
2238 12 : new_record_repl[Anum_pg_database_dattablespace - 1] = true;
2239 :
2240 12 : newtuple = heap_modify_tuple(oldtuple, RelationGetDescr(pgdbrel),
2241 : new_record,
2242 : new_record_nulls, new_record_repl);
2243 12 : CatalogTupleUpdate(pgdbrel, &oldtuple->t_self, newtuple);
2244 12 : UnlockTuple(pgdbrel, &oldtuple->t_self, InplaceUpdateTupleLock);
2245 :
2246 12 : InvokeObjectPostAlterHook(DatabaseRelationId, db_id, 0);
2247 :
2248 12 : systable_endscan(sysscan);
2249 :
2250 : /*
2251 : * Force another checkpoint here. As in CREATE DATABASE, this is to
2252 : * ensure that we don't have to replay a committed
2253 : * XLOG_DBASE_CREATE_FILE_COPY operation, which would cause us to lose
2254 : * any unlogged operations done in the new DB tablespace before the
2255 : * next checkpoint.
2256 : */
2257 12 : RequestCheckpoint(CHECKPOINT_FAST | CHECKPOINT_FORCE | CHECKPOINT_WAIT);
2258 :
2259 : /*
2260 : * Force synchronous commit, thus minimizing the window between
2261 : * copying the database files and committal of the transaction. If we
2262 : * crash before committing, we'll leave an orphaned set of files on
2263 : * disk, which is not fatal but not good either.
2264 : */
2265 12 : ForceSyncCommit();
2266 :
2267 : /*
2268 : * Close pg_database, but keep lock till commit.
2269 : */
2270 12 : table_close(pgdbrel, NoLock);
2271 : }
2272 12 : PG_END_ENSURE_ERROR_CLEANUP(movedb_failure_callback,
2273 : PointerGetDatum(&fparms));
2274 :
2275 : /*
2276 : * Commit the transaction so that the pg_database update is committed. If
2277 : * we crash while removing files, the database won't be corrupt, we'll
2278 : * just leave some orphaned files in the old directory.
2279 : *
2280 : * (This is OK because we know we aren't inside a transaction block.)
2281 : *
2282 : * XXX would it be safe/better to do this inside the ensure block? Not
2283 : * convinced it's a good idea; consider elog just after the transaction
2284 : * really commits.
2285 : */
2286 12 : PopActiveSnapshot();
2287 12 : CommitTransactionCommand();
2288 :
2289 : /* Start new transaction for the remaining work; don't need a snapshot */
2290 12 : StartTransactionCommand();
2291 :
2292 : /*
2293 : * Remove files from the old tablespace
2294 : */
2295 12 : if (!rmtree(src_dbpath, true))
2296 0 : ereport(WARNING,
2297 : (errmsg("some useless files may be left behind in old database directory \"%s\"",
2298 : src_dbpath)));
2299 :
2300 : /*
2301 : * Record the filesystem change in XLOG
2302 : */
2303 : {
2304 : xl_dbase_drop_rec xlrec;
2305 :
2306 12 : xlrec.db_id = db_id;
2307 12 : xlrec.ntablespaces = 1;
2308 :
2309 12 : XLogBeginInsert();
2310 12 : XLogRegisterData(&xlrec, sizeof(xl_dbase_drop_rec));
2311 12 : XLogRegisterData(&src_tblspcoid, sizeof(Oid));
2312 :
2313 12 : (void) XLogInsert(RM_DBASE_ID,
2314 : XLOG_DBASE_DROP | XLR_SPECIAL_REL_UPDATE);
2315 : }
2316 :
2317 : /* Now it's safe to release the database lock */
2318 12 : UnlockSharedObjectForSession(DatabaseRelationId, db_id, 0,
2319 : AccessExclusiveLock);
2320 :
2321 12 : pfree(src_dbpath);
2322 12 : pfree(dst_dbpath);
2323 : }
2324 :
2325 : /* Error cleanup callback for movedb */
2326 : static void
2327 0 : movedb_failure_callback(int code, Datum arg)
2328 : {
2329 0 : movedb_failure_params *fparms = (movedb_failure_params *) DatumGetPointer(arg);
2330 : char *dstpath;
2331 :
2332 : /* Get rid of anything we managed to copy to the target directory */
2333 0 : dstpath = GetDatabasePath(fparms->dest_dboid, fparms->dest_tsoid);
2334 :
2335 0 : (void) rmtree(dstpath, true);
2336 :
2337 0 : pfree(dstpath);
2338 0 : }
2339 :
2340 : /*
2341 : * Process options and call dropdb function.
2342 : */
2343 : void
2344 64 : DropDatabase(ParseState *pstate, DropdbStmt *stmt)
2345 : {
2346 64 : bool force = false;
2347 : ListCell *lc;
2348 :
2349 77 : foreach(lc, stmt->options)
2350 : {
2351 13 : DefElem *opt = (DefElem *) lfirst(lc);
2352 :
2353 13 : if (strcmp(opt->defname, "force") == 0)
2354 13 : force = true;
2355 : else
2356 0 : ereport(ERROR,
2357 : (errcode(ERRCODE_SYNTAX_ERROR),
2358 : errmsg("unrecognized %s option \"%s\"",
2359 : "DROP DATABASE", opt->defname),
2360 : parser_errposition(pstate, opt->location)));
2361 : }
2362 :
2363 64 : dropdb(stmt->dbname, stmt->missing_ok, force);
2364 55 : }
2365 :
2366 : /*
2367 : * ALTER DATABASE name ...
2368 : */
2369 : Oid
2370 46 : AlterDatabase(ParseState *pstate, AlterDatabaseStmt *stmt, bool isTopLevel)
2371 : {
2372 : Relation rel;
2373 : Oid dboid;
2374 : HeapTuple tuple,
2375 : newtuple;
2376 : Form_pg_database datform;
2377 : ScanKeyData scankey;
2378 : SysScanDesc scan;
2379 : ListCell *option;
2380 46 : bool dbistemplate = false;
2381 46 : bool dballowconnections = true;
2382 46 : int dbconnlimit = DATCONNLIMIT_UNLIMITED;
2383 46 : DefElem *distemplate = NULL;
2384 46 : DefElem *dallowconnections = NULL;
2385 46 : DefElem *dconnlimit = NULL;
2386 46 : DefElem *dtablespace = NULL;
2387 46 : Datum new_record[Natts_pg_database] = {0};
2388 46 : bool new_record_nulls[Natts_pg_database] = {0};
2389 46 : bool new_record_repl[Natts_pg_database] = {0};
2390 :
2391 : /* Extract options from the statement node tree */
2392 92 : foreach(option, stmt->options)
2393 : {
2394 46 : DefElem *defel = (DefElem *) lfirst(option);
2395 :
2396 46 : if (strcmp(defel->defname, "is_template") == 0)
2397 : {
2398 11 : if (distemplate)
2399 0 : errorConflictingDefElem(defel, pstate);
2400 11 : distemplate = defel;
2401 : }
2402 35 : else if (strcmp(defel->defname, "allow_connections") == 0)
2403 : {
2404 19 : if (dallowconnections)
2405 0 : errorConflictingDefElem(defel, pstate);
2406 19 : dallowconnections = defel;
2407 : }
2408 16 : else if (strcmp(defel->defname, "connection_limit") == 0)
2409 : {
2410 4 : if (dconnlimit)
2411 0 : errorConflictingDefElem(defel, pstate);
2412 4 : dconnlimit = defel;
2413 : }
2414 12 : else if (strcmp(defel->defname, "tablespace") == 0)
2415 : {
2416 12 : if (dtablespace)
2417 0 : errorConflictingDefElem(defel, pstate);
2418 12 : dtablespace = defel;
2419 : }
2420 : else
2421 0 : ereport(ERROR,
2422 : (errcode(ERRCODE_SYNTAX_ERROR),
2423 : errmsg("option \"%s\" not recognized", defel->defname),
2424 : parser_errposition(pstate, defel->location)));
2425 : }
2426 :
2427 46 : if (dtablespace)
2428 : {
2429 : /*
2430 : * While the SET TABLESPACE syntax doesn't allow any other options,
2431 : * somebody could write "WITH TABLESPACE ...". Forbid any other
2432 : * options from being specified in that case.
2433 : */
2434 12 : if (list_length(stmt->options) != 1)
2435 0 : ereport(ERROR,
2436 : (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
2437 : errmsg("option \"%s\" cannot be specified with other options",
2438 : dtablespace->defname),
2439 : parser_errposition(pstate, dtablespace->location)));
2440 : /* this case isn't allowed within a transaction block */
2441 12 : PreventInTransactionBlock(isTopLevel, "ALTER DATABASE SET TABLESPACE");
2442 12 : movedb(stmt->dbname, defGetString(dtablespace));
2443 12 : return InvalidOid;
2444 : }
2445 :
2446 34 : if (distemplate && distemplate->arg)
2447 11 : dbistemplate = defGetBoolean(distemplate);
2448 34 : if (dallowconnections && dallowconnections->arg)
2449 19 : dballowconnections = defGetBoolean(dallowconnections);
2450 34 : if (dconnlimit && dconnlimit->arg)
2451 : {
2452 4 : dbconnlimit = defGetInt32(dconnlimit);
2453 4 : if (dbconnlimit < DATCONNLIMIT_UNLIMITED)
2454 0 : ereport(ERROR,
2455 : (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
2456 : errmsg("invalid connection limit: %d", dbconnlimit)));
2457 : }
2458 :
2459 : /*
2460 : * Get the old tuple. We don't need a lock on the database per se,
2461 : * because we're not going to do anything that would mess up incoming
2462 : * connections.
2463 : */
2464 34 : rel = table_open(DatabaseRelationId, RowExclusiveLock);
2465 34 : ScanKeyInit(&scankey,
2466 : Anum_pg_database_datname,
2467 : BTEqualStrategyNumber, F_NAMEEQ,
2468 34 : CStringGetDatum(stmt->dbname));
2469 34 : scan = systable_beginscan(rel, DatabaseNameIndexId, true,
2470 : NULL, 1, &scankey);
2471 34 : tuple = systable_getnext(scan);
2472 34 : if (!HeapTupleIsValid(tuple))
2473 0 : ereport(ERROR,
2474 : (errcode(ERRCODE_UNDEFINED_DATABASE),
2475 : errmsg("database \"%s\" does not exist", stmt->dbname)));
2476 34 : LockTuple(rel, &tuple->t_self, InplaceUpdateTupleLock);
2477 :
2478 34 : datform = (Form_pg_database) GETSTRUCT(tuple);
2479 34 : dboid = datform->oid;
2480 :
2481 34 : if (database_is_invalid_form(datform))
2482 : {
2483 1 : ereport(FATAL,
2484 : errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
2485 : errmsg("cannot alter invalid database \"%s\"", stmt->dbname),
2486 : errhint("Use DROP DATABASE to drop invalid databases."));
2487 : }
2488 :
2489 33 : if (!object_ownercheck(DatabaseRelationId, dboid, GetUserId()))
2490 0 : aclcheck_error(ACLCHECK_NOT_OWNER, OBJECT_DATABASE,
2491 0 : stmt->dbname);
2492 :
2493 : /*
2494 : * In order to avoid getting locked out and having to go through
2495 : * standalone mode, we refuse to disallow connections to the database
2496 : * we're currently connected to. Lockout can still happen with concurrent
2497 : * sessions but the likeliness of that is not high enough to worry about.
2498 : */
2499 33 : if (!dballowconnections && dboid == MyDatabaseId)
2500 0 : ereport(ERROR,
2501 : (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
2502 : errmsg("cannot disallow connections for current database")));
2503 :
2504 : /*
2505 : * Build an updated tuple, perusing the information just obtained
2506 : */
2507 33 : if (distemplate)
2508 : {
2509 11 : new_record[Anum_pg_database_datistemplate - 1] = BoolGetDatum(dbistemplate);
2510 11 : new_record_repl[Anum_pg_database_datistemplate - 1] = true;
2511 : }
2512 33 : if (dallowconnections)
2513 : {
2514 19 : new_record[Anum_pg_database_datallowconn - 1] = BoolGetDatum(dballowconnections);
2515 19 : new_record_repl[Anum_pg_database_datallowconn - 1] = true;
2516 : }
2517 33 : if (dconnlimit)
2518 : {
2519 3 : new_record[Anum_pg_database_datconnlimit - 1] = Int32GetDatum(dbconnlimit);
2520 3 : new_record_repl[Anum_pg_database_datconnlimit - 1] = true;
2521 : }
2522 :
2523 33 : newtuple = heap_modify_tuple(tuple, RelationGetDescr(rel), new_record,
2524 : new_record_nulls, new_record_repl);
2525 33 : CatalogTupleUpdate(rel, &tuple->t_self, newtuple);
2526 33 : UnlockTuple(rel, &tuple->t_self, InplaceUpdateTupleLock);
2527 :
2528 33 : InvokeObjectPostAlterHook(DatabaseRelationId, dboid, 0);
2529 :
2530 33 : systable_endscan(scan);
2531 :
2532 : /* Close pg_database, but keep lock till commit */
2533 33 : table_close(rel, NoLock);
2534 :
2535 33 : return dboid;
2536 : }
2537 :
2538 :
2539 : /*
2540 : * ALTER DATABASE name REFRESH COLLATION VERSION
2541 : */
2542 : ObjectAddress
2543 3 : AlterDatabaseRefreshColl(AlterDatabaseRefreshCollStmt *stmt)
2544 : {
2545 : Relation rel;
2546 : ScanKeyData scankey;
2547 : SysScanDesc scan;
2548 : Oid db_id;
2549 : HeapTuple tuple;
2550 : Form_pg_database datForm;
2551 : ObjectAddress address;
2552 : Datum datum;
2553 : bool isnull;
2554 : char *oldversion;
2555 : char *newversion;
2556 :
2557 3 : rel = table_open(DatabaseRelationId, RowExclusiveLock);
2558 3 : ScanKeyInit(&scankey,
2559 : Anum_pg_database_datname,
2560 : BTEqualStrategyNumber, F_NAMEEQ,
2561 3 : CStringGetDatum(stmt->dbname));
2562 3 : scan = systable_beginscan(rel, DatabaseNameIndexId, true,
2563 : NULL, 1, &scankey);
2564 3 : tuple = systable_getnext(scan);
2565 3 : if (!HeapTupleIsValid(tuple))
2566 0 : ereport(ERROR,
2567 : (errcode(ERRCODE_UNDEFINED_DATABASE),
2568 : errmsg("database \"%s\" does not exist", stmt->dbname)));
2569 :
2570 3 : datForm = (Form_pg_database) GETSTRUCT(tuple);
2571 3 : db_id = datForm->oid;
2572 :
2573 3 : if (!object_ownercheck(DatabaseRelationId, db_id, GetUserId()))
2574 0 : aclcheck_error(ACLCHECK_NOT_OWNER, OBJECT_DATABASE,
2575 0 : stmt->dbname);
2576 3 : LockTuple(rel, &tuple->t_self, InplaceUpdateTupleLock);
2577 :
2578 3 : datum = heap_getattr(tuple, Anum_pg_database_datcollversion, RelationGetDescr(rel), &isnull);
2579 3 : oldversion = isnull ? NULL : TextDatumGetCString(datum);
2580 :
2581 3 : if (datForm->datlocprovider == COLLPROVIDER_LIBC)
2582 : {
2583 2 : datum = heap_getattr(tuple, Anum_pg_database_datcollate, RelationGetDescr(rel), &isnull);
2584 2 : if (isnull)
2585 0 : elog(ERROR, "unexpected null in pg_database");
2586 : }
2587 : else
2588 : {
2589 1 : datum = heap_getattr(tuple, Anum_pg_database_datlocale, RelationGetDescr(rel), &isnull);
2590 1 : if (isnull)
2591 0 : elog(ERROR, "unexpected null in pg_database");
2592 : }
2593 :
2594 3 : newversion = get_collation_actual_version(datForm->datlocprovider,
2595 3 : TextDatumGetCString(datum));
2596 :
2597 : /* cannot change from NULL to non-NULL or vice versa */
2598 3 : if ((!oldversion && newversion) || (oldversion && !newversion))
2599 0 : elog(ERROR, "invalid collation version change");
2600 3 : else if (oldversion && newversion && strcmp(newversion, oldversion) != 0)
2601 0 : {
2602 0 : bool nulls[Natts_pg_database] = {0};
2603 0 : bool replaces[Natts_pg_database] = {0};
2604 0 : Datum values[Natts_pg_database] = {0};
2605 : HeapTuple newtuple;
2606 :
2607 0 : ereport(NOTICE,
2608 : (errmsg("changing version from %s to %s",
2609 : oldversion, newversion)));
2610 :
2611 0 : values[Anum_pg_database_datcollversion - 1] = CStringGetTextDatum(newversion);
2612 0 : replaces[Anum_pg_database_datcollversion - 1] = true;
2613 :
2614 0 : newtuple = heap_modify_tuple(tuple, RelationGetDescr(rel),
2615 : values, nulls, replaces);
2616 0 : CatalogTupleUpdate(rel, &tuple->t_self, newtuple);
2617 0 : heap_freetuple(newtuple);
2618 : }
2619 : else
2620 3 : ereport(NOTICE,
2621 : (errmsg("version has not changed")));
2622 3 : UnlockTuple(rel, &tuple->t_self, InplaceUpdateTupleLock);
2623 :
2624 3 : InvokeObjectPostAlterHook(DatabaseRelationId, db_id, 0);
2625 :
2626 3 : ObjectAddressSet(address, DatabaseRelationId, db_id);
2627 :
2628 3 : systable_endscan(scan);
2629 :
2630 3 : table_close(rel, NoLock);
2631 :
2632 3 : return address;
2633 : }
2634 :
2635 :
2636 : /*
2637 : * ALTER DATABASE name SET ...
2638 : */
2639 : Oid
2640 633 : AlterDatabaseSet(AlterDatabaseSetStmt *stmt)
2641 : {
2642 633 : Oid datid = get_database_oid(stmt->dbname, false);
2643 :
2644 : /*
2645 : * Obtain a lock on the database and make sure it didn't go away in the
2646 : * meantime.
2647 : */
2648 633 : shdepLockAndCheckObject(DatabaseRelationId, datid);
2649 :
2650 633 : if (!object_ownercheck(DatabaseRelationId, datid, GetUserId()))
2651 0 : aclcheck_error(ACLCHECK_NOT_OWNER, OBJECT_DATABASE,
2652 0 : stmt->dbname);
2653 :
2654 633 : AlterSetting(datid, InvalidOid, stmt->setstmt);
2655 :
2656 631 : UnlockSharedObject(DatabaseRelationId, datid, 0, AccessShareLock);
2657 :
2658 631 : return datid;
2659 : }
2660 :
2661 :
2662 : /*
2663 : * ALTER DATABASE name OWNER TO newowner
2664 : */
2665 : ObjectAddress
2666 45 : AlterDatabaseOwner(const char *dbname, Oid newOwnerId)
2667 : {
2668 : Oid db_id;
2669 : HeapTuple tuple;
2670 : Relation rel;
2671 : ScanKeyData scankey;
2672 : SysScanDesc scan;
2673 : Form_pg_database datForm;
2674 : ObjectAddress address;
2675 :
2676 : /*
2677 : * Get the old tuple. We don't need a lock on the database per se,
2678 : * because we're not going to do anything that would mess up incoming
2679 : * connections.
2680 : */
2681 45 : rel = table_open(DatabaseRelationId, RowExclusiveLock);
2682 45 : ScanKeyInit(&scankey,
2683 : Anum_pg_database_datname,
2684 : BTEqualStrategyNumber, F_NAMEEQ,
2685 : CStringGetDatum(dbname));
2686 45 : scan = systable_beginscan(rel, DatabaseNameIndexId, true,
2687 : NULL, 1, &scankey);
2688 45 : tuple = systable_getnext(scan);
2689 45 : if (!HeapTupleIsValid(tuple))
2690 0 : ereport(ERROR,
2691 : (errcode(ERRCODE_UNDEFINED_DATABASE),
2692 : errmsg("database \"%s\" does not exist", dbname)));
2693 :
2694 45 : datForm = (Form_pg_database) GETSTRUCT(tuple);
2695 45 : db_id = datForm->oid;
2696 :
2697 : /*
2698 : * If the new owner is the same as the existing owner, consider the
2699 : * command to have succeeded. This is to be consistent with other
2700 : * objects.
2701 : */
2702 45 : if (datForm->datdba != newOwnerId)
2703 : {
2704 : Datum repl_val[Natts_pg_database];
2705 15 : bool repl_null[Natts_pg_database] = {0};
2706 15 : bool repl_repl[Natts_pg_database] = {0};
2707 : Acl *newAcl;
2708 : Datum aclDatum;
2709 : bool isNull;
2710 : HeapTuple newtuple;
2711 :
2712 : /* Otherwise, must be owner of the existing object */
2713 15 : if (!object_ownercheck(DatabaseRelationId, db_id, GetUserId()))
2714 0 : aclcheck_error(ACLCHECK_NOT_OWNER, OBJECT_DATABASE,
2715 : dbname);
2716 :
2717 : /* Must be able to become new owner */
2718 15 : check_can_set_role(GetUserId(), newOwnerId);
2719 :
2720 : /*
2721 : * must have createdb rights
2722 : *
2723 : * NOTE: This is different from other alter-owner checks in that the
2724 : * current user is checked for createdb privileges instead of the
2725 : * destination owner. This is consistent with the CREATE case for
2726 : * databases. Because superusers will always have this right, we need
2727 : * no special case for them.
2728 : */
2729 15 : if (!have_createdb_privilege())
2730 0 : ereport(ERROR,
2731 : (errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
2732 : errmsg("permission denied to change owner of database")));
2733 :
2734 15 : LockTuple(rel, &tuple->t_self, InplaceUpdateTupleLock);
2735 :
2736 15 : repl_repl[Anum_pg_database_datdba - 1] = true;
2737 15 : repl_val[Anum_pg_database_datdba - 1] = ObjectIdGetDatum(newOwnerId);
2738 :
2739 : /*
2740 : * Determine the modified ACL for the new owner. This is only
2741 : * necessary when the ACL is non-null.
2742 : */
2743 15 : aclDatum = heap_getattr(tuple,
2744 : Anum_pg_database_datacl,
2745 : RelationGetDescr(rel),
2746 : &isNull);
2747 15 : if (!isNull)
2748 : {
2749 0 : newAcl = aclnewowner(DatumGetAclP(aclDatum),
2750 : datForm->datdba, newOwnerId);
2751 0 : repl_repl[Anum_pg_database_datacl - 1] = true;
2752 0 : repl_val[Anum_pg_database_datacl - 1] = PointerGetDatum(newAcl);
2753 : }
2754 :
2755 15 : newtuple = heap_modify_tuple(tuple, RelationGetDescr(rel), repl_val, repl_null, repl_repl);
2756 15 : CatalogTupleUpdate(rel, &newtuple->t_self, newtuple);
2757 15 : UnlockTuple(rel, &tuple->t_self, InplaceUpdateTupleLock);
2758 :
2759 15 : heap_freetuple(newtuple);
2760 :
2761 : /* Update owner dependency reference */
2762 15 : changeDependencyOnOwner(DatabaseRelationId, db_id, newOwnerId);
2763 : }
2764 :
2765 45 : InvokeObjectPostAlterHook(DatabaseRelationId, db_id, 0);
2766 :
2767 45 : ObjectAddressSet(address, DatabaseRelationId, db_id);
2768 :
2769 45 : systable_endscan(scan);
2770 :
2771 : /* Close pg_database, but keep lock till commit */
2772 45 : table_close(rel, NoLock);
2773 :
2774 45 : return address;
2775 : }
2776 :
2777 :
2778 : Datum
2779 49 : pg_database_collation_actual_version(PG_FUNCTION_ARGS)
2780 : {
2781 49 : Oid dbid = PG_GETARG_OID(0);
2782 : HeapTuple tp;
2783 : char datlocprovider;
2784 : Datum datum;
2785 : char *version;
2786 :
2787 49 : tp = SearchSysCache1(DATABASEOID, ObjectIdGetDatum(dbid));
2788 49 : if (!HeapTupleIsValid(tp))
2789 0 : ereport(ERROR,
2790 : (errcode(ERRCODE_UNDEFINED_OBJECT),
2791 : errmsg("database with OID %u does not exist", dbid)));
2792 :
2793 49 : datlocprovider = ((Form_pg_database) GETSTRUCT(tp))->datlocprovider;
2794 :
2795 49 : if (datlocprovider == COLLPROVIDER_LIBC)
2796 42 : datum = SysCacheGetAttrNotNull(DATABASEOID, tp, Anum_pg_database_datcollate);
2797 : else
2798 7 : datum = SysCacheGetAttrNotNull(DATABASEOID, tp, Anum_pg_database_datlocale);
2799 :
2800 49 : version = get_collation_actual_version(datlocprovider,
2801 49 : TextDatumGetCString(datum));
2802 :
2803 49 : ReleaseSysCache(tp);
2804 :
2805 49 : if (version)
2806 35 : PG_RETURN_TEXT_P(cstring_to_text(version));
2807 : else
2808 14 : PG_RETURN_NULL();
2809 : }
2810 :
2811 :
2812 : /*
2813 : * Helper functions
2814 : */
2815 :
2816 : /*
2817 : * Look up info about the database named "name". If the database exists,
2818 : * obtain the specified lock type on it, fill in any of the remaining
2819 : * parameters that aren't NULL, and return true. If no such database,
2820 : * return false.
2821 : */
2822 : static bool
2823 488 : get_db_info(const char *name, LOCKMODE lockmode,
2824 : Oid *dbIdP, Oid *ownerIdP,
2825 : int *encodingP, bool *dbIsTemplateP, bool *dbAllowConnP, bool *dbHasLoginEvtP,
2826 : TransactionId *dbFrozenXidP, MultiXactId *dbMinMultiP,
2827 : Oid *dbTablespace, char **dbCollate, char **dbCtype, char **dbLocale,
2828 : char **dbIcurules,
2829 : char *dbLocProvider,
2830 : char **dbCollversion)
2831 : {
2832 488 : bool result = false;
2833 : Relation relation;
2834 :
2835 : Assert(name);
2836 :
2837 : /* Caller may wish to grab a better lock on pg_database beforehand... */
2838 488 : relation = table_open(DatabaseRelationId, AccessShareLock);
2839 :
2840 : /*
2841 : * Loop covers the rare case where the database is renamed before we can
2842 : * lock it. We try again just in case we can find a new one of the same
2843 : * name.
2844 : */
2845 : for (;;)
2846 0 : {
2847 : ScanKeyData scanKey;
2848 : SysScanDesc scan;
2849 : HeapTuple tuple;
2850 : Oid dbOid;
2851 :
2852 : /*
2853 : * there's no syscache for database-indexed-by-name, so must do it the
2854 : * hard way
2855 : */
2856 488 : ScanKeyInit(&scanKey,
2857 : Anum_pg_database_datname,
2858 : BTEqualStrategyNumber, F_NAMEEQ,
2859 : CStringGetDatum(name));
2860 :
2861 488 : scan = systable_beginscan(relation, DatabaseNameIndexId, true,
2862 : NULL, 1, &scanKey);
2863 :
2864 488 : tuple = systable_getnext(scan);
2865 :
2866 488 : if (!HeapTupleIsValid(tuple))
2867 : {
2868 : /* definitely no database of that name */
2869 16 : systable_endscan(scan);
2870 16 : break;
2871 : }
2872 :
2873 472 : dbOid = ((Form_pg_database) GETSTRUCT(tuple))->oid;
2874 :
2875 472 : systable_endscan(scan);
2876 :
2877 : /*
2878 : * Now that we have a database OID, we can try to lock the DB.
2879 : */
2880 472 : if (lockmode != NoLock)
2881 472 : LockSharedObject(DatabaseRelationId, dbOid, 0, lockmode);
2882 :
2883 : /*
2884 : * And now, re-fetch the tuple by OID. If it's still there and still
2885 : * the same name, we win; else, drop the lock and loop back to try
2886 : * again.
2887 : */
2888 472 : tuple = SearchSysCache1(DATABASEOID, ObjectIdGetDatum(dbOid));
2889 472 : if (HeapTupleIsValid(tuple))
2890 : {
2891 472 : Form_pg_database dbform = (Form_pg_database) GETSTRUCT(tuple);
2892 :
2893 472 : if (strcmp(name, NameStr(dbform->datname)) == 0)
2894 : {
2895 : Datum datum;
2896 : bool isnull;
2897 :
2898 : /* oid of the database */
2899 472 : if (dbIdP)
2900 472 : *dbIdP = dbOid;
2901 : /* oid of the owner */
2902 472 : if (ownerIdP)
2903 405 : *ownerIdP = dbform->datdba;
2904 : /* character encoding */
2905 472 : if (encodingP)
2906 405 : *encodingP = dbform->encoding;
2907 : /* allowed as template? */
2908 472 : if (dbIsTemplateP)
2909 453 : *dbIsTemplateP = dbform->datistemplate;
2910 : /* Has on login event trigger? */
2911 472 : if (dbHasLoginEvtP)
2912 405 : *dbHasLoginEvtP = dbform->dathasloginevt;
2913 : /* allowing connections? */
2914 472 : if (dbAllowConnP)
2915 405 : *dbAllowConnP = dbform->datallowconn;
2916 : /* limit of frozen XIDs */
2917 472 : if (dbFrozenXidP)
2918 405 : *dbFrozenXidP = dbform->datfrozenxid;
2919 : /* minimum MultiXactId */
2920 472 : if (dbMinMultiP)
2921 405 : *dbMinMultiP = dbform->datminmxid;
2922 : /* default tablespace for this database */
2923 472 : if (dbTablespace)
2924 417 : *dbTablespace = dbform->dattablespace;
2925 : /* default locale settings for this database */
2926 472 : if (dbLocProvider)
2927 405 : *dbLocProvider = dbform->datlocprovider;
2928 472 : if (dbCollate)
2929 : {
2930 405 : datum = SysCacheGetAttrNotNull(DATABASEOID, tuple, Anum_pg_database_datcollate);
2931 405 : *dbCollate = TextDatumGetCString(datum);
2932 : }
2933 472 : if (dbCtype)
2934 : {
2935 405 : datum = SysCacheGetAttrNotNull(DATABASEOID, tuple, Anum_pg_database_datctype);
2936 405 : *dbCtype = TextDatumGetCString(datum);
2937 : }
2938 472 : if (dbLocale)
2939 : {
2940 405 : datum = SysCacheGetAttr(DATABASEOID, tuple, Anum_pg_database_datlocale, &isnull);
2941 405 : if (isnull)
2942 376 : *dbLocale = NULL;
2943 : else
2944 29 : *dbLocale = TextDatumGetCString(datum);
2945 : }
2946 472 : if (dbIcurules)
2947 : {
2948 405 : datum = SysCacheGetAttr(DATABASEOID, tuple, Anum_pg_database_daticurules, &isnull);
2949 405 : if (isnull)
2950 405 : *dbIcurules = NULL;
2951 : else
2952 0 : *dbIcurules = TextDatumGetCString(datum);
2953 : }
2954 472 : if (dbCollversion)
2955 : {
2956 405 : datum = SysCacheGetAttr(DATABASEOID, tuple, Anum_pg_database_datcollversion, &isnull);
2957 405 : if (isnull)
2958 245 : *dbCollversion = NULL;
2959 : else
2960 160 : *dbCollversion = TextDatumGetCString(datum);
2961 : }
2962 472 : ReleaseSysCache(tuple);
2963 472 : result = true;
2964 472 : break;
2965 : }
2966 : /* can only get here if it was just renamed */
2967 0 : ReleaseSysCache(tuple);
2968 : }
2969 :
2970 0 : if (lockmode != NoLock)
2971 0 : UnlockSharedObject(DatabaseRelationId, dbOid, 0, lockmode);
2972 : }
2973 :
2974 488 : table_close(relation, AccessShareLock);
2975 :
2976 488 : return result;
2977 : }
2978 :
2979 : /* Check if current user has createdb privileges */
2980 : bool
2981 445 : have_createdb_privilege(void)
2982 : {
2983 445 : bool result = false;
2984 : HeapTuple utup;
2985 :
2986 : /* Superusers can always do everything */
2987 445 : if (superuser())
2988 427 : return true;
2989 :
2990 18 : utup = SearchSysCache1(AUTHOID, ObjectIdGetDatum(GetUserId()));
2991 18 : if (HeapTupleIsValid(utup))
2992 : {
2993 18 : result = ((Form_pg_authid) GETSTRUCT(utup))->rolcreatedb;
2994 18 : ReleaseSysCache(utup);
2995 : }
2996 18 : return result;
2997 : }
2998 :
2999 : /*
3000 : * Remove tablespace directories
3001 : *
3002 : * We don't know what tablespaces db_id is using, so iterate through all
3003 : * tablespaces removing <tablespace>/db_id
3004 : */
3005 : static void
3006 49 : remove_dbtablespaces(Oid db_id)
3007 : {
3008 : Relation rel;
3009 : TableScanDesc scan;
3010 : HeapTuple tuple;
3011 49 : List *ltblspc = NIL;
3012 : ListCell *cell;
3013 : int ntblspc;
3014 : int i;
3015 : Oid *tablespace_ids;
3016 :
3017 49 : rel = table_open(TableSpaceRelationId, AccessShareLock);
3018 49 : scan = table_beginscan_catalog(rel, 0, NULL);
3019 180 : while ((tuple = heap_getnext(scan, ForwardScanDirection)) != NULL)
3020 : {
3021 131 : Form_pg_tablespace spcform = (Form_pg_tablespace) GETSTRUCT(tuple);
3022 131 : Oid dsttablespace = spcform->oid;
3023 : char *dstpath;
3024 : struct stat st;
3025 :
3026 : /* Don't mess with the global tablespace */
3027 131 : if (dsttablespace == GLOBALTABLESPACE_OID)
3028 82 : continue;
3029 :
3030 82 : dstpath = GetDatabasePath(db_id, dsttablespace);
3031 :
3032 82 : if (lstat(dstpath, &st) < 0 || !S_ISDIR(st.st_mode))
3033 : {
3034 : /* Assume we can ignore it */
3035 33 : pfree(dstpath);
3036 33 : continue;
3037 : }
3038 :
3039 49 : if (!rmtree(dstpath, true))
3040 0 : ereport(WARNING,
3041 : (errmsg("some useless files may be left behind in old database directory \"%s\"",
3042 : dstpath)));
3043 :
3044 49 : ltblspc = lappend_oid(ltblspc, dsttablespace);
3045 49 : pfree(dstpath);
3046 : }
3047 :
3048 49 : ntblspc = list_length(ltblspc);
3049 49 : if (ntblspc == 0)
3050 : {
3051 0 : table_endscan(scan);
3052 0 : table_close(rel, AccessShareLock);
3053 0 : return;
3054 : }
3055 :
3056 49 : tablespace_ids = (Oid *) palloc(ntblspc * sizeof(Oid));
3057 49 : i = 0;
3058 98 : foreach(cell, ltblspc)
3059 49 : tablespace_ids[i++] = lfirst_oid(cell);
3060 :
3061 : /* Record the filesystem change in XLOG */
3062 : {
3063 : xl_dbase_drop_rec xlrec;
3064 :
3065 49 : xlrec.db_id = db_id;
3066 49 : xlrec.ntablespaces = ntblspc;
3067 :
3068 49 : XLogBeginInsert();
3069 49 : XLogRegisterData(&xlrec, MinSizeOfDbaseDropRec);
3070 49 : XLogRegisterData(tablespace_ids, ntblspc * sizeof(Oid));
3071 :
3072 49 : (void) XLogInsert(RM_DBASE_ID,
3073 : XLOG_DBASE_DROP | XLR_SPECIAL_REL_UPDATE);
3074 : }
3075 :
3076 49 : list_free(ltblspc);
3077 49 : pfree(tablespace_ids);
3078 :
3079 49 : table_endscan(scan);
3080 49 : table_close(rel, AccessShareLock);
3081 : }
3082 :
3083 : /*
3084 : * Check for existing files that conflict with a proposed new DB OID;
3085 : * return true if there are any
3086 : *
3087 : * If there were a subdirectory in any tablespace matching the proposed new
3088 : * OID, we'd get a create failure due to the duplicate name ... and then we'd
3089 : * try to remove that already-existing subdirectory during the cleanup in
3090 : * remove_dbtablespaces. Nuking existing files seems like a bad idea, so
3091 : * instead we make this extra check before settling on the OID of the new
3092 : * database. This exactly parallels what GetNewRelFileNumber() does for table
3093 : * relfilenumber values.
3094 : */
3095 : static bool
3096 391 : check_db_file_conflict(Oid db_id)
3097 : {
3098 391 : bool result = false;
3099 : Relation rel;
3100 : TableScanDesc scan;
3101 : HeapTuple tuple;
3102 :
3103 391 : rel = table_open(TableSpaceRelationId, AccessShareLock);
3104 391 : scan = table_beginscan_catalog(rel, 0, NULL);
3105 1243 : while ((tuple = heap_getnext(scan, ForwardScanDirection)) != NULL)
3106 : {
3107 852 : Form_pg_tablespace spcform = (Form_pg_tablespace) GETSTRUCT(tuple);
3108 852 : Oid dsttablespace = spcform->oid;
3109 : char *dstpath;
3110 : struct stat st;
3111 :
3112 : /* Don't mess with the global tablespace */
3113 852 : if (dsttablespace == GLOBALTABLESPACE_OID)
3114 391 : continue;
3115 :
3116 461 : dstpath = GetDatabasePath(db_id, dsttablespace);
3117 :
3118 461 : if (lstat(dstpath, &st) == 0)
3119 : {
3120 : /* Found a conflicting file (or directory, whatever) */
3121 0 : pfree(dstpath);
3122 0 : result = true;
3123 0 : break;
3124 : }
3125 :
3126 461 : pfree(dstpath);
3127 : }
3128 :
3129 391 : table_endscan(scan);
3130 391 : table_close(rel, AccessShareLock);
3131 :
3132 391 : return result;
3133 : }
3134 :
/*
 * Issue a suitable errdetail message for a busy database
 *
 * Picks one of three messages depending on whether other sessions, prepared
 * transactions, or both are using the database.  Always returns 0; the
 * return value exists only so the call can sit inside an ereport().
 */
static int
errdetail_busy_db(int notherbackends, int npreparedxacts)
{
	if (notherbackends <= 0)
	{
		/* no other sessions: only prepared transactions are reported */
		errdetail_plural("There is %d prepared transaction using the database.",
						 "There are %d prepared transactions using the database.",
						 npreparedxacts,
						 npreparedxacts);
	}
	else if (npreparedxacts <= 0)
	{
		/* other sessions only */
		errdetail_plural("There is %d other session using the database.",
						 "There are %d other sessions using the database.",
						 notherbackends,
						 notherbackends);
	}
	else
	{
		/*
		 * Both kinds are present.  Singular versus plural is not handled
		 * here, since gettext doesn't support multiple plurals in one
		 * string.
		 */
		errdetail("There are %d other session(s) and %d prepared transaction(s) using the database.",
				  notherbackends, npreparedxacts);
	}

	return 0;					/* just to keep ereport macro happy */
}
3161 :
3162 : /*
3163 : * get_database_oid - given a database name, look up the OID
3164 : *
3165 : * If missing_ok is false, throw an error if database name not found. If
3166 : * true, just return InvalidOid.
3167 : */
3168 : Oid
3169 1544 : get_database_oid(const char *dbname, bool missing_ok)
3170 : {
3171 : Relation pg_database;
3172 : ScanKeyData entry[1];
3173 : SysScanDesc scan;
3174 : HeapTuple dbtuple;
3175 : Oid oid;
3176 :
3177 : /*
3178 : * There's no syscache for pg_database indexed by name, so we must look
3179 : * the hard way.
3180 : */
3181 1544 : pg_database = table_open(DatabaseRelationId, AccessShareLock);
3182 1544 : ScanKeyInit(&entry[0],
3183 : Anum_pg_database_datname,
3184 : BTEqualStrategyNumber, F_NAMEEQ,
3185 : CStringGetDatum(dbname));
3186 1544 : scan = systable_beginscan(pg_database, DatabaseNameIndexId, true,
3187 : NULL, 1, entry);
3188 :
3189 1544 : dbtuple = systable_getnext(scan);
3190 :
3191 : /* We assume that there can be at most one matching tuple */
3192 1544 : if (HeapTupleIsValid(dbtuple))
3193 1127 : oid = ((Form_pg_database) GETSTRUCT(dbtuple))->oid;
3194 : else
3195 417 : oid = InvalidOid;
3196 :
3197 1544 : systable_endscan(scan);
3198 1544 : table_close(pg_database, AccessShareLock);
3199 :
3200 1544 : if (!OidIsValid(oid) && !missing_ok)
3201 3 : ereport(ERROR,
3202 : (errcode(ERRCODE_UNDEFINED_DATABASE),
3203 : errmsg("database \"%s\" does not exist",
3204 : dbname)));
3205 :
3206 1541 : return oid;
3207 : }
3208 :
3209 :
3210 : /*
3211 : * While dropping a database the pg_database row is marked invalid, but the
3212 : * catalog contents still exist. Connections to such a database are not
3213 : * allowed.
3214 : */
3215 : bool
3216 27688 : database_is_invalid_form(Form_pg_database datform)
3217 : {
3218 27688 : return datform->datconnlimit == DATCONNLIMIT_INVALID_DB;
3219 : }
3220 :
3221 :
3222 : /*
3223 : * Convenience wrapper around database_is_invalid_form()
3224 : */
3225 : bool
3226 405 : database_is_invalid_oid(Oid dboid)
3227 : {
3228 : HeapTuple dbtup;
3229 : Form_pg_database dbform;
3230 : bool invalid;
3231 :
3232 405 : dbtup = SearchSysCache1(DATABASEOID, ObjectIdGetDatum(dboid));
3233 405 : if (!HeapTupleIsValid(dbtup))
3234 0 : elog(ERROR, "cache lookup failed for database %u", dboid);
3235 405 : dbform = (Form_pg_database) GETSTRUCT(dbtup);
3236 :
3237 405 : invalid = database_is_invalid_form(dbform);
3238 :
3239 405 : ReleaseSysCache(dbtup);
3240 :
3241 405 : return invalid;
3242 : }
3243 :
3244 :
3245 : /*
3246 : * recovery_create_dbdir()
3247 : *
3248 : * During recovery, there's a case where we validly need to recover a missing
3249 : * tablespace directory so that recovery can continue. This happens when
3250 : * recovery wants to create a database but the holding tablespace has been
3251 : * removed before the server stopped. Since we expect that the directory will
3252 : * be gone before reaching recovery consistency, and we have no knowledge about
3253 : * the tablespace other than its OID here, we create a real directory under
3254 : * pg_tblspc here instead of restoring the symlink.
3255 : *
3256 : * If only_tblspc is true, then the requested directory must be in pg_tblspc/
3257 : */
3258 : static void
3259 23 : recovery_create_dbdir(char *path, bool only_tblspc)
3260 : {
3261 : struct stat st;
3262 :
3263 : Assert(RecoveryInProgress());
3264 :
3265 23 : if (stat(path, &st) == 0)
3266 23 : return;
3267 :
3268 0 : if (only_tblspc && strstr(path, PG_TBLSPC_DIR_SLASH) == NULL)
3269 0 : elog(PANIC, "requested to created invalid directory: %s", path);
3270 :
3271 0 : if (reachedConsistency && !allow_in_place_tablespaces)
3272 0 : ereport(PANIC,
3273 : errmsg("missing directory \"%s\"", path));
3274 :
3275 0 : elog(reachedConsistency ? WARNING : DEBUG1,
3276 : "creating missing directory: %s", path);
3277 :
3278 0 : if (pg_mkdir_p(path, pg_dir_create_mode) != 0)
3279 0 : ereport(PANIC,
3280 : errmsg("could not create missing directory \"%s\": %m", path));
3281 : }
3282 :
3283 :
3284 : /*
3285 : * DATABASE resource manager's routines
3286 : */
3287 : void
3288 42 : dbase_redo(XLogReaderState *record)
3289 : {
3290 42 : uint8 info = XLogRecGetInfo(record) & ~XLR_INFO_MASK;
3291 :
3292 : /* Backup blocks are not used in dbase records */
3293 : Assert(!XLogRecHasAnyBlockRefs(record));
3294 :
3295 42 : if (info == XLOG_DBASE_CREATE_FILE_COPY)
3296 : {
3297 5 : xl_dbase_create_file_copy_rec *xlrec =
3298 5 : (xl_dbase_create_file_copy_rec *) XLogRecGetData(record);
3299 : char *src_path;
3300 : char *dst_path;
3301 : char *parent_path;
3302 : struct stat st;
3303 :
3304 5 : src_path = GetDatabasePath(xlrec->src_db_id, xlrec->src_tablespace_id);
3305 5 : dst_path = GetDatabasePath(xlrec->db_id, xlrec->tablespace_id);
3306 :
3307 : /*
3308 : * Our theory for replaying a CREATE is to forcibly drop the target
3309 : * subdirectory if present, then re-copy the source data. This may be
3310 : * more work than needed, but it is simple to implement.
3311 : */
3312 5 : if (stat(dst_path, &st) == 0 && S_ISDIR(st.st_mode))
3313 : {
3314 0 : if (!rmtree(dst_path, true))
3315 : /* If this failed, copydir() below is going to error. */
3316 0 : ereport(WARNING,
3317 : (errmsg("some useless files may be left behind in old database directory \"%s\"",
3318 : dst_path)));
3319 : }
3320 :
3321 : /*
3322 : * If the parent of the target path doesn't exist, create it now. This
3323 : * enables us to create the target underneath later.
3324 : */
3325 5 : parent_path = pstrdup(dst_path);
3326 5 : get_parent_directory(parent_path);
3327 5 : if (stat(parent_path, &st) < 0)
3328 : {
3329 0 : if (errno != ENOENT)
3330 0 : ereport(FATAL,
3331 : errmsg("could not stat directory \"%s\": %m",
3332 : dst_path));
3333 :
3334 : /* create the parent directory if needed and valid */
3335 0 : recovery_create_dbdir(parent_path, true);
3336 : }
3337 5 : pfree(parent_path);
3338 :
3339 : /*
3340 : * There's a case where the copy source directory is missing for the
3341 : * same reason above. Create the empty source directory so that
3342 : * copydir below doesn't fail. The directory will be dropped soon by
3343 : * recovery.
3344 : */
3345 5 : if (stat(src_path, &st) < 0 && errno == ENOENT)
3346 0 : recovery_create_dbdir(src_path, false);
3347 :
3348 : /*
3349 : * Force dirty buffers out to disk, to ensure source database is
3350 : * up-to-date for the copy.
3351 : */
3352 5 : FlushDatabaseBuffers(xlrec->src_db_id);
3353 :
3354 : /* Close all smgr fds in all backends. */
3355 5 : WaitForProcSignalBarrier(EmitProcSignalBarrier(PROCSIGNAL_BARRIER_SMGRRELEASE));
3356 :
3357 : /*
3358 : * Copy this subdirectory to the new location
3359 : *
3360 : * We don't need to copy subdirectories
3361 : */
3362 5 : copydir(src_path, dst_path, false);
3363 :
3364 5 : pfree(src_path);
3365 5 : pfree(dst_path);
3366 : }
3367 37 : else if (info == XLOG_DBASE_CREATE_WAL_LOG)
3368 : {
3369 23 : xl_dbase_create_wal_log_rec *xlrec =
3370 23 : (xl_dbase_create_wal_log_rec *) XLogRecGetData(record);
3371 : char *dbpath;
3372 : char *parent_path;
3373 :
3374 23 : dbpath = GetDatabasePath(xlrec->db_id, xlrec->tablespace_id);
3375 :
3376 : /* create the parent directory if needed and valid */
3377 23 : parent_path = pstrdup(dbpath);
3378 23 : get_parent_directory(parent_path);
3379 23 : recovery_create_dbdir(parent_path, true);
3380 23 : pfree(parent_path);
3381 :
3382 : /* Create the database directory with the version file. */
3383 23 : CreateDirAndVersionFile(dbpath, xlrec->db_id, xlrec->tablespace_id,
3384 : true);
3385 23 : pfree(dbpath);
3386 : }
3387 14 : else if (info == XLOG_DBASE_DROP)
3388 : {
3389 14 : xl_dbase_drop_rec *xlrec = (xl_dbase_drop_rec *) XLogRecGetData(record);
3390 : char *dst_path;
3391 : int i;
3392 :
3393 14 : if (InHotStandby)
3394 : {
3395 : /*
3396 : * Lock database while we resolve conflicts to ensure that
3397 : * InitPostgres() cannot fully re-execute concurrently. This
3398 : * avoids backends re-connecting automatically to same database,
3399 : * which can happen in some cases.
3400 : *
3401 : * This will lock out walsenders trying to connect to db-specific
3402 : * slots for logical decoding too, so it's safe for us to drop
3403 : * slots.
3404 : */
3405 14 : LockSharedObjectForSession(DatabaseRelationId, xlrec->db_id, 0, AccessExclusiveLock);
3406 14 : ResolveRecoveryConflictWithDatabase(xlrec->db_id);
3407 : }
3408 :
3409 : /* Drop any database-specific replication slots */
3410 14 : ReplicationSlotsDropDBSlots(xlrec->db_id);
3411 :
3412 : /* Drop pages for this database that are in the shared buffer cache */
3413 14 : DropDatabaseBuffers(xlrec->db_id);
3414 :
3415 : /* Also, clean out any fsync requests that might be pending in md.c */
3416 14 : ForgetDatabaseSyncRequests(xlrec->db_id);
3417 :
3418 : /* Clean out the xlog relcache too */
3419 14 : XLogDropDatabase(xlrec->db_id);
3420 :
3421 : /* Close all smgr fds in all backends. */
3422 14 : WaitForProcSignalBarrier(EmitProcSignalBarrier(PROCSIGNAL_BARRIER_SMGRRELEASE));
3423 :
3424 28 : for (i = 0; i < xlrec->ntablespaces; i++)
3425 : {
3426 14 : dst_path = GetDatabasePath(xlrec->db_id, xlrec->tablespace_ids[i]);
3427 :
3428 : /* And remove the physical files */
3429 14 : if (!rmtree(dst_path, true))
3430 0 : ereport(WARNING,
3431 : (errmsg("some useless files may be left behind in old database directory \"%s\"",
3432 : dst_path)));
3433 14 : pfree(dst_path);
3434 : }
3435 :
3436 14 : if (InHotStandby)
3437 : {
3438 : /*
3439 : * Release locks prior to commit. XXX There is a race condition
3440 : * here that may allow backends to reconnect, but the window for
3441 : * this is small because the gap between here and commit is mostly
3442 : * fairly small and it is unlikely that people will be dropping
3443 : * databases that we are trying to connect to anyway.
3444 : */
3445 14 : UnlockSharedObjectForSession(DatabaseRelationId, xlrec->db_id, 0, AccessExclusiveLock);
3446 : }
3447 : }
3448 : else
3449 0 : elog(PANIC, "dbase_redo: unknown op code %u", info);
3450 42 : }
|