Line data Source code
1 : /*-------------------------------------------------------------------------
2 : *
3 : * dbcommands.c
4 : * Database management commands (create/drop database).
5 : *
6 : * Note: database creation/destruction commands use exclusive locks on
7 : * the database objects (as expressed by LockSharedObject()) to avoid
8 : * stepping on each others' toes. Formerly we used table-level locks
9 : * on pg_database, but that's too coarse-grained.
10 : *
11 : * Portions Copyright (c) 1996-2025, PostgreSQL Global Development Group
12 : * Portions Copyright (c) 1994, Regents of the University of California
13 : *
14 : *
15 : * IDENTIFICATION
16 : * src/backend/commands/dbcommands.c
17 : *
18 : *-------------------------------------------------------------------------
19 : */
20 : #include "postgres.h"
21 :
22 : #include <fcntl.h>
23 : #include <unistd.h>
24 : #include <sys/stat.h>
25 :
26 : #include "access/genam.h"
27 : #include "access/heapam.h"
28 : #include "access/htup_details.h"
29 : #include "access/multixact.h"
30 : #include "access/tableam.h"
31 : #include "access/xact.h"
32 : #include "access/xloginsert.h"
33 : #include "access/xlogrecovery.h"
34 : #include "access/xlogutils.h"
35 : #include "catalog/catalog.h"
36 : #include "catalog/dependency.h"
37 : #include "catalog/indexing.h"
38 : #include "catalog/objectaccess.h"
39 : #include "catalog/pg_authid.h"
40 : #include "catalog/pg_collation.h"
41 : #include "catalog/pg_database.h"
42 : #include "catalog/pg_db_role_setting.h"
43 : #include "catalog/pg_subscription.h"
44 : #include "catalog/pg_tablespace.h"
45 : #include "commands/comment.h"
46 : #include "commands/dbcommands.h"
47 : #include "commands/dbcommands_xlog.h"
48 : #include "commands/defrem.h"
49 : #include "commands/seclabel.h"
50 : #include "commands/tablespace.h"
51 : #include "common/file_perm.h"
52 : #include "mb/pg_wchar.h"
53 : #include "miscadmin.h"
54 : #include "pgstat.h"
55 : #include "postmaster/bgwriter.h"
56 : #include "replication/slot.h"
57 : #include "storage/copydir.h"
58 : #include "storage/fd.h"
59 : #include "storage/ipc.h"
60 : #include "storage/lmgr.h"
61 : #include "storage/md.h"
62 : #include "storage/procarray.h"
63 : #include "storage/smgr.h"
64 : #include "utils/acl.h"
65 : #include "utils/builtins.h"
66 : #include "utils/fmgroids.h"
67 : #include "utils/lsyscache.h"
68 : #include "utils/pg_locale.h"
69 : #include "utils/relmapper.h"
70 : #include "utils/snapmgr.h"
71 : #include "utils/syscache.h"
72 :
73 : /*
74 : * Create database strategy.
75 : *
76 : * CREATEDB_WAL_LOG will copy the database at the block level and WAL log each
77 : * copied block.
78 : *
79 : * CREATEDB_FILE_COPY will simply perform a file system level copy of the
80 : * database and log a single record for each tablespace copied. To make this
81 : * safe, it also triggers checkpoints before and after the operation.
82 : */
83 : typedef enum CreateDBStrategy
84 : {
85 : CREATEDB_WAL_LOG,
86 : CREATEDB_FILE_COPY,
87 : } CreateDBStrategy;
88 :
89 : typedef struct
90 : {
91 : Oid src_dboid; /* source (template) DB */
92 : Oid dest_dboid; /* DB we are trying to create */
93 : CreateDBStrategy strategy; /* create db strategy */
94 : } createdb_failure_params;
95 :
96 : typedef struct
97 : {
98 : Oid dest_dboid; /* DB we are trying to move */
99 : Oid dest_tsoid; /* tablespace we are trying to move to */
100 : } movedb_failure_params;
101 :
102 : /*
103 : * Information about a relation to be copied when creating a database.
104 : */
105 : typedef struct CreateDBRelInfo
106 : {
107 : RelFileLocator rlocator; /* physical relation identifier */
108 : Oid reloid; /* relation oid */
109 : bool permanent; /* relation is permanent or unlogged */
110 : } CreateDBRelInfo;
111 :
112 :
113 : /* non-export function prototypes */
114 : static void createdb_failure_callback(int code, Datum arg);
115 : static void movedb(const char *dbname, const char *tblspcname);
116 : static void movedb_failure_callback(int code, Datum arg);
117 : static bool get_db_info(const char *name, LOCKMODE lockmode,
118 : Oid *dbIdP, Oid *ownerIdP,
119 : int *encodingP, bool *dbIsTemplateP, bool *dbAllowConnP, bool *dbHasLoginEvtP,
120 : TransactionId *dbFrozenXidP, MultiXactId *dbMinMultiP,
121 : Oid *dbTablespace, char **dbCollate, char **dbCtype, char **dbLocale,
122 : char **dbIcurules,
123 : char *dbLocProvider,
124 : char **dbCollversion);
125 : static void remove_dbtablespaces(Oid db_id);
126 : static bool check_db_file_conflict(Oid db_id);
127 : static int errdetail_busy_db(int notherbackends, int npreparedxacts);
128 : static void CreateDatabaseUsingWalLog(Oid src_dboid, Oid dst_dboid, Oid src_tsid,
129 : Oid dst_tsid);
130 : static List *ScanSourceDatabasePgClass(Oid tbid, Oid dbid, char *srcpath);
131 : static List *ScanSourceDatabasePgClassPage(Page page, Buffer buf, Oid tbid,
132 : Oid dbid, char *srcpath,
133 : List *rlocatorlist, Snapshot snapshot);
134 : static CreateDBRelInfo *ScanSourceDatabasePgClassTuple(HeapTupleData *tuple,
135 : Oid tbid, Oid dbid,
136 : char *srcpath);
137 : static void CreateDirAndVersionFile(char *dbpath, Oid dbid, Oid tsid,
138 : bool isRedo);
139 : static void CreateDatabaseUsingFileCopy(Oid src_dboid, Oid dst_dboid,
140 : Oid src_tsid, Oid dst_tsid);
141 : static void recovery_create_dbdir(char *path, bool only_tblspc);
142 :
143 : /*
144 : * Create a new database using the WAL_LOG strategy.
145 : *
146 : * Each copied block is separately written to the write-ahead log.
147 : */
148 : static void
149 496 : CreateDatabaseUsingWalLog(Oid src_dboid, Oid dst_dboid,
150 : Oid src_tsid, Oid dst_tsid)
151 : {
152 : char *srcpath;
153 : char *dstpath;
154 496 : List *rlocatorlist = NULL;
155 : ListCell *cell;
156 : LockRelId srcrelid;
157 : LockRelId dstrelid;
158 : RelFileLocator srcrlocator;
159 : RelFileLocator dstrlocator;
160 : CreateDBRelInfo *relinfo;
161 :
162 : /* Get source and destination database paths. */
163 496 : srcpath = GetDatabasePath(src_dboid, src_tsid);
164 496 : dstpath = GetDatabasePath(dst_dboid, dst_tsid);
165 :
166 : /* Create database directory and write PG_VERSION file. */
167 496 : CreateDirAndVersionFile(dstpath, dst_dboid, dst_tsid, false);
168 :
169 : /* Copy relmap file from source database to the destination database. */
170 496 : RelationMapCopy(dst_dboid, dst_tsid, srcpath, dstpath);
171 :
172 : /* Get list of relfilelocators to copy from the source database. */
173 496 : rlocatorlist = ScanSourceDatabasePgClass(src_tsid, src_dboid, srcpath);
174 : Assert(rlocatorlist != NIL);
175 :
176 : /*
177 : * Database IDs will be the same for all relations so set them before
178 : * entering the loop.
179 : */
180 496 : srcrelid.dbId = src_dboid;
181 496 : dstrelid.dbId = dst_dboid;
182 :
183 : /* Loop over our list of relfilelocators and copy each one. */
184 110740 : foreach(cell, rlocatorlist)
185 : {
186 110248 : relinfo = lfirst(cell);
187 110248 : srcrlocator = relinfo->rlocator;
188 :
189 : /*
190 : * If the relation is from the source db's default tablespace then we
191 : * need to create it in the destination db's default tablespace.
192 : * Otherwise, we need to create in the same tablespace as it is in the
193 : * source database.
194 : */
195 110248 : if (srcrlocator.spcOid == src_tsid)
196 110248 : dstrlocator.spcOid = dst_tsid;
197 : else
198 0 : dstrlocator.spcOid = srcrlocator.spcOid;
199 :
200 110248 : dstrlocator.dbOid = dst_dboid;
201 110248 : dstrlocator.relNumber = srcrlocator.relNumber;
202 :
203 : /*
204 : * Acquire locks on source and target relations before copying.
205 : *
206 : * We typically do not read relation data into shared_buffers without
207 : * holding a relation lock. It's unclear what could go wrong if we
208 : * skipped it in this case, because nobody can be modifying either the
209 : * source or destination database at this point, and we have locks on
210 : * both databases, too, but let's take the conservative route.
211 : */
212 110248 : dstrelid.relId = srcrelid.relId = relinfo->reloid;
213 110248 : LockRelationId(&srcrelid, AccessShareLock);
214 110248 : LockRelationId(&dstrelid, AccessShareLock);
215 :
216 : /* Copy relation storage from source to the destination. */
217 110248 : CreateAndCopyRelationData(srcrlocator, dstrlocator, relinfo->permanent);
218 :
219 : /* Release the relation locks. */
220 110244 : UnlockRelationId(&srcrelid, AccessShareLock);
221 110244 : UnlockRelationId(&dstrelid, AccessShareLock);
222 : }
223 :
224 492 : pfree(srcpath);
225 492 : pfree(dstpath);
226 492 : list_free_deep(rlocatorlist);
227 492 : }
228 :
229 : /*
230 : * Scan the pg_class table in the source database to identify the relations
231 : * that need to be copied to the destination database.
232 : *
233 : * This is an exception to the usual rule that cross-database access is
234 : * not possible. We can make it work here because we know that there are no
235 : * connections to the source database and (since there can't be prepared
236 : * transactions touching that database) no in-doubt tuples either. This
237 : * means that we don't need to worry about pruning removing anything from
238 : * under us, and we don't need to be too picky about our snapshot either.
239 : * As long as it sees all previously-committed XIDs as committed and all
240 : * aborted XIDs as aborted, we should be fine: nothing else is possible
241 : * here.
242 : *
243 : * We can't rely on the relcache for anything here, because that only knows
244 : * about the database to which we are connected, and can't handle access to
245 : * other databases. That also means we can't rely on the heap scan
246 : * infrastructure, which would be a bad idea anyway since it might try
247 : * to do things like HOT pruning which we definitely can't do safely in
248 : * a database to which we're not even connected.
249 : */
250 : static List *
251 496 : ScanSourceDatabasePgClass(Oid tbid, Oid dbid, char *srcpath)
252 : {
253 : RelFileLocator rlocator;
254 : BlockNumber nblocks;
255 : BlockNumber blkno;
256 : Buffer buf;
257 : RelFileNumber relfilenumber;
258 : Page page;
259 496 : List *rlocatorlist = NIL;
260 : LockRelId relid;
261 : Snapshot snapshot;
262 : SMgrRelation smgr;
263 : BufferAccessStrategy bstrategy;
264 :
265 : /* Get pg_class relfilenumber. */
266 496 : relfilenumber = RelationMapOidToFilenumberForDatabase(srcpath,
267 : RelationRelationId);
268 :
269 : /* Don't read data into shared_buffers without holding a relation lock. */
270 496 : relid.dbId = dbid;
271 496 : relid.relId = RelationRelationId;
272 496 : LockRelationId(&relid, AccessShareLock);
273 :
274 : /* Prepare a RelFileLocator for the pg_class relation. */
275 496 : rlocator.spcOid = tbid;
276 496 : rlocator.dbOid = dbid;
277 496 : rlocator.relNumber = relfilenumber;
278 :
279 496 : smgr = smgropen(rlocator, INVALID_PROC_NUMBER);
280 496 : nblocks = smgrnblocks(smgr, MAIN_FORKNUM);
281 496 : smgrclose(smgr);
282 :
283 : /* Use a buffer access strategy since this is a bulk read operation. */
284 496 : bstrategy = GetAccessStrategy(BAS_BULKREAD);
285 :
286 : /*
287 : * As explained in the function header comments, we need a snapshot that
288 : * will see all committed transactions as committed, and our transaction
289 : * snapshot - or the active snapshot - might not be new enough for that,
290 : * but the return value of GetLatestSnapshot() should work fine.
291 : */
292 496 : snapshot = RegisterSnapshot(GetLatestSnapshot());
293 :
294 : /* Process the relation block by block. */
295 7440 : for (blkno = 0; blkno < nblocks; blkno++)
296 : {
297 6944 : CHECK_FOR_INTERRUPTS();
298 :
299 6944 : buf = ReadBufferWithoutRelcache(rlocator, MAIN_FORKNUM, blkno,
300 : RBM_NORMAL, bstrategy, true);
301 :
302 6944 : LockBuffer(buf, BUFFER_LOCK_SHARE);
303 6944 : page = BufferGetPage(buf);
304 6944 : if (PageIsNew(page) || PageIsEmpty(page))
305 : {
306 0 : UnlockReleaseBuffer(buf);
307 0 : continue;
308 : }
309 :
310 : /* Append relevant pg_class tuples for current page to rlocatorlist. */
311 6944 : rlocatorlist = ScanSourceDatabasePgClassPage(page, buf, tbid, dbid,
312 : srcpath, rlocatorlist,
313 : snapshot);
314 :
315 6944 : UnlockReleaseBuffer(buf);
316 : }
317 496 : UnregisterSnapshot(snapshot);
318 :
319 : /* Release relation lock. */
320 496 : UnlockRelationId(&relid, AccessShareLock);
321 :
322 496 : return rlocatorlist;
323 : }
324 :
325 : /*
326 : * Scan one page of the source database's pg_class relation and add relevant
327 : * entries to rlocatorlist. The return value is the updated list.
328 : */
329 : static List *
330 6944 : ScanSourceDatabasePgClassPage(Page page, Buffer buf, Oid tbid, Oid dbid,
331 : char *srcpath, List *rlocatorlist,
332 : Snapshot snapshot)
333 : {
334 6944 : BlockNumber blkno = BufferGetBlockNumber(buf);
335 : OffsetNumber offnum;
336 : OffsetNumber maxoff;
337 : HeapTupleData tuple;
338 :
339 6944 : maxoff = PageGetMaxOffsetNumber(page);
340 :
341 : /* Loop over offsets. */
342 6944 : for (offnum = FirstOffsetNumber;
343 355318 : offnum <= maxoff;
344 348374 : offnum = OffsetNumberNext(offnum))
345 : {
346 : ItemId itemid;
347 :
348 348374 : itemid = PageGetItemId(page, offnum);
349 :
350 : /* Nothing to do if slot is empty or already dead. */
351 348374 : if (!ItemIdIsUsed(itemid) || ItemIdIsDead(itemid) ||
352 249558 : ItemIdIsRedirected(itemid))
353 141968 : continue;
354 :
355 : Assert(ItemIdIsNormal(itemid));
356 206406 : ItemPointerSet(&(tuple.t_self), blkno, offnum);
357 :
358 : /* Initialize a HeapTupleData structure. */
359 206406 : tuple.t_data = (HeapTupleHeader) PageGetItem(page, itemid);
360 206406 : tuple.t_len = ItemIdGetLength(itemid);
361 206406 : tuple.t_tableOid = RelationRelationId;
362 :
363 : /* Skip tuples that are not visible to this snapshot. */
364 206406 : if (HeapTupleSatisfiesVisibility(&tuple, snapshot, buf))
365 : {
366 : CreateDBRelInfo *relinfo;
367 :
368 : /*
369 : * ScanSourceDatabasePgClassTuple is in charge of constructing a
370 : * CreateDBRelInfo object for this tuple, but can also decide that
371 : * this tuple isn't something we need to copy. If we do need to
372 : * copy the relation, add it to the list.
373 : */
374 206376 : relinfo = ScanSourceDatabasePgClassTuple(&tuple, tbid, dbid,
375 : srcpath);
376 206376 : if (relinfo != NULL)
377 111144 : rlocatorlist = lappend(rlocatorlist, relinfo);
378 : }
379 : }
380 :
381 6944 : return rlocatorlist;
382 : }
383 :
384 : /*
385 : * Decide whether a certain pg_class tuple represents something that
386 : * needs to be copied from the source database to the destination database,
387 : * and if so, construct a CreateDBRelInfo for it.
388 : *
389 : * Visibility checks are handled by the caller, so our job here is just
390 : * to assess the data stored in the tuple.
391 : */
392 : CreateDBRelInfo *
393 206376 : ScanSourceDatabasePgClassTuple(HeapTupleData *tuple, Oid tbid, Oid dbid,
394 : char *srcpath)
395 : {
396 : CreateDBRelInfo *relinfo;
397 : Form_pg_class classForm;
398 206376 : RelFileNumber relfilenumber = InvalidRelFileNumber;
399 :
400 206376 : classForm = (Form_pg_class) GETSTRUCT(tuple);
401 :
402 : /*
403 : * Return NULL if this object does not need to be copied.
404 : *
405 : * Shared objects don't need to be copied, because they are shared.
406 : * Objects without storage can't be copied, because there's nothing to
407 : * copy. Temporary relations don't need to be copied either, because they
408 : * are inaccessible outside of the session that created them, which must
409 : * be gone already, and couldn't connect to a different database if it
410 : * still existed. autovacuum will eventually remove the pg_class entries
411 : * as well.
412 : */
413 206376 : if (classForm->reltablespace == GLOBALTABLESPACE_OID ||
414 183560 : !RELKIND_HAS_STORAGE(classForm->relkind) ||
415 111144 : classForm->relpersistence == RELPERSISTENCE_TEMP)
416 95232 : return NULL;
417 :
418 : /*
419 : * If relfilenumber is valid then directly use it. Otherwise, consult the
420 : * relmap.
421 : */
422 111144 : if (RelFileNumberIsValid(classForm->relfilenode))
423 102712 : relfilenumber = classForm->relfilenode;
424 : else
425 8432 : relfilenumber = RelationMapOidToFilenumberForDatabase(srcpath,
426 : classForm->oid);
427 :
428 : /* We must have a valid relfilenumber. */
429 111144 : if (!RelFileNumberIsValid(relfilenumber))
430 0 : elog(ERROR, "relation with OID %u does not have a valid relfilenumber",
431 : classForm->oid);
432 :
433 : /* Prepare a rel info element and add it to the list. */
434 111144 : relinfo = (CreateDBRelInfo *) palloc(sizeof(CreateDBRelInfo));
435 111144 : if (OidIsValid(classForm->reltablespace))
436 0 : relinfo->rlocator.spcOid = classForm->reltablespace;
437 : else
438 111144 : relinfo->rlocator.spcOid = tbid;
439 :
440 111144 : relinfo->rlocator.dbOid = dbid;
441 111144 : relinfo->rlocator.relNumber = relfilenumber;
442 111144 : relinfo->reloid = classForm->oid;
443 :
444 : /* Temporary relations were rejected above. */
445 : Assert(classForm->relpersistence != RELPERSISTENCE_TEMP);
446 111144 : relinfo->permanent =
447 111144 : (classForm->relpersistence == RELPERSISTENCE_PERMANENT) ? true : false;
448 :
449 111144 : return relinfo;
450 : }
451 :
452 : /*
453 : * Create database directory and write out the PG_VERSION file in the database
454 : * path. If isRedo is true, it's okay for the database directory to exist
455 : * already.
456 : */
457 : static void
458 542 : CreateDirAndVersionFile(char *dbpath, Oid dbid, Oid tsid, bool isRedo)
459 : {
460 : int fd;
461 : int nbytes;
462 : char versionfile[MAXPGPATH];
463 : char buf[16];
464 :
465 : /*
466 : * Note that we don't have to copy version data from the source database;
467 : * there's only one legal value.
468 : */
469 542 : sprintf(buf, "%s\n", PG_MAJORVERSION);
470 542 : nbytes = strlen(PG_MAJORVERSION) + 1;
471 :
472 : /* Create database directory. */
473 542 : if (MakePGDirectory(dbpath) < 0)
474 : {
475 : /* Failure other than already exists or not in WAL replay? */
476 16 : if (errno != EEXIST || !isRedo)
477 0 : ereport(ERROR,
478 : (errcode_for_file_access(),
479 : errmsg("could not create directory \"%s\": %m", dbpath)));
480 : }
481 :
482 : /*
483 : * Create PG_VERSION file in the database path. If the file already
484 : * exists and we are in WAL replay then try again to open it in write
485 : * mode.
486 : */
487 542 : snprintf(versionfile, sizeof(versionfile), "%s/%s", dbpath, "PG_VERSION");
488 :
489 542 : fd = OpenTransientFile(versionfile, O_WRONLY | O_CREAT | O_EXCL | PG_BINARY);
490 542 : if (fd < 0 && errno == EEXIST && isRedo)
491 16 : fd = OpenTransientFile(versionfile, O_WRONLY | O_TRUNC | PG_BINARY);
492 :
493 542 : if (fd < 0)
494 0 : ereport(ERROR,
495 : (errcode_for_file_access(),
496 : errmsg("could not create file \"%s\": %m", versionfile)));
497 :
498 : /* Write PG_MAJORVERSION in the PG_VERSION file. */
499 542 : pgstat_report_wait_start(WAIT_EVENT_VERSION_FILE_WRITE);
500 542 : errno = 0;
501 542 : if ((int) write(fd, buf, nbytes) != nbytes)
502 : {
503 : /* If write didn't set errno, assume problem is no disk space. */
504 0 : if (errno == 0)
505 0 : errno = ENOSPC;
506 0 : ereport(ERROR,
507 : (errcode_for_file_access(),
508 : errmsg("could not write to file \"%s\": %m", versionfile)));
509 : }
510 542 : pgstat_report_wait_end();
511 :
512 542 : pgstat_report_wait_start(WAIT_EVENT_VERSION_FILE_SYNC);
513 542 : if (pg_fsync(fd) != 0)
514 0 : ereport(data_sync_elevel(ERROR),
515 : (errcode_for_file_access(),
516 : errmsg("could not fsync file \"%s\": %m", versionfile)));
517 542 : fsync_fname(dbpath, true);
518 542 : pgstat_report_wait_end();
519 :
520 : /* Close the version file. */
521 542 : CloseTransientFile(fd);
522 :
523 : /* If we are not in WAL replay then write the WAL. */
524 542 : if (!isRedo)
525 : {
526 : xl_dbase_create_wal_log_rec xlrec;
527 :
528 496 : START_CRIT_SECTION();
529 :
530 496 : xlrec.db_id = dbid;
531 496 : xlrec.tablespace_id = tsid;
532 :
533 496 : XLogBeginInsert();
534 496 : XLogRegisterData(&xlrec,
535 : sizeof(xl_dbase_create_wal_log_rec));
536 :
537 496 : (void) XLogInsert(RM_DBASE_ID, XLOG_DBASE_CREATE_WAL_LOG);
538 :
539 496 : END_CRIT_SECTION();
540 : }
541 542 : }
542 :
543 : /*
544 : * Create a new database using the FILE_COPY strategy.
545 : *
546 : * Copy each tablespace at the filesystem level, and log a single WAL record
547 : * for each tablespace copied. This requires a checkpoint before and after the
548 : * copy, which may be expensive, but it does greatly reduce WAL generation
549 : * if the copied database is large.
550 : */
551 : static void
552 268 : CreateDatabaseUsingFileCopy(Oid src_dboid, Oid dst_dboid, Oid src_tsid,
553 : Oid dst_tsid)
554 : {
555 : TableScanDesc scan;
556 : Relation rel;
557 : HeapTuple tuple;
558 :
559 : /*
560 : * Force a checkpoint before starting the copy. This will force all dirty
561 : * buffers, including those of unlogged tables, out to disk, to ensure
562 : * source database is up-to-date on disk for the copy.
563 : * FlushDatabaseBuffers() would suffice for that, but we also want to
564 : * process any pending unlink requests. Otherwise, if a checkpoint
565 : * happened while we're copying files, a file might be deleted just when
566 : * we're about to copy it, causing the lstat() call in copydir() to fail
567 : * with ENOENT.
568 : *
569 : * In binary upgrade mode, we can skip this checkpoint because pg_upgrade
570 : * is careful to ensure that template0 is fully written to disk prior to
571 : * any CREATE DATABASE commands.
572 : */
573 268 : if (!IsBinaryUpgrade)
574 212 : RequestCheckpoint(CHECKPOINT_FAST | CHECKPOINT_FORCE |
575 : CHECKPOINT_WAIT | CHECKPOINT_FLUSH_UNLOGGED);
576 :
577 : /*
578 : * Iterate through all tablespaces of the template database, and copy each
579 : * one to the new database.
580 : */
581 268 : rel = table_open(TableSpaceRelationId, AccessShareLock);
582 268 : scan = table_beginscan_catalog(rel, 0, NULL);
583 872 : while ((tuple = heap_getnext(scan, ForwardScanDirection)) != NULL)
584 : {
585 604 : Form_pg_tablespace spaceform = (Form_pg_tablespace) GETSTRUCT(tuple);
586 604 : Oid srctablespace = spaceform->oid;
587 : Oid dsttablespace;
588 : char *srcpath;
589 : char *dstpath;
590 : struct stat st;
591 :
592 : /* No need to copy global tablespace */
593 604 : if (srctablespace == GLOBALTABLESPACE_OID)
594 336 : continue;
595 :
596 336 : srcpath = GetDatabasePath(src_dboid, srctablespace);
597 :
598 604 : if (stat(srcpath, &st) < 0 || !S_ISDIR(st.st_mode) ||
599 268 : directory_is_empty(srcpath))
600 : {
601 : /* Assume we can ignore it */
602 68 : pfree(srcpath);
603 68 : continue;
604 : }
605 :
606 268 : if (srctablespace == src_tsid)
607 268 : dsttablespace = dst_tsid;
608 : else
609 0 : dsttablespace = srctablespace;
610 :
611 268 : dstpath = GetDatabasePath(dst_dboid, dsttablespace);
612 :
613 : /*
614 : * Copy this subdirectory to the new location
615 : *
616 : * We don't need to copy subdirectories
617 : */
618 268 : copydir(srcpath, dstpath, false);
619 :
620 : /* Record the filesystem change in XLOG */
621 : {
622 : xl_dbase_create_file_copy_rec xlrec;
623 :
624 268 : xlrec.db_id = dst_dboid;
625 268 : xlrec.tablespace_id = dsttablespace;
626 268 : xlrec.src_db_id = src_dboid;
627 268 : xlrec.src_tablespace_id = srctablespace;
628 :
629 268 : XLogBeginInsert();
630 268 : XLogRegisterData(&xlrec,
631 : sizeof(xl_dbase_create_file_copy_rec));
632 :
633 268 : (void) XLogInsert(RM_DBASE_ID,
634 : XLOG_DBASE_CREATE_FILE_COPY | XLR_SPECIAL_REL_UPDATE);
635 : }
636 268 : pfree(srcpath);
637 268 : pfree(dstpath);
638 : }
639 268 : table_endscan(scan);
640 268 : table_close(rel, AccessShareLock);
641 :
642 : /*
643 : * We force a checkpoint before committing. This effectively means that
644 : * committed XLOG_DBASE_CREATE_FILE_COPY operations will never need to be
645 : * replayed (at least not in ordinary crash recovery; we still have to
646 : * make the XLOG entry for the benefit of PITR operations). This avoids
647 : * two nasty scenarios:
648 : *
649 : * #1: At wal_level=minimal, we don't XLOG the contents of newly created
650 : * relfilenodes; therefore the drop-and-recreate-whole-directory behavior
651 : * of DBASE_CREATE replay would lose such files created in the new
652 : * database between our commit and the next checkpoint.
653 : *
654 : * #2: Since we have to recopy the source database during DBASE_CREATE
655 : * replay, we run the risk of copying changes in it that were committed
656 : * after the original CREATE DATABASE command but before the system crash
657 : * that led to the replay. This is at least unexpected and at worst could
658 : * lead to inconsistencies, eg duplicate table names.
659 : *
660 : * (Both of these were real bugs in releases 8.0 through 8.0.3.)
661 : *
662 : * In PITR replay, the first of these isn't an issue, and the second is
663 : * only a risk if the CREATE DATABASE and subsequent template database
664 : * change both occur while a base backup is being taken. There doesn't
665 : * seem to be much we can do about that except document it as a
666 : * limitation.
667 : *
668 : * In binary upgrade mode, we can skip this checkpoint because neither of
669 : * these problems applies: we don't ever replay the WAL generated during
670 : * pg_upgrade, and we don't support taking base backups during pg_upgrade
671 : * (not to mention that we don't concurrently modify template0, either).
672 : *
673 : * See CreateDatabaseUsingWalLog() for a less cheesy CREATE DATABASE
674 : * strategy that avoids these problems.
675 : */
676 268 : if (!IsBinaryUpgrade)
677 212 : RequestCheckpoint(CHECKPOINT_FAST | CHECKPOINT_FORCE |
678 : CHECKPOINT_WAIT);
679 268 : }
680 :
681 : /*
682 : * CREATE DATABASE
683 : */
684 : Oid
685 798 : createdb(ParseState *pstate, const CreatedbStmt *stmt)
686 : {
687 : Oid src_dboid;
688 : Oid src_owner;
689 798 : int src_encoding = -1;
690 798 : char *src_collate = NULL;
691 798 : char *src_ctype = NULL;
692 798 : char *src_locale = NULL;
693 798 : char *src_icurules = NULL;
694 798 : char src_locprovider = '\0';
695 798 : char *src_collversion = NULL;
696 : bool src_istemplate;
697 798 : bool src_hasloginevt = false;
698 : bool src_allowconn;
699 798 : TransactionId src_frozenxid = InvalidTransactionId;
700 798 : MultiXactId src_minmxid = InvalidMultiXactId;
701 : Oid src_deftablespace;
702 : volatile Oid dst_deftablespace;
703 : Relation pg_database_rel;
704 : HeapTuple tuple;
705 798 : Datum new_record[Natts_pg_database] = {0};
706 798 : bool new_record_nulls[Natts_pg_database] = {0};
707 798 : Oid dboid = InvalidOid;
708 : Oid datdba;
709 : ListCell *option;
710 798 : DefElem *tablespacenameEl = NULL;
711 798 : DefElem *ownerEl = NULL;
712 798 : DefElem *templateEl = NULL;
713 798 : DefElem *encodingEl = NULL;
714 798 : DefElem *localeEl = NULL;
715 798 : DefElem *builtinlocaleEl = NULL;
716 798 : DefElem *collateEl = NULL;
717 798 : DefElem *ctypeEl = NULL;
718 798 : DefElem *iculocaleEl = NULL;
719 798 : DefElem *icurulesEl = NULL;
720 798 : DefElem *locproviderEl = NULL;
721 798 : DefElem *istemplateEl = NULL;
722 798 : DefElem *allowconnectionsEl = NULL;
723 798 : DefElem *connlimitEl = NULL;
724 798 : DefElem *collversionEl = NULL;
725 798 : DefElem *strategyEl = NULL;
726 798 : char *dbname = stmt->dbname;
727 798 : char *dbowner = NULL;
728 798 : const char *dbtemplate = NULL;
729 798 : char *dbcollate = NULL;
730 798 : char *dbctype = NULL;
731 798 : const char *dblocale = NULL;
732 798 : char *dbicurules = NULL;
733 798 : char dblocprovider = '\0';
734 : char *canonname;
735 798 : int encoding = -1;
736 798 : bool dbistemplate = false;
737 798 : bool dballowconnections = true;
738 798 : int dbconnlimit = DATCONNLIMIT_UNLIMITED;
739 798 : char *dbcollversion = NULL;
740 : int notherbackends;
741 : int npreparedxacts;
742 798 : CreateDBStrategy dbstrategy = CREATEDB_WAL_LOG;
743 : createdb_failure_params fparms;
744 :
745 : /* Extract options from the statement node tree */
746 2382 : foreach(option, stmt->options)
747 : {
748 1584 : DefElem *defel = (DefElem *) lfirst(option);
749 :
750 1584 : if (strcmp(defel->defname, "tablespace") == 0)
751 : {
752 34 : if (tablespacenameEl)
753 0 : errorConflictingDefElem(defel, pstate);
754 34 : tablespacenameEl = defel;
755 : }
756 1550 : else if (strcmp(defel->defname, "owner") == 0)
757 : {
758 2 : if (ownerEl)
759 0 : errorConflictingDefElem(defel, pstate);
760 2 : ownerEl = defel;
761 : }
762 1548 : else if (strcmp(defel->defname, "template") == 0)
763 : {
764 354 : if (templateEl)
765 0 : errorConflictingDefElem(defel, pstate);
766 354 : templateEl = defel;
767 : }
768 1194 : else if (strcmp(defel->defname, "encoding") == 0)
769 : {
770 104 : if (encodingEl)
771 0 : errorConflictingDefElem(defel, pstate);
772 104 : encodingEl = defel;
773 : }
774 1090 : else if (strcmp(defel->defname, "locale") == 0)
775 : {
776 106 : if (localeEl)
777 0 : errorConflictingDefElem(defel, pstate);
778 106 : localeEl = defel;
779 : }
780 984 : else if (strcmp(defel->defname, "builtin_locale") == 0)
781 : {
782 16 : if (builtinlocaleEl)
783 0 : errorConflictingDefElem(defel, pstate);
784 16 : builtinlocaleEl = defel;
785 : }
786 968 : else if (strcmp(defel->defname, "lc_collate") == 0)
787 : {
788 18 : if (collateEl)
789 0 : errorConflictingDefElem(defel, pstate);
790 18 : collateEl = defel;
791 : }
792 950 : else if (strcmp(defel->defname, "lc_ctype") == 0)
793 : {
794 18 : if (ctypeEl)
795 0 : errorConflictingDefElem(defel, pstate);
796 18 : ctypeEl = defel;
797 : }
798 932 : else if (strcmp(defel->defname, "icu_locale") == 0)
799 : {
800 10 : if (iculocaleEl)
801 0 : errorConflictingDefElem(defel, pstate);
802 10 : iculocaleEl = defel;
803 : }
804 922 : else if (strcmp(defel->defname, "icu_rules") == 0)
805 : {
806 2 : if (icurulesEl)
807 0 : errorConflictingDefElem(defel, pstate);
808 2 : icurulesEl = defel;
809 : }
810 920 : else if (strcmp(defel->defname, "locale_provider") == 0)
811 : {
812 116 : if (locproviderEl)
813 0 : errorConflictingDefElem(defel, pstate);
814 116 : locproviderEl = defel;
815 : }
816 804 : else if (strcmp(defel->defname, "is_template") == 0)
817 : {
818 100 : if (istemplateEl)
819 0 : errorConflictingDefElem(defel, pstate);
820 100 : istemplateEl = defel;
821 : }
822 704 : else if (strcmp(defel->defname, "allow_connections") == 0)
823 : {
824 98 : if (allowconnectionsEl)
825 0 : errorConflictingDefElem(defel, pstate);
826 98 : allowconnectionsEl = defel;
827 : }
828 606 : else if (strcmp(defel->defname, "connection_limit") == 0)
829 : {
830 0 : if (connlimitEl)
831 0 : errorConflictingDefElem(defel, pstate);
832 0 : connlimitEl = defel;
833 : }
834 606 : else if (strcmp(defel->defname, "collation_version") == 0)
835 : {
836 56 : if (collversionEl)
837 0 : errorConflictingDefElem(defel, pstate);
838 56 : collversionEl = defel;
839 : }
840 550 : else if (strcmp(defel->defname, "location") == 0)
841 : {
842 0 : ereport(WARNING,
843 : (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
844 : errmsg("LOCATION is not supported anymore"),
845 : errhint("Consider using tablespaces instead."),
846 : parser_errposition(pstate, defel->location)));
847 : }
848 550 : else if (strcmp(defel->defname, "oid") == 0)
849 : {
850 258 : dboid = defGetObjectId(defel);
851 :
852 : /*
853 : * We don't normally permit new databases to be created with
854 : * system-assigned OIDs. pg_upgrade tries to preserve database
855 : * OIDs, so we can't allow any database to be created with an OID
856 : * that might be in use in a freshly-initialized cluster created
857 : * by some future version. We assume all such OIDs will be from
858 : * the system-managed OID range.
859 : *
860 : * As an exception, however, we permit any OID to be assigned when
861 : * allow_system_table_mods=on (so that initdb can assign system
862 : * OIDs to template0 and postgres) or when performing a binary
863 : * upgrade (so that pg_upgrade can preserve whatever OIDs it finds
864 : * in the source cluster).
865 : */
866 258 : if (dboid < FirstNormalObjectId &&
867 224 : !allowSystemTableMods && !IsBinaryUpgrade)
868 0 : ereport(ERROR,
869 : (errcode(ERRCODE_INVALID_PARAMETER_VALUE)),
870 : errmsg("OIDs less than %u are reserved for system objects", FirstNormalObjectId));
871 : }
872 292 : else if (strcmp(defel->defname, "strategy") == 0)
873 : {
874 292 : if (strategyEl)
875 0 : errorConflictingDefElem(defel, pstate);
876 292 : strategyEl = defel;
877 : }
878 : else
879 0 : ereport(ERROR,
880 : (errcode(ERRCODE_SYNTAX_ERROR),
881 : errmsg("option \"%s\" not recognized", defel->defname),
882 : parser_errposition(pstate, defel->location)));
883 : }
884 :
885 798 : if (ownerEl && ownerEl->arg)
886 2 : dbowner = defGetString(ownerEl);
887 798 : if (templateEl && templateEl->arg)
888 354 : dbtemplate = defGetString(templateEl);
889 798 : if (encodingEl && encodingEl->arg)
890 : {
891 : const char *encoding_name;
892 :
893 104 : if (IsA(encodingEl->arg, Integer))
894 : {
895 0 : encoding = defGetInt32(encodingEl);
896 0 : encoding_name = pg_encoding_to_char(encoding);
897 0 : if (strcmp(encoding_name, "") == 0 ||
898 0 : pg_valid_server_encoding(encoding_name) < 0)
899 0 : ereport(ERROR,
900 : (errcode(ERRCODE_UNDEFINED_OBJECT),
901 : errmsg("%d is not a valid encoding code",
902 : encoding),
903 : parser_errposition(pstate, encodingEl->location)));
904 : }
905 : else
906 : {
907 104 : encoding_name = defGetString(encodingEl);
908 104 : encoding = pg_valid_server_encoding(encoding_name);
909 104 : if (encoding < 0)
910 0 : ereport(ERROR,
911 : (errcode(ERRCODE_UNDEFINED_OBJECT),
912 : errmsg("%s is not a valid encoding name",
913 : encoding_name),
914 : parser_errposition(pstate, encodingEl->location)));
915 : }
916 : }
917 798 : if (localeEl && localeEl->arg)
918 : {
919 106 : dbcollate = defGetString(localeEl);
920 106 : dbctype = defGetString(localeEl);
921 106 : dblocale = defGetString(localeEl);
922 : }
923 798 : if (builtinlocaleEl && builtinlocaleEl->arg)
924 16 : dblocale = defGetString(builtinlocaleEl);
925 798 : if (collateEl && collateEl->arg)
926 18 : dbcollate = defGetString(collateEl);
927 798 : if (ctypeEl && ctypeEl->arg)
928 18 : dbctype = defGetString(ctypeEl);
929 798 : if (iculocaleEl && iculocaleEl->arg)
930 10 : dblocale = defGetString(iculocaleEl);
931 798 : if (icurulesEl && icurulesEl->arg)
932 2 : dbicurules = defGetString(icurulesEl);
933 798 : if (locproviderEl && locproviderEl->arg)
934 : {
935 116 : char *locproviderstr = defGetString(locproviderEl);
936 :
937 116 : if (pg_strcasecmp(locproviderstr, "builtin") == 0)
938 34 : dblocprovider = COLLPROVIDER_BUILTIN;
939 82 : else if (pg_strcasecmp(locproviderstr, "icu") == 0)
940 16 : dblocprovider = COLLPROVIDER_ICU;
941 66 : else if (pg_strcasecmp(locproviderstr, "libc") == 0)
942 64 : dblocprovider = COLLPROVIDER_LIBC;
943 : else
944 2 : ereport(ERROR,
945 : (errcode(ERRCODE_INVALID_OBJECT_DEFINITION),
946 : errmsg("unrecognized locale provider: %s",
947 : locproviderstr)));
948 : }
949 796 : if (istemplateEl && istemplateEl->arg)
950 100 : dbistemplate = defGetBoolean(istemplateEl);
951 796 : if (allowconnectionsEl && allowconnectionsEl->arg)
952 98 : dballowconnections = defGetBoolean(allowconnectionsEl);
953 796 : if (connlimitEl && connlimitEl->arg)
954 : {
955 0 : dbconnlimit = defGetInt32(connlimitEl);
956 0 : if (dbconnlimit < DATCONNLIMIT_UNLIMITED)
957 0 : ereport(ERROR,
958 : (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
959 : errmsg("invalid connection limit: %d", dbconnlimit)));
960 : }
961 796 : if (collversionEl)
962 56 : dbcollversion = defGetString(collversionEl);
963 :
964 : /* obtain OID of proposed owner */
965 796 : if (dbowner)
966 2 : datdba = get_role_oid(dbowner, false);
967 : else
968 794 : datdba = GetUserId();
969 :
970 : /*
971 : * To create a database, must have createdb privilege and must be able to
972 : * become the target role (this does not imply that the target role itself
973 : * must have createdb privilege). The latter provision guards against
974 : * "giveaway" attacks. Note that a superuser will always have both of
975 : * these privileges a fortiori.
976 : */
977 796 : if (!have_createdb_privilege())
978 6 : ereport(ERROR,
979 : (errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
980 : errmsg("permission denied to create database")));
981 :
982 790 : check_can_set_role(GetUserId(), datdba);
983 :
984 : /*
985 : * Lookup database (template) to be cloned, and obtain share lock on it.
986 : * ShareLock allows two CREATE DATABASEs to work from the same template
987 : * concurrently, while ensuring no one is busy dropping it in parallel
988 : * (which would be Very Bad since we'd likely get an incomplete copy
989 : * without knowing it). This also prevents any new connections from being
990 : * made to the source until we finish copying it, so we can be sure it
991 : * won't change underneath us.
992 : */
993 790 : if (!dbtemplate)
994 438 : dbtemplate = "template1"; /* Default template database name */
995 :
996 790 : if (!get_db_info(dbtemplate, ShareLock,
997 : &src_dboid, &src_owner, &src_encoding,
998 : &src_istemplate, &src_allowconn, &src_hasloginevt,
999 : &src_frozenxid, &src_minmxid, &src_deftablespace,
1000 : &src_collate, &src_ctype, &src_locale, &src_icurules, &src_locprovider,
1001 : &src_collversion))
1002 0 : ereport(ERROR,
1003 : (errcode(ERRCODE_UNDEFINED_DATABASE),
1004 : errmsg("template database \"%s\" does not exist",
1005 : dbtemplate)));
1006 :
1007 : /*
1008 : * If the source database was in the process of being dropped, we can't
1009 : * use it as a template.
1010 : */
1011 790 : if (database_is_invalid_oid(src_dboid))
1012 2 : ereport(ERROR,
1013 : errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
1014 : errmsg("cannot use invalid database \"%s\" as template", dbtemplate),
1015 : errhint("Use DROP DATABASE to drop invalid databases."));
1016 :
1017 : /*
1018 : * Permission check: to copy a DB that's not marked datistemplate, you
1019 : * must be superuser or the owner thereof.
1020 : */
1021 788 : if (!src_istemplate)
1022 : {
1023 24 : if (!object_ownercheck(DatabaseRelationId, src_dboid, GetUserId()))
1024 0 : ereport(ERROR,
1025 : (errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
1026 : errmsg("permission denied to copy database \"%s\"",
1027 : dbtemplate)));
1028 : }
1029 :
1030 : /* Validate the database creation strategy. */
1031 788 : if (strategyEl && strategyEl->arg)
1032 : {
1033 : char *strategy;
1034 :
1035 292 : strategy = defGetString(strategyEl);
1036 292 : if (pg_strcasecmp(strategy, "wal_log") == 0)
1037 22 : dbstrategy = CREATEDB_WAL_LOG;
1038 270 : else if (pg_strcasecmp(strategy, "file_copy") == 0)
1039 268 : dbstrategy = CREATEDB_FILE_COPY;
1040 : else
1041 2 : ereport(ERROR,
1042 : (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
1043 : errmsg("invalid create database strategy \"%s\"", strategy),
1044 : errhint("Valid strategies are \"wal_log\" and \"file_copy\".")));
1045 : }
1046 :
1047 : /* If encoding or locales are defaulted, use source's setting */
1048 786 : if (encoding < 0)
1049 682 : encoding = src_encoding;
1050 786 : if (dbcollate == NULL)
1051 668 : dbcollate = src_collate;
1052 786 : if (dbctype == NULL)
1053 668 : dbctype = src_ctype;
1054 786 : if (dblocprovider == '\0')
1055 672 : dblocprovider = src_locprovider;
1056 786 : if (dblocale == NULL && dblocprovider == src_locprovider)
1057 664 : dblocale = src_locale;
1058 786 : if (dbicurules == NULL)
1059 784 : dbicurules = src_icurules;
1060 :
1061 : /* Some encodings are client only */
1062 786 : if (!PG_VALID_BE_ENCODING(encoding))
1063 0 : ereport(ERROR,
1064 : (errcode(ERRCODE_WRONG_OBJECT_TYPE),
1065 : errmsg("invalid server encoding %d", encoding)));
1066 :
1067 : /* Check that the chosen locales are valid, and get canonical spellings */
1068 786 : if (!check_locale(LC_COLLATE, dbcollate, &canonname))
1069 : {
1070 2 : if (dblocprovider == COLLPROVIDER_BUILTIN)
1071 0 : ereport(ERROR,
1072 : (errcode(ERRCODE_WRONG_OBJECT_TYPE),
1073 : errmsg("invalid LC_COLLATE locale name: \"%s\"", dbcollate),
1074 : errhint("If the locale name is specific to the builtin provider, use BUILTIN_LOCALE.")));
1075 2 : else if (dblocprovider == COLLPROVIDER_ICU)
1076 0 : ereport(ERROR,
1077 : (errcode(ERRCODE_WRONG_OBJECT_TYPE),
1078 : errmsg("invalid LC_COLLATE locale name: \"%s\"", dbcollate),
1079 : errhint("If the locale name is specific to the ICU provider, use ICU_LOCALE.")));
1080 : else
1081 2 : ereport(ERROR,
1082 : (errcode(ERRCODE_WRONG_OBJECT_TYPE),
1083 : errmsg("invalid LC_COLLATE locale name: \"%s\"", dbcollate)));
1084 : }
1085 784 : dbcollate = canonname;
1086 784 : if (!check_locale(LC_CTYPE, dbctype, &canonname))
1087 : {
1088 2 : if (dblocprovider == COLLPROVIDER_BUILTIN)
1089 0 : ereport(ERROR,
1090 : (errcode(ERRCODE_WRONG_OBJECT_TYPE),
1091 : errmsg("invalid LC_CTYPE locale name: \"%s\"", dbctype),
1092 : errhint("If the locale name is specific to the builtin provider, use BUILTIN_LOCALE.")));
1093 2 : else if (dblocprovider == COLLPROVIDER_ICU)
1094 0 : ereport(ERROR,
1095 : (errcode(ERRCODE_WRONG_OBJECT_TYPE),
1096 : errmsg("invalid LC_CTYPE locale name: \"%s\"", dbctype),
1097 : errhint("If the locale name is specific to the ICU provider, use ICU_LOCALE.")));
1098 : else
1099 2 : ereport(ERROR,
1100 : (errcode(ERRCODE_WRONG_OBJECT_TYPE),
1101 : errmsg("invalid LC_CTYPE locale name: \"%s\"", dbctype)));
1102 : }
1103 :
1104 782 : dbctype = canonname;
1105 :
1106 782 : check_encoding_locale_matches(encoding, dbcollate, dbctype);
1107 :
1108 : /* validate provider-specific parameters */
1109 782 : if (dblocprovider != COLLPROVIDER_BUILTIN)
1110 : {
1111 720 : if (builtinlocaleEl)
1112 0 : ereport(ERROR,
1113 : (errcode(ERRCODE_INVALID_OBJECT_DEFINITION),
1114 : errmsg("BUILTIN_LOCALE cannot be specified unless locale provider is builtin")));
1115 : }
1116 :
1117 782 : if (dblocprovider != COLLPROVIDER_ICU)
1118 : {
1119 752 : if (iculocaleEl)
1120 2 : ereport(ERROR,
1121 : (errcode(ERRCODE_INVALID_OBJECT_DEFINITION),
1122 : errmsg("ICU locale cannot be specified unless locale provider is ICU")));
1123 :
1124 750 : if (dbicurules)
1125 2 : ereport(ERROR,
1126 : (errcode(ERRCODE_INVALID_OBJECT_DEFINITION),
1127 : errmsg("ICU rules cannot be specified unless locale provider is ICU")));
1128 : }
1129 :
1130 : /* validate and canonicalize locale for the provider */
1131 778 : if (dblocprovider == COLLPROVIDER_BUILTIN)
1132 : {
1133 : /*
1134 : * This would happen if template0 uses the libc provider but the new
1135 : * database uses builtin.
1136 : */
1137 58 : if (!dblocale)
1138 2 : ereport(ERROR,
1139 : (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
1140 : errmsg("LOCALE or BUILTIN_LOCALE must be specified")));
1141 :
1142 56 : dblocale = builtin_validate_locale(encoding, dblocale);
1143 : }
1144 720 : else if (dblocprovider == COLLPROVIDER_ICU)
1145 : {
1146 30 : if (!(is_encoding_supported_by_icu(encoding)))
1147 2 : ereport(ERROR,
1148 : (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
1149 : errmsg("encoding \"%s\" is not supported with ICU provider",
1150 : pg_encoding_to_char(encoding))));
1151 :
1152 : /*
1153 : * This would happen if template0 uses the libc provider but the new
1154 : * database uses icu.
1155 : */
1156 28 : if (!dblocale)
1157 2 : ereport(ERROR,
1158 : (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
1159 : errmsg("LOCALE or ICU_LOCALE must be specified")));
1160 :
1161 : /*
1162 : * During binary upgrade, or when the locale came from the template
1163 : * database, preserve locale string. Otherwise, canonicalize to a
1164 : * language tag.
1165 : */
1166 26 : if (!IsBinaryUpgrade && dblocale != src_locale)
1167 : {
1168 14 : char *langtag = icu_language_tag(dblocale,
1169 : icu_validation_level);
1170 :
1171 14 : if (langtag && strcmp(dblocale, langtag) != 0)
1172 : {
1173 6 : ereport(NOTICE,
1174 : (errmsg("using standard form \"%s\" for ICU locale \"%s\"",
1175 : langtag, dblocale)));
1176 :
1177 6 : dblocale = langtag;
1178 : }
1179 : }
1180 :
1181 26 : icu_validate_locale(dblocale);
1182 : }
1183 :
1184 : /* for libc, locale comes from datcollate and datctype */
1185 768 : if (dblocprovider == COLLPROVIDER_LIBC)
1186 690 : dblocale = NULL;
1187 :
1188 : /*
1189 : * Check that the new encoding and locale settings match the source
1190 : * database. We insist on this because we simply copy the source data ---
1191 : * any non-ASCII data would be wrongly encoded, and any indexes sorted
1192 : * according to the source locale would be wrong.
1193 : *
1194 : * However, we assume that template0 doesn't contain any non-ASCII data
1195 : * nor any indexes that depend on collation or ctype, so template0 can be
1196 : * used as template for creating a database with any encoding or locale.
1197 : */
1198 768 : if (strcmp(dbtemplate, "template0") != 0)
1199 : {
1200 464 : if (encoding != src_encoding)
1201 0 : ereport(ERROR,
1202 : (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
1203 : errmsg("new encoding (%s) is incompatible with the encoding of the template database (%s)",
1204 : pg_encoding_to_char(encoding),
1205 : pg_encoding_to_char(src_encoding)),
1206 : errhint("Use the same encoding as in the template database, or use template0 as template.")));
1207 :
1208 464 : if (strcmp(dbcollate, src_collate) != 0)
1209 2 : ereport(ERROR,
1210 : (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
1211 : errmsg("new collation (%s) is incompatible with the collation of the template database (%s)",
1212 : dbcollate, src_collate),
1213 : errhint("Use the same collation as in the template database, or use template0 as template.")));
1214 :
1215 462 : if (strcmp(dbctype, src_ctype) != 0)
1216 0 : ereport(ERROR,
1217 : (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
1218 : errmsg("new LC_CTYPE (%s) is incompatible with the LC_CTYPE of the template database (%s)",
1219 : dbctype, src_ctype),
1220 : errhint("Use the same LC_CTYPE as in the template database, or use template0 as template.")));
1221 :
1222 462 : if (dblocprovider != src_locprovider)
1223 0 : ereport(ERROR,
1224 : (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
1225 : errmsg("new locale provider (%s) does not match locale provider of the template database (%s)",
1226 : collprovider_name(dblocprovider), collprovider_name(src_locprovider)),
1227 : errhint("Use the same locale provider as in the template database, or use template0 as template.")));
1228 :
1229 462 : if (dblocprovider == COLLPROVIDER_ICU)
1230 : {
1231 : char *val1;
1232 : char *val2;
1233 :
1234 : Assert(dblocale);
1235 : Assert(src_locale);
1236 12 : if (strcmp(dblocale, src_locale) != 0)
1237 0 : ereport(ERROR,
1238 : (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
1239 : errmsg("new ICU locale (%s) is incompatible with the ICU locale of the template database (%s)",
1240 : dblocale, src_locale),
1241 : errhint("Use the same ICU locale as in the template database, or use template0 as template.")));
1242 :
1243 12 : val1 = dbicurules;
1244 12 : if (!val1)
1245 12 : val1 = "";
1246 12 : val2 = src_icurules;
1247 12 : if (!val2)
1248 12 : val2 = "";
1249 12 : if (strcmp(val1, val2) != 0)
1250 0 : ereport(ERROR,
1251 : (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
1252 : errmsg("new ICU collation rules (%s) are incompatible with the ICU collation rules of the template database (%s)",
1253 : val1, val2),
1254 : errhint("Use the same ICU collation rules as in the template database, or use template0 as template.")));
1255 : }
1256 : }
1257 :
1258 : /*
1259 : * If we got a collation version for the template database, check that it
1260 : * matches the actual OS collation version. Otherwise error; the user
1261 : * needs to fix the template database first. Don't complain if a
1262 : * collation version was specified explicitly as a statement option; that
1263 : * is used by pg_upgrade to reproduce the old state exactly.
1264 : *
1265 : * (If the template database has no collation version, then either the
1266 : * platform/provider does not support collation versioning, or it's
1267 : * template0, for which we stipulate that it does not contain
1268 : * collation-using objects.)
1269 : */
1270 766 : if (src_collversion && !collversionEl)
1271 : {
1272 : char *actual_versionstr;
1273 : const char *locale;
1274 :
1275 304 : if (dblocprovider == COLLPROVIDER_LIBC)
1276 282 : locale = dbcollate;
1277 : else
1278 22 : locale = dblocale;
1279 :
1280 304 : actual_versionstr = get_collation_actual_version(dblocprovider, locale);
1281 304 : if (!actual_versionstr)
1282 0 : ereport(ERROR,
1283 : (errmsg("template database \"%s\" has a collation version, but no actual collation version could be determined",
1284 : dbtemplate)));
1285 :
1286 304 : if (strcmp(actual_versionstr, src_collversion) != 0)
1287 0 : ereport(ERROR,
1288 : (errmsg("template database \"%s\" has a collation version mismatch",
1289 : dbtemplate),
1290 : errdetail("The template database was created using collation version %s, "
1291 : "but the operating system provides version %s.",
1292 : src_collversion, actual_versionstr),
1293 : errhint("Rebuild all objects in the template database that use the default collation and run "
1294 : "ALTER DATABASE %s REFRESH COLLATION VERSION, "
1295 : "or build PostgreSQL with the right library version.",
1296 : quote_identifier(dbtemplate))));
1297 : }
1298 :
1299 766 : if (dbcollversion == NULL)
1300 710 : dbcollversion = src_collversion;
1301 :
1302 : /*
1303 : * Normally, we copy the collation version from the template database.
1304 : * This last resort only applies if the template database does not have a
1305 : * collation version, which is normally only the case for template0.
1306 : */
1307 766 : if (dbcollversion == NULL)
1308 : {
1309 : const char *locale;
1310 :
1311 406 : if (dblocprovider == COLLPROVIDER_LIBC)
1312 364 : locale = dbcollate;
1313 : else
1314 42 : locale = dblocale;
1315 :
1316 406 : dbcollversion = get_collation_actual_version(dblocprovider, locale);
1317 : }
1318 :
1319 : /* Resolve default tablespace for new database */
1320 766 : if (tablespacenameEl && tablespacenameEl->arg)
1321 34 : {
1322 : char *tablespacename;
1323 : AclResult aclresult;
1324 :
1325 34 : tablespacename = defGetString(tablespacenameEl);
1326 34 : dst_deftablespace = get_tablespace_oid(tablespacename, false);
1327 : /* check permissions */
1328 34 : aclresult = object_aclcheck(TableSpaceRelationId, dst_deftablespace, GetUserId(),
1329 : ACL_CREATE);
1330 34 : if (aclresult != ACLCHECK_OK)
1331 0 : aclcheck_error(aclresult, OBJECT_TABLESPACE,
1332 : tablespacename);
1333 :
1334 : /* pg_global must never be the default tablespace */
1335 34 : if (dst_deftablespace == GLOBALTABLESPACE_OID)
1336 0 : ereport(ERROR,
1337 : (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
1338 : errmsg("pg_global cannot be used as default tablespace")));
1339 :
1340 : /*
1341 : * If we are trying to change the default tablespace of the template,
1342 : * we require that the template not have any files in the new default
1343 : * tablespace. This is necessary because otherwise the copied
1344 : * database would contain pg_class rows that refer to its default
1345 : * tablespace both explicitly (by OID) and implicitly (as zero), which
1346 : * would cause problems. For example another CREATE DATABASE using
1347 : * the copied database as template, and trying to change its default
1348 : * tablespace again, would yield outright incorrect results (it would
1349 : * improperly move tables to the new default tablespace that should
1350 : * stay in the same tablespace).
1351 : */
1352 34 : if (dst_deftablespace != src_deftablespace)
1353 : {
1354 : char *srcpath;
1355 : struct stat st;
1356 :
1357 34 : srcpath = GetDatabasePath(src_dboid, dst_deftablespace);
1358 :
1359 34 : if (stat(srcpath, &st) == 0 &&
1360 0 : S_ISDIR(st.st_mode) &&
1361 0 : !directory_is_empty(srcpath))
1362 0 : ereport(ERROR,
1363 : (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
1364 : errmsg("cannot assign new default tablespace \"%s\"",
1365 : tablespacename),
1366 : errdetail("There is a conflict because database \"%s\" already has some tables in this tablespace.",
1367 : dbtemplate)));
1368 34 : pfree(srcpath);
1369 : }
1370 : }
1371 : else
1372 : {
1373 : /* Use template database's default tablespace */
1374 732 : dst_deftablespace = src_deftablespace;
1375 : /* Note there is no additional permission check in this path */
1376 : }
1377 :
1378 : /*
1379 : * If built with appropriate switch, whine when regression-testing
1380 : * conventions for database names are violated. But don't complain during
1381 : * initdb.
1382 : */
1383 : #ifdef ENFORCE_REGRESSION_TEST_NAME_RESTRICTIONS
1384 : if (IsUnderPostmaster && strstr(dbname, "regression") == NULL)
1385 : elog(WARNING, "databases created by regression test cases should have names including \"regression\"");
1386 : #endif
1387 :
1388 : /*
1389 : * Check for db name conflict. This is just to give a more friendly error
1390 : * message than "unique index violation". There's a race condition but
1391 : * we're willing to accept the less friendly message in that case.
1392 : */
1393 766 : if (OidIsValid(get_database_oid(dbname, true)))
1394 2 : ereport(ERROR,
1395 : (errcode(ERRCODE_DUPLICATE_DATABASE),
1396 : errmsg("database \"%s\" already exists", dbname)));
1397 :
1398 : /*
1399 : * The source DB can't have any active backends, except this one
1400 : * (exception is to allow CREATE DB while connected to template1).
1401 : * Otherwise we might copy inconsistent data.
1402 : *
1403 : * This should be last among the basic error checks, because it involves
1404 : * potential waiting; we may as well throw an error first if we're gonna
1405 : * throw one.
1406 : */
1407 764 : if (CountOtherDBBackends(src_dboid, ¬herbackends, &npreparedxacts))
1408 0 : ereport(ERROR,
1409 : (errcode(ERRCODE_OBJECT_IN_USE),
1410 : errmsg("source database \"%s\" is being accessed by other users",
1411 : dbtemplate),
1412 : errdetail_busy_db(notherbackends, npreparedxacts)));
1413 :
1414 : /*
1415 : * Select an OID for the new database, checking that it doesn't have a
1416 : * filename conflict with anything already existing in the tablespace
1417 : * directories.
1418 : */
1419 764 : pg_database_rel = table_open(DatabaseRelationId, RowExclusiveLock);
1420 :
1421 : /*
1422 : * If database OID is configured, check if the OID is already in use or
1423 : * data directory already exists.
1424 : */
1425 764 : if (OidIsValid(dboid))
1426 : {
1427 258 : char *existing_dbname = get_database_name(dboid);
1428 :
1429 258 : if (existing_dbname != NULL)
1430 0 : ereport(ERROR,
1431 : (errcode(ERRCODE_INVALID_PARAMETER_VALUE)),
1432 : errmsg("database OID %u is already in use by database \"%s\"",
1433 : dboid, existing_dbname));
1434 :
1435 258 : if (check_db_file_conflict(dboid))
1436 0 : ereport(ERROR,
1437 : (errcode(ERRCODE_INVALID_PARAMETER_VALUE)),
1438 : errmsg("data directory with the specified OID %u already exists", dboid));
1439 : }
1440 : else
1441 : {
1442 : /* Select an OID for the new database if is not explicitly configured. */
1443 : do
1444 : {
1445 506 : dboid = GetNewOidWithIndex(pg_database_rel, DatabaseOidIndexId,
1446 : Anum_pg_database_oid);
1447 506 : } while (check_db_file_conflict(dboid));
1448 : }
1449 :
1450 : /*
1451 : * Insert a new tuple into pg_database. This establishes our ownership of
1452 : * the new database name (anyone else trying to insert the same name will
1453 : * block on the unique index, and fail after we commit).
1454 : */
1455 :
1456 : Assert((dblocprovider != COLLPROVIDER_LIBC && dblocale) ||
1457 : (dblocprovider == COLLPROVIDER_LIBC && !dblocale));
1458 :
1459 : /* Form tuple */
1460 764 : new_record[Anum_pg_database_oid - 1] = ObjectIdGetDatum(dboid);
1461 764 : new_record[Anum_pg_database_datname - 1] =
1462 764 : DirectFunctionCall1(namein, CStringGetDatum(dbname));
1463 764 : new_record[Anum_pg_database_datdba - 1] = ObjectIdGetDatum(datdba);
1464 764 : new_record[Anum_pg_database_encoding - 1] = Int32GetDatum(encoding);
1465 764 : new_record[Anum_pg_database_datlocprovider - 1] = CharGetDatum(dblocprovider);
1466 764 : new_record[Anum_pg_database_datistemplate - 1] = BoolGetDatum(dbistemplate);
1467 764 : new_record[Anum_pg_database_datallowconn - 1] = BoolGetDatum(dballowconnections);
1468 764 : new_record[Anum_pg_database_dathasloginevt - 1] = BoolGetDatum(src_hasloginevt);
1469 764 : new_record[Anum_pg_database_datconnlimit - 1] = Int32GetDatum(dbconnlimit);
1470 764 : new_record[Anum_pg_database_datfrozenxid - 1] = TransactionIdGetDatum(src_frozenxid);
1471 764 : new_record[Anum_pg_database_datminmxid - 1] = TransactionIdGetDatum(src_minmxid);
1472 764 : new_record[Anum_pg_database_dattablespace - 1] = ObjectIdGetDatum(dst_deftablespace);
1473 764 : new_record[Anum_pg_database_datcollate - 1] = CStringGetTextDatum(dbcollate);
1474 764 : new_record[Anum_pg_database_datctype - 1] = CStringGetTextDatum(dbctype);
1475 764 : if (dblocale)
1476 76 : new_record[Anum_pg_database_datlocale - 1] = CStringGetTextDatum(dblocale);
1477 : else
1478 688 : new_record_nulls[Anum_pg_database_datlocale - 1] = true;
1479 764 : if (dbicurules)
1480 0 : new_record[Anum_pg_database_daticurules - 1] = CStringGetTextDatum(dbicurules);
1481 : else
1482 764 : new_record_nulls[Anum_pg_database_daticurules - 1] = true;
1483 764 : if (dbcollversion)
1484 648 : new_record[Anum_pg_database_datcollversion - 1] = CStringGetTextDatum(dbcollversion);
1485 : else
1486 116 : new_record_nulls[Anum_pg_database_datcollversion - 1] = true;
1487 :
1488 : /*
1489 : * We deliberately set datacl to default (NULL), rather than copying it
1490 : * from the template database. Copying it would be a bad idea when the
1491 : * owner is not the same as the template's owner.
1492 : */
1493 764 : new_record_nulls[Anum_pg_database_datacl - 1] = true;
1494 :
1495 764 : tuple = heap_form_tuple(RelationGetDescr(pg_database_rel),
1496 : new_record, new_record_nulls);
1497 :
1498 764 : CatalogTupleInsert(pg_database_rel, tuple);
1499 :
1500 : /*
1501 : * Now generate additional catalog entries associated with the new DB
1502 : */
1503 :
1504 : /* Register owner dependency */
1505 764 : recordDependencyOnOwner(DatabaseRelationId, dboid, datdba);
1506 :
1507 : /* Create pg_shdepend entries for objects within database */
1508 764 : copyTemplateDependencies(src_dboid, dboid);
1509 :
1510 : /* Post creation hook for new database */
1511 764 : InvokeObjectPostCreateHook(DatabaseRelationId, dboid, 0);
1512 :
1513 : /*
1514 : * If we're going to be reading data for the to-be-created database into
1515 : * shared_buffers, take a lock on it. Nobody should know that this
1516 : * database exists yet, but it's good to maintain the invariant that an
1517 : * AccessExclusiveLock on the database is sufficient to drop all of its
1518 : * buffers without worrying about more being read later.
1519 : *
1520 : * Note that we need to do this before entering the
1521 : * PG_ENSURE_ERROR_CLEANUP block below, because createdb_failure_callback
1522 : * expects this lock to be held already.
1523 : */
1524 764 : if (dbstrategy == CREATEDB_WAL_LOG)
1525 496 : LockSharedObject(DatabaseRelationId, dboid, 0, AccessShareLock);
1526 :
1527 : /*
1528 : * Once we start copying subdirectories, we need to be able to clean 'em
1529 : * up if we fail. Use an ENSURE block to make sure this happens. (This
1530 : * is not a 100% solution, because of the possibility of failure during
1531 : * transaction commit after we leave this routine, but it should handle
1532 : * most scenarios.)
1533 : */
1534 764 : fparms.src_dboid = src_dboid;
1535 764 : fparms.dest_dboid = dboid;
1536 764 : fparms.strategy = dbstrategy;
1537 :
1538 764 : PG_ENSURE_ERROR_CLEANUP(createdb_failure_callback,
1539 : PointerGetDatum(&fparms));
1540 : {
1541 : /*
1542 : * If the user has asked to create a database with WAL_LOG strategy
1543 : * then call CreateDatabaseUsingWalLog, which will copy the database
1544 : * at the block level and it will WAL log each copied block.
1545 : * Otherwise, call CreateDatabaseUsingFileCopy that will copy the
1546 : * database file by file.
1547 : */
1548 764 : if (dbstrategy == CREATEDB_WAL_LOG)
1549 496 : CreateDatabaseUsingWalLog(src_dboid, dboid, src_deftablespace,
1550 : dst_deftablespace);
1551 : else
1552 268 : CreateDatabaseUsingFileCopy(src_dboid, dboid, src_deftablespace,
1553 : dst_deftablespace);
1554 :
1555 : /*
1556 : * Close pg_database, but keep lock till commit.
1557 : */
1558 760 : table_close(pg_database_rel, NoLock);
1559 :
1560 : /*
1561 : * Force synchronous commit, thus minimizing the window between
1562 : * creation of the database files and committal of the transaction. If
1563 : * we crash before committing, we'll have a DB that's taking up disk
1564 : * space but is not in pg_database, which is not good.
1565 : */
1566 760 : ForceSyncCommit();
1567 : }
1568 764 : PG_END_ENSURE_ERROR_CLEANUP(createdb_failure_callback,
1569 : PointerGetDatum(&fparms));
1570 :
1571 760 : return dboid;
1572 : }
1573 :
1574 : /*
1575 : * Check whether chosen encoding matches chosen locale settings. This
1576 : * restriction is necessary because libc's locale-specific code usually
1577 : * fails when presented with data in an encoding it's not expecting. We
1578 : * allow mismatch in four cases:
1579 : *
1580 : * 1. locale encoding = SQL_ASCII, which means that the locale is C/POSIX
1581 : * which works with any encoding.
1582 : *
1583 : * 2. locale encoding = -1, which means that we couldn't determine the
1584 : * locale's encoding and have to trust the user to get it right.
1585 : *
1586 : * 3. selected encoding is UTF8 and platform is win32. This is because
1587 : * UTF8 is a pseudo codepage that is supported in all locales since it's
1588 : * converted to UTF16 before being used.
1589 : *
1590 : * 4. selected encoding is SQL_ASCII, but only if you're a superuser. This
1591 : * is risky but we have historically allowed it --- notably, the
1592 : * regression tests require it.
1593 : *
1594 : * Note: if you change this policy, fix initdb to match.
1595 : */
1596 : void
1597 810 : check_encoding_locale_matches(int encoding, const char *collate, const char *ctype)
1598 : {
1599 810 : int ctype_encoding = pg_get_encoding_from_locale(ctype, true);
1600 810 : int collate_encoding = pg_get_encoding_from_locale(collate, true);
1601 :
1602 818 : if (!(ctype_encoding == encoding ||
1603 8 : ctype_encoding == PG_SQL_ASCII ||
1604 : ctype_encoding == -1 ||
1605 : #ifdef WIN32
1606 : encoding == PG_UTF8 ||
1607 : #endif
1608 8 : (encoding == PG_SQL_ASCII && superuser())))
1609 0 : ereport(ERROR,
1610 : (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
1611 : errmsg("encoding \"%s\" does not match locale \"%s\"",
1612 : pg_encoding_to_char(encoding),
1613 : ctype),
1614 : errdetail("The chosen LC_CTYPE setting requires encoding \"%s\".",
1615 : pg_encoding_to_char(ctype_encoding))));
1616 :
1617 818 : if (!(collate_encoding == encoding ||
1618 8 : collate_encoding == PG_SQL_ASCII ||
1619 : collate_encoding == -1 ||
1620 : #ifdef WIN32
1621 : encoding == PG_UTF8 ||
1622 : #endif
1623 8 : (encoding == PG_SQL_ASCII && superuser())))
1624 0 : ereport(ERROR,
1625 : (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
1626 : errmsg("encoding \"%s\" does not match locale \"%s\"",
1627 : pg_encoding_to_char(encoding),
1628 : collate),
1629 : errdetail("The chosen LC_COLLATE setting requires encoding \"%s\".",
1630 : pg_encoding_to_char(collate_encoding))));
1631 810 : }
1632 :
1633 : /* Error cleanup callback for createdb */
1634 : static void
1635 4 : createdb_failure_callback(int code, Datum arg)
1636 : {
1637 4 : createdb_failure_params *fparms = (createdb_failure_params *) DatumGetPointer(arg);
1638 :
1639 : /*
1640 : * If we were copying database at block levels then drop pages for the
1641 : * destination database that are in the shared buffer cache. And tell
1642 : * checkpointer to forget any pending fsync and unlink requests for files
1643 : * in the database. The reasoning behind doing this is same as explained
1644 : * in dropdb function. But unlike dropdb we don't need to call
1645 : * pgstat_drop_database because this database is still not created so
1646 : * there should not be any stat for this.
1647 : */
1648 4 : if (fparms->strategy == CREATEDB_WAL_LOG)
1649 : {
1650 4 : DropDatabaseBuffers(fparms->dest_dboid);
1651 4 : ForgetDatabaseSyncRequests(fparms->dest_dboid);
1652 :
1653 : /* Release lock on the target database. */
1654 4 : UnlockSharedObject(DatabaseRelationId, fparms->dest_dboid, 0,
1655 : AccessShareLock);
1656 : }
1657 :
1658 : /*
1659 : * Release lock on source database before doing recursive remove. This is
1660 : * not essential but it seems desirable to release the lock as soon as
1661 : * possible.
1662 : */
1663 4 : UnlockSharedObject(DatabaseRelationId, fparms->src_dboid, 0, ShareLock);
1664 :
1665 : /* Throw away any successfully copied subdirectories */
1666 4 : remove_dbtablespaces(fparms->dest_dboid);
1667 4 : }
1668 :
1669 :
1670 : /*
1671 : * DROP DATABASE
1672 : */
1673 : void
1674 124 : dropdb(const char *dbname, bool missing_ok, bool force)
1675 : {
1676 : Oid db_id;
1677 : bool db_istemplate;
1678 : Relation pgdbrel;
1679 : HeapTuple tup;
1680 : ScanKeyData scankey;
1681 : void *inplace_state;
1682 : Form_pg_database datform;
1683 : int notherbackends;
1684 : int npreparedxacts;
1685 : int nslots,
1686 : nslots_active;
1687 : int nsubscriptions;
1688 :
1689 : /*
1690 : * Look up the target database's OID, and get exclusive lock on it. We
1691 : * need this to ensure that no new backend starts up in the target
1692 : * database while we are deleting it (see postinit.c), and that no one is
1693 : * using it as a CREATE DATABASE template or trying to delete it for
1694 : * themselves.
1695 : */
1696 124 : pgdbrel = table_open(DatabaseRelationId, RowExclusiveLock);
1697 :
1698 124 : if (!get_db_info(dbname, AccessExclusiveLock, &db_id, NULL, NULL,
1699 : &db_istemplate, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL))
1700 : {
1701 32 : if (!missing_ok)
1702 : {
1703 16 : ereport(ERROR,
1704 : (errcode(ERRCODE_UNDEFINED_DATABASE),
1705 : errmsg("database \"%s\" does not exist", dbname)));
1706 : }
1707 : else
1708 : {
1709 : /* Close pg_database, release the lock, since we changed nothing */
1710 16 : table_close(pgdbrel, RowExclusiveLock);
1711 16 : ereport(NOTICE,
1712 : (errmsg("database \"%s\" does not exist, skipping",
1713 : dbname)));
1714 16 : return;
1715 : }
1716 : }
1717 :
1718 : /*
1719 : * Permission checks
1720 : */
1721 92 : if (!object_ownercheck(DatabaseRelationId, db_id, GetUserId()))
1722 0 : aclcheck_error(ACLCHECK_NOT_OWNER, OBJECT_DATABASE,
1723 : dbname);
1724 :
1725 : /* DROP hook for the database being removed */
1726 92 : InvokeObjectDropHook(DatabaseRelationId, db_id, 0);
1727 :
1728 : /*
1729 : * Disallow dropping a DB that is marked istemplate. This is just to
1730 : * prevent people from accidentally dropping template0 or template1; they
1731 : * can do so if they're really determined ...
1732 : */
1733 92 : if (db_istemplate)
1734 0 : ereport(ERROR,
1735 : (errcode(ERRCODE_WRONG_OBJECT_TYPE),
1736 : errmsg("cannot drop a template database")));
1737 :
1738 : /* Obviously can't drop my own database */
1739 92 : if (db_id == MyDatabaseId)
1740 0 : ereport(ERROR,
1741 : (errcode(ERRCODE_OBJECT_IN_USE),
1742 : errmsg("cannot drop the currently open database")));
1743 :
1744 : /*
1745 : * Check whether there are active logical slots that refer to the
1746 : * to-be-dropped database. The database lock we are holding prevents the
1747 : * creation of new slots using the database or existing slots becoming
1748 : * active.
1749 : */
1750 92 : (void) ReplicationSlotsCountDBSlots(db_id, &nslots, &nslots_active);
1751 92 : if (nslots_active)
1752 : {
1753 2 : ereport(ERROR,
1754 : (errcode(ERRCODE_OBJECT_IN_USE),
1755 : errmsg("database \"%s\" is used by an active logical replication slot",
1756 : dbname),
1757 : errdetail_plural("There is %d active slot.",
1758 : "There are %d active slots.",
1759 : nslots_active, nslots_active)));
1760 : }
1761 :
1762 : /*
1763 : * Check if there are subscriptions defined in the target database.
1764 : *
1765 : * We can't drop them automatically because they might be holding
1766 : * resources in other databases/instances.
1767 : */
1768 90 : if ((nsubscriptions = CountDBSubscriptions(db_id)) > 0)
1769 0 : ereport(ERROR,
1770 : (errcode(ERRCODE_OBJECT_IN_USE),
1771 : errmsg("database \"%s\" is being used by logical replication subscription",
1772 : dbname),
1773 : errdetail_plural("There is %d subscription.",
1774 : "There are %d subscriptions.",
1775 : nsubscriptions, nsubscriptions)));
1776 :
1777 :
1778 : /*
1779 : * Attempt to terminate all existing connections to the target database if
1780 : * the user has requested to do so.
1781 : */
1782 90 : if (force)
1783 2 : TerminateOtherDBBackends(db_id);
1784 :
1785 : /*
1786 : * Check for other backends in the target database. (Because we hold the
1787 : * database lock, no new ones can start after this.)
1788 : *
1789 : * As in CREATE DATABASE, check this after other error conditions.
1790 : */
1791 90 : if (CountOtherDBBackends(db_id, ¬herbackends, &npreparedxacts))
1792 0 : ereport(ERROR,
1793 : (errcode(ERRCODE_OBJECT_IN_USE),
1794 : errmsg("database \"%s\" is being accessed by other users",
1795 : dbname),
1796 : errdetail_busy_db(notherbackends, npreparedxacts)));
1797 :
1798 : /*
1799 : * Delete any comments or security labels associated with the database.
1800 : */
1801 90 : DeleteSharedComments(db_id, DatabaseRelationId);
1802 90 : DeleteSharedSecurityLabel(db_id, DatabaseRelationId);
1803 :
1804 : /*
1805 : * Remove settings associated with this database
1806 : */
1807 90 : DropSetting(db_id, InvalidOid);
1808 :
1809 : /*
1810 : * Remove shared dependency references for the database.
1811 : */
1812 90 : dropDatabaseDependencies(db_id);
1813 :
1814 : /*
1815 : * Tell the cumulative stats system to forget it immediately, too.
1816 : */
1817 90 : pgstat_drop_database(db_id);
1818 :
1819 : /*
1820 : * Except for the deletion of the catalog row, subsequent actions are not
1821 : * transactional (consider DropDatabaseBuffers() discarding modified
1822 : * buffers). But we might crash or get interrupted below. To prevent
1823 : * accesses to a database with invalid contents, mark the database as
1824 : * invalid using an in-place update.
1825 : *
1826 : * We need to flush the WAL before continuing, to guarantee the
1827 : * modification is durable before performing irreversible filesystem
1828 : * operations.
1829 : */
1830 90 : ScanKeyInit(&scankey,
1831 : Anum_pg_database_datname,
1832 : BTEqualStrategyNumber, F_NAMEEQ,
1833 : CStringGetDatum(dbname));
1834 90 : systable_inplace_update_begin(pgdbrel, DatabaseNameIndexId, true,
1835 : NULL, 1, &scankey, &tup, &inplace_state);
1836 90 : if (!HeapTupleIsValid(tup))
1837 0 : elog(ERROR, "cache lookup failed for database %u", db_id);
1838 90 : datform = (Form_pg_database) GETSTRUCT(tup);
1839 90 : datform->datconnlimit = DATCONNLIMIT_INVALID_DB;
1840 90 : systable_inplace_update_finish(inplace_state, tup);
1841 90 : XLogFlush(XactLastRecEnd);
1842 :
1843 : /*
1844 : * Also delete the tuple - transactionally. If this transaction commits,
1845 : * the row will be gone, but if we fail, dropdb() can be invoked again.
1846 : */
1847 90 : CatalogTupleDelete(pgdbrel, &tup->t_self);
1848 90 : heap_freetuple(tup);
1849 :
1850 : /*
1851 : * Drop db-specific replication slots.
1852 : */
1853 90 : ReplicationSlotsDropDBSlots(db_id);
1854 :
1855 : /*
1856 : * Drop pages for this database that are in the shared buffer cache. This
1857 : * is important to ensure that no remaining backend tries to write out a
1858 : * dirty buffer to the dead database later...
1859 : */
1860 90 : DropDatabaseBuffers(db_id);
1861 :
1862 : /*
1863 : * Tell checkpointer to forget any pending fsync and unlink requests for
1864 : * files in the database; else the fsyncs will fail at next checkpoint, or
1865 : * worse, it will delete files that belong to a newly created database
1866 : * with the same OID.
1867 : */
1868 90 : ForgetDatabaseSyncRequests(db_id);
1869 :
1870 : /*
1871 : * Force a checkpoint to make sure the checkpointer has received the
1872 : * message sent by ForgetDatabaseSyncRequests.
1873 : */
1874 90 : RequestCheckpoint(CHECKPOINT_FAST | CHECKPOINT_FORCE | CHECKPOINT_WAIT);
1875 :
1876 : /* Close all smgr fds in all backends. */
1877 90 : WaitForProcSignalBarrier(EmitProcSignalBarrier(PROCSIGNAL_BARRIER_SMGRRELEASE));
1878 :
1879 : /*
1880 : * Remove all tablespace subdirs belonging to the database.
1881 : */
1882 90 : remove_dbtablespaces(db_id);
1883 :
1884 : /*
1885 : * Close pg_database, but keep lock till commit.
1886 : */
1887 90 : table_close(pgdbrel, NoLock);
1888 :
1889 : /*
1890 : * Force synchronous commit, thus minimizing the window between removal of
1891 : * the database files and committal of the transaction. If we crash before
1892 : * committing, we'll have a DB that's gone on disk but still there
1893 : * according to pg_database, which is not good.
1894 : */
1895 90 : ForceSyncCommit();
1896 : }
1897 :
1898 :
1899 : /*
1900 : * Rename database
1901 : */
1902 : ObjectAddress
1903 12 : RenameDatabase(const char *oldname, const char *newname)
1904 : {
1905 : Oid db_id;
1906 : HeapTuple newtup;
1907 : ItemPointerData otid;
1908 : Relation rel;
1909 : int notherbackends;
1910 : int npreparedxacts;
1911 : ObjectAddress address;
1912 :
1913 : /*
1914 : * Look up the target database's OID, and get exclusive lock on it. We
1915 : * need this for the same reasons as DROP DATABASE.
1916 : */
1917 12 : rel = table_open(DatabaseRelationId, RowExclusiveLock);
1918 :
1919 12 : if (!get_db_info(oldname, AccessExclusiveLock, &db_id, NULL, NULL, NULL,
1920 : NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL))
1921 0 : ereport(ERROR,
1922 : (errcode(ERRCODE_UNDEFINED_DATABASE),
1923 : errmsg("database \"%s\" does not exist", oldname)));
1924 :
1925 : /* must be owner */
1926 12 : if (!object_ownercheck(DatabaseRelationId, db_id, GetUserId()))
1927 0 : aclcheck_error(ACLCHECK_NOT_OWNER, OBJECT_DATABASE,
1928 : oldname);
1929 :
1930 : /* must have createdb rights */
1931 12 : if (!have_createdb_privilege())
1932 0 : ereport(ERROR,
1933 : (errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
1934 : errmsg("permission denied to rename database")));
1935 :
1936 : /*
1937 : * If built with appropriate switch, whine when regression-testing
1938 : * conventions for database names are violated.
1939 : */
1940 : #ifdef ENFORCE_REGRESSION_TEST_NAME_RESTRICTIONS
1941 : if (strstr(newname, "regression") == NULL)
1942 : elog(WARNING, "databases created by regression test cases should have names including \"regression\"");
1943 : #endif
1944 :
1945 : /*
1946 : * Make sure the new name doesn't exist. See notes for same error in
1947 : * CREATE DATABASE.
1948 : */
1949 12 : if (OidIsValid(get_database_oid(newname, true)))
1950 0 : ereport(ERROR,
1951 : (errcode(ERRCODE_DUPLICATE_DATABASE),
1952 : errmsg("database \"%s\" already exists", newname)));
1953 :
1954 : /*
1955 : * XXX Client applications probably store the current database somewhere,
1956 : * so renaming it could cause confusion. On the other hand, there may not
1957 : * be an actual problem besides a little confusion, so think about this
1958 : * and decide.
1959 : */
1960 12 : if (db_id == MyDatabaseId)
1961 0 : ereport(ERROR,
1962 : (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
1963 : errmsg("current database cannot be renamed")));
1964 :
1965 : /*
1966 : * Make sure the database does not have active sessions. This is the same
1967 : * concern as above, but applied to other sessions.
1968 : *
1969 : * As in CREATE DATABASE, check this after other error conditions.
1970 : */
1971 12 : if (CountOtherDBBackends(db_id, ¬herbackends, &npreparedxacts))
1972 0 : ereport(ERROR,
1973 : (errcode(ERRCODE_OBJECT_IN_USE),
1974 : errmsg("database \"%s\" is being accessed by other users",
1975 : oldname),
1976 : errdetail_busy_db(notherbackends, npreparedxacts)));
1977 :
1978 : /* rename */
1979 12 : newtup = SearchSysCacheLockedCopy1(DATABASEOID, ObjectIdGetDatum(db_id));
1980 12 : if (!HeapTupleIsValid(newtup))
1981 0 : elog(ERROR, "cache lookup failed for database %u", db_id);
1982 12 : otid = newtup->t_self;
1983 12 : namestrcpy(&(((Form_pg_database) GETSTRUCT(newtup))->datname), newname);
1984 12 : CatalogTupleUpdate(rel, &otid, newtup);
1985 12 : UnlockTuple(rel, &otid, InplaceUpdateTupleLock);
1986 :
1987 12 : InvokeObjectPostAlterHook(DatabaseRelationId, db_id, 0);
1988 :
1989 12 : ObjectAddressSet(address, DatabaseRelationId, db_id);
1990 :
1991 : /*
1992 : * Close pg_database, but keep lock till commit.
1993 : */
1994 12 : table_close(rel, NoLock);
1995 :
1996 12 : return address;
1997 : }
1998 :
1999 :
2000 : /*
2001 : * ALTER DATABASE SET TABLESPACE
2002 : */
2003 : static void
2004 22 : movedb(const char *dbname, const char *tblspcname)
2005 : {
2006 : Oid db_id;
2007 : Relation pgdbrel;
2008 : int notherbackends;
2009 : int npreparedxacts;
2010 : HeapTuple oldtuple,
2011 : newtuple;
2012 : Oid src_tblspcoid,
2013 : dst_tblspcoid;
2014 : ScanKeyData scankey;
2015 : SysScanDesc sysscan;
2016 : AclResult aclresult;
2017 : char *src_dbpath;
2018 : char *dst_dbpath;
2019 : DIR *dstdir;
2020 : struct dirent *xlde;
2021 : movedb_failure_params fparms;
2022 :
2023 : /*
2024 : * Look up the target database's OID, and get exclusive lock on it. We
2025 : * need this to ensure that no new backend starts up in the database while
2026 : * we are moving it, and that no one is using it as a CREATE DATABASE
2027 : * template or trying to delete it.
2028 : */
2029 22 : pgdbrel = table_open(DatabaseRelationId, RowExclusiveLock);
2030 :
2031 22 : if (!get_db_info(dbname, AccessExclusiveLock, &db_id, NULL, NULL, NULL,
2032 : NULL, NULL, NULL, NULL, &src_tblspcoid, NULL, NULL, NULL, NULL, NULL, NULL))
2033 0 : ereport(ERROR,
2034 : (errcode(ERRCODE_UNDEFINED_DATABASE),
2035 : errmsg("database \"%s\" does not exist", dbname)));
2036 :
2037 : /*
2038 : * We actually need a session lock, so that the lock will persist across
2039 : * the commit/restart below. (We could almost get away with letting the
2040 : * lock be released at commit, except that someone could try to move
2041 : * relations of the DB back into the old directory while we rmtree() it.)
2042 : */
2043 22 : LockSharedObjectForSession(DatabaseRelationId, db_id, 0,
2044 : AccessExclusiveLock);
2045 :
2046 : /*
2047 : * Permission checks
2048 : */
2049 22 : if (!object_ownercheck(DatabaseRelationId, db_id, GetUserId()))
2050 0 : aclcheck_error(ACLCHECK_NOT_OWNER, OBJECT_DATABASE,
2051 : dbname);
2052 :
2053 : /*
2054 : * Obviously can't move the tables of my own database
2055 : */
2056 22 : if (db_id == MyDatabaseId)
2057 0 : ereport(ERROR,
2058 : (errcode(ERRCODE_OBJECT_IN_USE),
2059 : errmsg("cannot change the tablespace of the currently open database")));
2060 :
2061 : /*
2062 : * Get tablespace's oid
2063 : */
2064 22 : dst_tblspcoid = get_tablespace_oid(tblspcname, false);
2065 :
2066 : /*
2067 : * Permission checks
2068 : */
2069 22 : aclresult = object_aclcheck(TableSpaceRelationId, dst_tblspcoid, GetUserId(),
2070 : ACL_CREATE);
2071 22 : if (aclresult != ACLCHECK_OK)
2072 0 : aclcheck_error(aclresult, OBJECT_TABLESPACE,
2073 : tblspcname);
2074 :
2075 : /*
2076 : * pg_global must never be the default tablespace
2077 : */
2078 22 : if (dst_tblspcoid == GLOBALTABLESPACE_OID)
2079 0 : ereport(ERROR,
2080 : (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
2081 : errmsg("pg_global cannot be used as default tablespace")));
2082 :
2083 : /*
2084 : * No-op if same tablespace
2085 : */
2086 22 : if (src_tblspcoid == dst_tblspcoid)
2087 : {
2088 0 : table_close(pgdbrel, NoLock);
2089 0 : UnlockSharedObjectForSession(DatabaseRelationId, db_id, 0,
2090 : AccessExclusiveLock);
2091 0 : return;
2092 : }
2093 :
2094 : /*
2095 : * Check for other backends in the target database. (Because we hold the
2096 : * database lock, no new ones can start after this.)
2097 : *
2098 : * As in CREATE DATABASE, check this after other error conditions.
2099 : */
2100 22 : if (CountOtherDBBackends(db_id, ¬herbackends, &npreparedxacts))
2101 0 : ereport(ERROR,
2102 : (errcode(ERRCODE_OBJECT_IN_USE),
2103 : errmsg("database \"%s\" is being accessed by other users",
2104 : dbname),
2105 : errdetail_busy_db(notherbackends, npreparedxacts)));
2106 :
2107 : /*
2108 : * Get old and new database paths
2109 : */
2110 22 : src_dbpath = GetDatabasePath(db_id, src_tblspcoid);
2111 22 : dst_dbpath = GetDatabasePath(db_id, dst_tblspcoid);
2112 :
2113 : /*
2114 : * Force a checkpoint before proceeding. This will force all dirty
2115 : * buffers, including those of unlogged tables, out to disk, to ensure
2116 : * source database is up-to-date on disk for the copy.
2117 : * FlushDatabaseBuffers() would suffice for that, but we also want to
2118 : * process any pending unlink requests. Otherwise, the check for existing
2119 : * files in the target directory might fail unnecessarily, not to mention
2120 : * that the copy might fail due to source files getting deleted under it.
2121 : * On Windows, this also ensures that background procs don't hold any open
2122 : * files, which would cause rmdir() to fail.
2123 : */
2124 22 : RequestCheckpoint(CHECKPOINT_FAST | CHECKPOINT_FORCE | CHECKPOINT_WAIT
2125 : | CHECKPOINT_FLUSH_UNLOGGED);
2126 :
2127 : /* Close all smgr fds in all backends. */
2128 22 : WaitForProcSignalBarrier(EmitProcSignalBarrier(PROCSIGNAL_BARRIER_SMGRRELEASE));
2129 :
2130 : /*
2131 : * Now drop all buffers holding data of the target database; they should
2132 : * no longer be dirty so DropDatabaseBuffers is safe.
2133 : *
2134 : * It might seem that we could just let these buffers age out of shared
2135 : * buffers naturally, since they should not get referenced anymore. The
2136 : * problem with that is that if the user later moves the database back to
2137 : * its original tablespace, any still-surviving buffers would appear to
2138 : * contain valid data again --- but they'd be missing any changes made in
2139 : * the database while it was in the new tablespace. In any case, freeing
2140 : * buffers that should never be used again seems worth the cycles.
2141 : *
2142 : * Note: it'd be sufficient to get rid of buffers matching db_id and
2143 : * src_tblspcoid, but bufmgr.c presently provides no API for that.
2144 : */
2145 22 : DropDatabaseBuffers(db_id);
2146 :
2147 : /*
2148 : * Check for existence of files in the target directory, i.e., objects of
2149 : * this database that are already in the target tablespace. We can't
2150 : * allow the move in such a case, because we would need to change those
2151 : * relations' pg_class.reltablespace entries to zero, and we don't have
2152 : * access to the DB's pg_class to do so.
2153 : */
2154 22 : dstdir = AllocateDir(dst_dbpath);
2155 22 : if (dstdir != NULL)
2156 : {
2157 0 : while ((xlde = ReadDir(dstdir, dst_dbpath)) != NULL)
2158 : {
2159 0 : if (strcmp(xlde->d_name, ".") == 0 ||
2160 0 : strcmp(xlde->d_name, "..") == 0)
2161 0 : continue;
2162 :
2163 0 : ereport(ERROR,
2164 : (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
2165 : errmsg("some relations of database \"%s\" are already in tablespace \"%s\"",
2166 : dbname, tblspcname),
2167 : errhint("You must move them back to the database's default tablespace before using this command.")));
2168 : }
2169 :
2170 0 : FreeDir(dstdir);
2171 :
2172 : /*
2173 : * The directory exists but is empty. We must remove it before using
2174 : * the copydir function.
2175 : */
2176 0 : if (rmdir(dst_dbpath) != 0)
2177 0 : elog(ERROR, "could not remove directory \"%s\": %m",
2178 : dst_dbpath);
2179 : }
2180 :
2181 : /*
2182 : * Use an ENSURE block to make sure we remove the debris if the copy fails
2183 : * (eg, due to out-of-disk-space). This is not a 100% solution, because
2184 : * of the possibility of failure during transaction commit, but it should
2185 : * handle most scenarios.
2186 : */
2187 22 : fparms.dest_dboid = db_id;
2188 22 : fparms.dest_tsoid = dst_tblspcoid;
2189 22 : PG_ENSURE_ERROR_CLEANUP(movedb_failure_callback,
2190 : PointerGetDatum(&fparms));
2191 : {
2192 22 : Datum new_record[Natts_pg_database] = {0};
2193 22 : bool new_record_nulls[Natts_pg_database] = {0};
2194 22 : bool new_record_repl[Natts_pg_database] = {0};
2195 :
2196 : /*
2197 : * Copy files from the old tablespace to the new one
2198 : */
2199 22 : copydir(src_dbpath, dst_dbpath, false);
2200 :
2201 : /*
2202 : * Record the filesystem change in XLOG
2203 : */
2204 : {
2205 : xl_dbase_create_file_copy_rec xlrec;
2206 :
2207 22 : xlrec.db_id = db_id;
2208 22 : xlrec.tablespace_id = dst_tblspcoid;
2209 22 : xlrec.src_db_id = db_id;
2210 22 : xlrec.src_tablespace_id = src_tblspcoid;
2211 :
2212 22 : XLogBeginInsert();
2213 22 : XLogRegisterData(&xlrec,
2214 : sizeof(xl_dbase_create_file_copy_rec));
2215 :
2216 22 : (void) XLogInsert(RM_DBASE_ID,
2217 : XLOG_DBASE_CREATE_FILE_COPY | XLR_SPECIAL_REL_UPDATE);
2218 : }
2219 :
2220 : /*
2221 : * Update the database's pg_database tuple
2222 : */
2223 22 : ScanKeyInit(&scankey,
2224 : Anum_pg_database_datname,
2225 : BTEqualStrategyNumber, F_NAMEEQ,
2226 : CStringGetDatum(dbname));
2227 22 : sysscan = systable_beginscan(pgdbrel, DatabaseNameIndexId, true,
2228 : NULL, 1, &scankey);
2229 22 : oldtuple = systable_getnext(sysscan);
2230 22 : if (!HeapTupleIsValid(oldtuple)) /* shouldn't happen... */
2231 0 : ereport(ERROR,
2232 : (errcode(ERRCODE_UNDEFINED_DATABASE),
2233 : errmsg("database \"%s\" does not exist", dbname)));
2234 22 : LockTuple(pgdbrel, &oldtuple->t_self, InplaceUpdateTupleLock);
2235 :
2236 22 : new_record[Anum_pg_database_dattablespace - 1] = ObjectIdGetDatum(dst_tblspcoid);
2237 22 : new_record_repl[Anum_pg_database_dattablespace - 1] = true;
2238 :
2239 22 : newtuple = heap_modify_tuple(oldtuple, RelationGetDescr(pgdbrel),
2240 : new_record,
2241 : new_record_nulls, new_record_repl);
2242 22 : CatalogTupleUpdate(pgdbrel, &oldtuple->t_self, newtuple);
2243 22 : UnlockTuple(pgdbrel, &oldtuple->t_self, InplaceUpdateTupleLock);
2244 :
2245 22 : InvokeObjectPostAlterHook(DatabaseRelationId, db_id, 0);
2246 :
2247 22 : systable_endscan(sysscan);
2248 :
2249 : /*
2250 : * Force another checkpoint here. As in CREATE DATABASE, this is to
2251 : * ensure that we don't have to replay a committed
2252 : * XLOG_DBASE_CREATE_FILE_COPY operation, which would cause us to lose
2253 : * any unlogged operations done in the new DB tablespace before the
2254 : * next checkpoint.
2255 : */
2256 22 : RequestCheckpoint(CHECKPOINT_FAST | CHECKPOINT_FORCE | CHECKPOINT_WAIT);
2257 :
2258 : /*
2259 : * Force synchronous commit, thus minimizing the window between
2260 : * copying the database files and committal of the transaction. If we
2261 : * crash before committing, we'll leave an orphaned set of files on
2262 : * disk, which is not fatal but not good either.
2263 : */
2264 22 : ForceSyncCommit();
2265 :
2266 : /*
2267 : * Close pg_database, but keep lock till commit.
2268 : */
2269 22 : table_close(pgdbrel, NoLock);
2270 : }
2271 22 : PG_END_ENSURE_ERROR_CLEANUP(movedb_failure_callback,
2272 : PointerGetDatum(&fparms));
2273 :
2274 : /*
2275 : * Commit the transaction so that the pg_database update is committed. If
2276 : * we crash while removing files, the database won't be corrupt, we'll
2277 : * just leave some orphaned files in the old directory.
2278 : *
2279 : * (This is OK because we know we aren't inside a transaction block.)
2280 : *
2281 : * XXX would it be safe/better to do this inside the ensure block? Not
2282 : * convinced it's a good idea; consider elog just after the transaction
2283 : * really commits.
2284 : */
2285 22 : PopActiveSnapshot();
2286 22 : CommitTransactionCommand();
2287 :
2288 : /* Start new transaction for the remaining work; don't need a snapshot */
2289 22 : StartTransactionCommand();
2290 :
2291 : /*
2292 : * Remove files from the old tablespace
2293 : */
2294 22 : if (!rmtree(src_dbpath, true))
2295 0 : ereport(WARNING,
2296 : (errmsg("some useless files may be left behind in old database directory \"%s\"",
2297 : src_dbpath)));
2298 :
2299 : /*
2300 : * Record the filesystem change in XLOG
2301 : */
2302 : {
2303 : xl_dbase_drop_rec xlrec;
2304 :
2305 22 : xlrec.db_id = db_id;
2306 22 : xlrec.ntablespaces = 1;
2307 :
2308 22 : XLogBeginInsert();
2309 22 : XLogRegisterData(&xlrec, sizeof(xl_dbase_drop_rec));
2310 22 : XLogRegisterData(&src_tblspcoid, sizeof(Oid));
2311 :
2312 22 : (void) XLogInsert(RM_DBASE_ID,
2313 : XLOG_DBASE_DROP | XLR_SPECIAL_REL_UPDATE);
2314 : }
2315 :
2316 : /* Now it's safe to release the database lock */
2317 22 : UnlockSharedObjectForSession(DatabaseRelationId, db_id, 0,
2318 : AccessExclusiveLock);
2319 :
2320 22 : pfree(src_dbpath);
2321 22 : pfree(dst_dbpath);
2322 : }
2323 :
2324 : /* Error cleanup callback for movedb */
2325 : static void
2326 0 : movedb_failure_callback(int code, Datum arg)
2327 : {
2328 0 : movedb_failure_params *fparms = (movedb_failure_params *) DatumGetPointer(arg);
2329 : char *dstpath;
2330 :
2331 : /* Get rid of anything we managed to copy to the target directory */
2332 0 : dstpath = GetDatabasePath(fparms->dest_dboid, fparms->dest_tsoid);
2333 :
2334 0 : (void) rmtree(dstpath, true);
2335 :
2336 0 : pfree(dstpath);
2337 0 : }
2338 :
2339 : /*
2340 : * Process options and call dropdb function.
2341 : */
2342 : void
2343 124 : DropDatabase(ParseState *pstate, DropdbStmt *stmt)
2344 : {
2345 124 : bool force = false;
2346 : ListCell *lc;
2347 :
2348 150 : foreach(lc, stmt->options)
2349 : {
2350 26 : DefElem *opt = (DefElem *) lfirst(lc);
2351 :
2352 26 : if (strcmp(opt->defname, "force") == 0)
2353 26 : force = true;
2354 : else
2355 0 : ereport(ERROR,
2356 : (errcode(ERRCODE_SYNTAX_ERROR),
2357 : errmsg("unrecognized DROP DATABASE option \"%s\"", opt->defname),
2358 : parser_errposition(pstate, opt->location)));
2359 : }
2360 :
2361 124 : dropdb(stmt->dbname, stmt->missing_ok, force);
2362 106 : }
2363 :
2364 : /*
2365 : * ALTER DATABASE name ...
2366 : */
2367 : Oid
2368 82 : AlterDatabase(ParseState *pstate, AlterDatabaseStmt *stmt, bool isTopLevel)
2369 : {
2370 : Relation rel;
2371 : Oid dboid;
2372 : HeapTuple tuple,
2373 : newtuple;
2374 : Form_pg_database datform;
2375 : ScanKeyData scankey;
2376 : SysScanDesc scan;
2377 : ListCell *option;
2378 82 : bool dbistemplate = false;
2379 82 : bool dballowconnections = true;
2380 82 : int dbconnlimit = DATCONNLIMIT_UNLIMITED;
2381 82 : DefElem *distemplate = NULL;
2382 82 : DefElem *dallowconnections = NULL;
2383 82 : DefElem *dconnlimit = NULL;
2384 82 : DefElem *dtablespace = NULL;
2385 82 : Datum new_record[Natts_pg_database] = {0};
2386 82 : bool new_record_nulls[Natts_pg_database] = {0};
2387 82 : bool new_record_repl[Natts_pg_database] = {0};
2388 :
2389 : /* Extract options from the statement node tree */
2390 164 : foreach(option, stmt->options)
2391 : {
2392 82 : DefElem *defel = (DefElem *) lfirst(option);
2393 :
2394 82 : if (strcmp(defel->defname, "is_template") == 0)
2395 : {
2396 20 : if (distemplate)
2397 0 : errorConflictingDefElem(defel, pstate);
2398 20 : distemplate = defel;
2399 : }
2400 62 : else if (strcmp(defel->defname, "allow_connections") == 0)
2401 : {
2402 32 : if (dallowconnections)
2403 0 : errorConflictingDefElem(defel, pstate);
2404 32 : dallowconnections = defel;
2405 : }
2406 30 : else if (strcmp(defel->defname, "connection_limit") == 0)
2407 : {
2408 8 : if (dconnlimit)
2409 0 : errorConflictingDefElem(defel, pstate);
2410 8 : dconnlimit = defel;
2411 : }
2412 22 : else if (strcmp(defel->defname, "tablespace") == 0)
2413 : {
2414 22 : if (dtablespace)
2415 0 : errorConflictingDefElem(defel, pstate);
2416 22 : dtablespace = defel;
2417 : }
2418 : else
2419 0 : ereport(ERROR,
2420 : (errcode(ERRCODE_SYNTAX_ERROR),
2421 : errmsg("option \"%s\" not recognized", defel->defname),
2422 : parser_errposition(pstate, defel->location)));
2423 : }
2424 :
2425 82 : if (dtablespace)
2426 : {
2427 : /*
2428 : * While the SET TABLESPACE syntax doesn't allow any other options,
2429 : * somebody could write "WITH TABLESPACE ...". Forbid any other
2430 : * options from being specified in that case.
2431 : */
2432 22 : if (list_length(stmt->options) != 1)
2433 0 : ereport(ERROR,
2434 : (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
2435 : errmsg("option \"%s\" cannot be specified with other options",
2436 : dtablespace->defname),
2437 : parser_errposition(pstate, dtablespace->location)));
2438 : /* this case isn't allowed within a transaction block */
2439 22 : PreventInTransactionBlock(isTopLevel, "ALTER DATABASE SET TABLESPACE");
2440 22 : movedb(stmt->dbname, defGetString(dtablespace));
2441 22 : return InvalidOid;
2442 : }
2443 :
2444 60 : if (distemplate && distemplate->arg)
2445 20 : dbistemplate = defGetBoolean(distemplate);
2446 60 : if (dallowconnections && dallowconnections->arg)
2447 32 : dballowconnections = defGetBoolean(dallowconnections);
2448 60 : if (dconnlimit && dconnlimit->arg)
2449 : {
2450 8 : dbconnlimit = defGetInt32(dconnlimit);
2451 8 : if (dbconnlimit < DATCONNLIMIT_UNLIMITED)
2452 0 : ereport(ERROR,
2453 : (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
2454 : errmsg("invalid connection limit: %d", dbconnlimit)));
2455 : }
2456 :
2457 : /*
2458 : * Get the old tuple. We don't need a lock on the database per se,
2459 : * because we're not going to do anything that would mess up incoming
2460 : * connections.
2461 : */
2462 60 : rel = table_open(DatabaseRelationId, RowExclusiveLock);
2463 60 : ScanKeyInit(&scankey,
2464 : Anum_pg_database_datname,
2465 : BTEqualStrategyNumber, F_NAMEEQ,
2466 60 : CStringGetDatum(stmt->dbname));
2467 60 : scan = systable_beginscan(rel, DatabaseNameIndexId, true,
2468 : NULL, 1, &scankey);
2469 60 : tuple = systable_getnext(scan);
2470 60 : if (!HeapTupleIsValid(tuple))
2471 0 : ereport(ERROR,
2472 : (errcode(ERRCODE_UNDEFINED_DATABASE),
2473 : errmsg("database \"%s\" does not exist", stmt->dbname)));
2474 60 : LockTuple(rel, &tuple->t_self, InplaceUpdateTupleLock);
2475 :
2476 60 : datform = (Form_pg_database) GETSTRUCT(tuple);
2477 60 : dboid = datform->oid;
2478 :
2479 60 : if (database_is_invalid_form(datform))
2480 : {
2481 2 : ereport(FATAL,
2482 : errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
2483 : errmsg("cannot alter invalid database \"%s\"", stmt->dbname),
2484 : errhint("Use DROP DATABASE to drop invalid databases."));
2485 : }
2486 :
2487 58 : if (!object_ownercheck(DatabaseRelationId, dboid, GetUserId()))
2488 0 : aclcheck_error(ACLCHECK_NOT_OWNER, OBJECT_DATABASE,
2489 0 : stmt->dbname);
2490 :
2491 : /*
2492 : * In order to avoid getting locked out and having to go through
2493 : * standalone mode, we refuse to disallow connections to the database
2494 : * we're currently connected to. Lockout can still happen with concurrent
2495 : * sessions but the likeliness of that is not high enough to worry about.
2496 : */
2497 58 : if (!dballowconnections && dboid == MyDatabaseId)
2498 0 : ereport(ERROR,
2499 : (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
2500 : errmsg("cannot disallow connections for current database")));
2501 :
2502 : /*
2503 : * Build an updated tuple, perusing the information just obtained
2504 : */
2505 58 : if (distemplate)
2506 : {
2507 20 : new_record[Anum_pg_database_datistemplate - 1] = BoolGetDatum(dbistemplate);
2508 20 : new_record_repl[Anum_pg_database_datistemplate - 1] = true;
2509 : }
2510 58 : if (dallowconnections)
2511 : {
2512 32 : new_record[Anum_pg_database_datallowconn - 1] = BoolGetDatum(dballowconnections);
2513 32 : new_record_repl[Anum_pg_database_datallowconn - 1] = true;
2514 : }
2515 58 : if (dconnlimit)
2516 : {
2517 6 : new_record[Anum_pg_database_datconnlimit - 1] = Int32GetDatum(dbconnlimit);
2518 6 : new_record_repl[Anum_pg_database_datconnlimit - 1] = true;
2519 : }
2520 :
2521 58 : newtuple = heap_modify_tuple(tuple, RelationGetDescr(rel), new_record,
2522 : new_record_nulls, new_record_repl);
2523 58 : CatalogTupleUpdate(rel, &tuple->t_self, newtuple);
2524 58 : UnlockTuple(rel, &tuple->t_self, InplaceUpdateTupleLock);
2525 :
2526 58 : InvokeObjectPostAlterHook(DatabaseRelationId, dboid, 0);
2527 :
2528 58 : systable_endscan(scan);
2529 :
2530 : /* Close pg_database, but keep lock till commit */
2531 58 : table_close(rel, NoLock);
2532 :
2533 58 : return dboid;
2534 : }
2535 :
2536 :
2537 : /*
2538 : * ALTER DATABASE name REFRESH COLLATION VERSION
2539 : */
2540 : ObjectAddress
2541 6 : AlterDatabaseRefreshColl(AlterDatabaseRefreshCollStmt *stmt)
2542 : {
2543 : Relation rel;
2544 : ScanKeyData scankey;
2545 : SysScanDesc scan;
2546 : Oid db_id;
2547 : HeapTuple tuple;
2548 : Form_pg_database datForm;
2549 : ObjectAddress address;
2550 : Datum datum;
2551 : bool isnull;
2552 : char *oldversion;
2553 : char *newversion;
2554 :
2555 6 : rel = table_open(DatabaseRelationId, RowExclusiveLock);
2556 6 : ScanKeyInit(&scankey,
2557 : Anum_pg_database_datname,
2558 : BTEqualStrategyNumber, F_NAMEEQ,
2559 6 : CStringGetDatum(stmt->dbname));
2560 6 : scan = systable_beginscan(rel, DatabaseNameIndexId, true,
2561 : NULL, 1, &scankey);
2562 6 : tuple = systable_getnext(scan);
2563 6 : if (!HeapTupleIsValid(tuple))
2564 0 : ereport(ERROR,
2565 : (errcode(ERRCODE_UNDEFINED_DATABASE),
2566 : errmsg("database \"%s\" does not exist", stmt->dbname)));
2567 :
2568 6 : datForm = (Form_pg_database) GETSTRUCT(tuple);
2569 6 : db_id = datForm->oid;
2570 :
2571 6 : if (!object_ownercheck(DatabaseRelationId, db_id, GetUserId()))
2572 0 : aclcheck_error(ACLCHECK_NOT_OWNER, OBJECT_DATABASE,
2573 0 : stmt->dbname);
2574 6 : LockTuple(rel, &tuple->t_self, InplaceUpdateTupleLock);
2575 :
2576 6 : datum = heap_getattr(tuple, Anum_pg_database_datcollversion, RelationGetDescr(rel), &isnull);
2577 6 : oldversion = isnull ? NULL : TextDatumGetCString(datum);
2578 :
2579 6 : if (datForm->datlocprovider == COLLPROVIDER_LIBC)
2580 : {
2581 4 : datum = heap_getattr(tuple, Anum_pg_database_datcollate, RelationGetDescr(rel), &isnull);
2582 4 : if (isnull)
2583 0 : elog(ERROR, "unexpected null in pg_database");
2584 : }
2585 : else
2586 : {
2587 2 : datum = heap_getattr(tuple, Anum_pg_database_datlocale, RelationGetDescr(rel), &isnull);
2588 2 : if (isnull)
2589 0 : elog(ERROR, "unexpected null in pg_database");
2590 : }
2591 :
2592 6 : newversion = get_collation_actual_version(datForm->datlocprovider,
2593 6 : TextDatumGetCString(datum));
2594 :
2595 : /* cannot change from NULL to non-NULL or vice versa */
2596 6 : if ((!oldversion && newversion) || (oldversion && !newversion))
2597 0 : elog(ERROR, "invalid collation version change");
2598 6 : else if (oldversion && newversion && strcmp(newversion, oldversion) != 0)
2599 0 : {
2600 0 : bool nulls[Natts_pg_database] = {0};
2601 0 : bool replaces[Natts_pg_database] = {0};
2602 0 : Datum values[Natts_pg_database] = {0};
2603 : HeapTuple newtuple;
2604 :
2605 0 : ereport(NOTICE,
2606 : (errmsg("changing version from %s to %s",
2607 : oldversion, newversion)));
2608 :
2609 0 : values[Anum_pg_database_datcollversion - 1] = CStringGetTextDatum(newversion);
2610 0 : replaces[Anum_pg_database_datcollversion - 1] = true;
2611 :
2612 0 : newtuple = heap_modify_tuple(tuple, RelationGetDescr(rel),
2613 : values, nulls, replaces);
2614 0 : CatalogTupleUpdate(rel, &tuple->t_self, newtuple);
2615 0 : heap_freetuple(newtuple);
2616 : }
2617 : else
2618 6 : ereport(NOTICE,
2619 : (errmsg("version has not changed")));
2620 6 : UnlockTuple(rel, &tuple->t_self, InplaceUpdateTupleLock);
2621 :
2622 6 : InvokeObjectPostAlterHook(DatabaseRelationId, db_id, 0);
2623 :
2624 6 : ObjectAddressSet(address, DatabaseRelationId, db_id);
2625 :
2626 6 : systable_endscan(scan);
2627 :
2628 6 : table_close(rel, NoLock);
2629 :
2630 6 : return address;
2631 : }
2632 :
2633 :
2634 : /*
2635 : * ALTER DATABASE name SET ...
2636 : */
2637 : Oid
2638 1206 : AlterDatabaseSet(AlterDatabaseSetStmt *stmt)
2639 : {
2640 1206 : Oid datid = get_database_oid(stmt->dbname, false);
2641 :
2642 : /*
2643 : * Obtain a lock on the database and make sure it didn't go away in the
2644 : * meantime.
2645 : */
2646 1206 : shdepLockAndCheckObject(DatabaseRelationId, datid);
2647 :
2648 1206 : if (!object_ownercheck(DatabaseRelationId, datid, GetUserId()))
2649 0 : aclcheck_error(ACLCHECK_NOT_OWNER, OBJECT_DATABASE,
2650 0 : stmt->dbname);
2651 :
2652 1206 : AlterSetting(datid, InvalidOid, stmt->setstmt);
2653 :
2654 1202 : UnlockSharedObject(DatabaseRelationId, datid, 0, AccessShareLock);
2655 :
2656 1202 : return datid;
2657 : }
2658 :
2659 :
2660 : /*
2661 : * ALTER DATABASE name OWNER TO newowner
2662 : */
2663 : ObjectAddress
2664 86 : AlterDatabaseOwner(const char *dbname, Oid newOwnerId)
2665 : {
2666 : Oid db_id;
2667 : HeapTuple tuple;
2668 : Relation rel;
2669 : ScanKeyData scankey;
2670 : SysScanDesc scan;
2671 : Form_pg_database datForm;
2672 : ObjectAddress address;
2673 :
2674 : /*
2675 : * Get the old tuple. We don't need a lock on the database per se,
2676 : * because we're not going to do anything that would mess up incoming
2677 : * connections.
2678 : */
2679 86 : rel = table_open(DatabaseRelationId, RowExclusiveLock);
2680 86 : ScanKeyInit(&scankey,
2681 : Anum_pg_database_datname,
2682 : BTEqualStrategyNumber, F_NAMEEQ,
2683 : CStringGetDatum(dbname));
2684 86 : scan = systable_beginscan(rel, DatabaseNameIndexId, true,
2685 : NULL, 1, &scankey);
2686 86 : tuple = systable_getnext(scan);
2687 86 : if (!HeapTupleIsValid(tuple))
2688 0 : ereport(ERROR,
2689 : (errcode(ERRCODE_UNDEFINED_DATABASE),
2690 : errmsg("database \"%s\" does not exist", dbname)));
2691 :
2692 86 : datForm = (Form_pg_database) GETSTRUCT(tuple);
2693 86 : db_id = datForm->oid;
2694 :
2695 : /*
2696 : * If the new owner is the same as the existing owner, consider the
2697 : * command to have succeeded. This is to be consistent with other
2698 : * objects.
2699 : */
2700 86 : if (datForm->datdba != newOwnerId)
2701 : {
2702 : Datum repl_val[Natts_pg_database];
2703 30 : bool repl_null[Natts_pg_database] = {0};
2704 30 : bool repl_repl[Natts_pg_database] = {0};
2705 : Acl *newAcl;
2706 : Datum aclDatum;
2707 : bool isNull;
2708 : HeapTuple newtuple;
2709 :
2710 : /* Otherwise, must be owner of the existing object */
2711 30 : if (!object_ownercheck(DatabaseRelationId, db_id, GetUserId()))
2712 0 : aclcheck_error(ACLCHECK_NOT_OWNER, OBJECT_DATABASE,
2713 : dbname);
2714 :
2715 : /* Must be able to become new owner */
2716 30 : check_can_set_role(GetUserId(), newOwnerId);
2717 :
2718 : /*
2719 : * must have createdb rights
2720 : *
2721 : * NOTE: This is different from other alter-owner checks in that the
2722 : * current user is checked for createdb privileges instead of the
2723 : * destination owner. This is consistent with the CREATE case for
2724 : * databases. Because superusers will always have this right, we need
2725 : * no special case for them.
2726 : */
2727 30 : if (!have_createdb_privilege())
2728 0 : ereport(ERROR,
2729 : (errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
2730 : errmsg("permission denied to change owner of database")));
2731 :
2732 30 : LockTuple(rel, &tuple->t_self, InplaceUpdateTupleLock);
2733 :
2734 30 : repl_repl[Anum_pg_database_datdba - 1] = true;
2735 30 : repl_val[Anum_pg_database_datdba - 1] = ObjectIdGetDatum(newOwnerId);
2736 :
2737 : /*
2738 : * Determine the modified ACL for the new owner. This is only
2739 : * necessary when the ACL is non-null.
2740 : */
2741 30 : aclDatum = heap_getattr(tuple,
2742 : Anum_pg_database_datacl,
2743 : RelationGetDescr(rel),
2744 : &isNull);
2745 30 : if (!isNull)
2746 : {
2747 0 : newAcl = aclnewowner(DatumGetAclP(aclDatum),
2748 : datForm->datdba, newOwnerId);
2749 0 : repl_repl[Anum_pg_database_datacl - 1] = true;
2750 0 : repl_val[Anum_pg_database_datacl - 1] = PointerGetDatum(newAcl);
2751 : }
2752 :
2753 30 : newtuple = heap_modify_tuple(tuple, RelationGetDescr(rel), repl_val, repl_null, repl_repl);
2754 30 : CatalogTupleUpdate(rel, &newtuple->t_self, newtuple);
2755 30 : UnlockTuple(rel, &tuple->t_self, InplaceUpdateTupleLock);
2756 :
2757 30 : heap_freetuple(newtuple);
2758 :
2759 : /* Update owner dependency reference */
2760 30 : changeDependencyOnOwner(DatabaseRelationId, db_id, newOwnerId);
2761 : }
2762 :
2763 86 : InvokeObjectPostAlterHook(DatabaseRelationId, db_id, 0);
2764 :
2765 86 : ObjectAddressSet(address, DatabaseRelationId, db_id);
2766 :
2767 86 : systable_endscan(scan);
2768 :
2769 : /* Close pg_database, but keep lock till commit */
2770 86 : table_close(rel, NoLock);
2771 :
2772 86 : return address;
2773 : }
2774 :
2775 :
2776 : Datum
2777 96 : pg_database_collation_actual_version(PG_FUNCTION_ARGS)
2778 : {
2779 96 : Oid dbid = PG_GETARG_OID(0);
2780 : HeapTuple tp;
2781 : char datlocprovider;
2782 : Datum datum;
2783 : char *version;
2784 :
2785 96 : tp = SearchSysCache1(DATABASEOID, ObjectIdGetDatum(dbid));
2786 96 : if (!HeapTupleIsValid(tp))
2787 0 : ereport(ERROR,
2788 : (errcode(ERRCODE_UNDEFINED_OBJECT),
2789 : errmsg("database with OID %u does not exist", dbid)));
2790 :
2791 96 : datlocprovider = ((Form_pg_database) GETSTRUCT(tp))->datlocprovider;
2792 :
2793 96 : if (datlocprovider == COLLPROVIDER_LIBC)
2794 82 : datum = SysCacheGetAttrNotNull(DATABASEOID, tp, Anum_pg_database_datcollate);
2795 : else
2796 14 : datum = SysCacheGetAttrNotNull(DATABASEOID, tp, Anum_pg_database_datlocale);
2797 :
2798 96 : version = get_collation_actual_version(datlocprovider,
2799 96 : TextDatumGetCString(datum));
2800 :
2801 96 : ReleaseSysCache(tp);
2802 :
2803 96 : if (version)
2804 68 : PG_RETURN_TEXT_P(cstring_to_text(version));
2805 : else
2806 28 : PG_RETURN_NULL();
2807 : }
2808 :
2809 :
2810 : /*
2811 : * Helper functions
2812 : */
2813 :
2814 : /*
2815 : * Look up info about the database named "name". If the database exists,
2816 : * obtain the specified lock type on it, fill in any of the remaining
2817 : * parameters that aren't NULL, and return true. If no such database,
2818 : * return false.
2819 : */
2820 : static bool
2821 948 : get_db_info(const char *name, LOCKMODE lockmode,
2822 : Oid *dbIdP, Oid *ownerIdP,
2823 : int *encodingP, bool *dbIsTemplateP, bool *dbAllowConnP, bool *dbHasLoginEvtP,
2824 : TransactionId *dbFrozenXidP, MultiXactId *dbMinMultiP,
2825 : Oid *dbTablespace, char **dbCollate, char **dbCtype, char **dbLocale,
2826 : char **dbIcurules,
2827 : char *dbLocProvider,
2828 : char **dbCollversion)
2829 : {
2830 948 : bool result = false;
2831 : Relation relation;
2832 :
2833 : Assert(name);
2834 :
2835 : /* Caller may wish to grab a better lock on pg_database beforehand... */
2836 948 : relation = table_open(DatabaseRelationId, AccessShareLock);
2837 :
2838 : /*
2839 : * Loop covers the rare case where the database is renamed before we can
2840 : * lock it. We try again just in case we can find a new one of the same
2841 : * name.
2842 : */
2843 : for (;;)
2844 0 : {
2845 : ScanKeyData scanKey;
2846 : SysScanDesc scan;
2847 : HeapTuple tuple;
2848 : Oid dbOid;
2849 :
2850 : /*
2851 : * there's no syscache for database-indexed-by-name, so must do it the
2852 : * hard way
2853 : */
2854 948 : ScanKeyInit(&scanKey,
2855 : Anum_pg_database_datname,
2856 : BTEqualStrategyNumber, F_NAMEEQ,
2857 : CStringGetDatum(name));
2858 :
2859 948 : scan = systable_beginscan(relation, DatabaseNameIndexId, true,
2860 : NULL, 1, &scanKey);
2861 :
2862 948 : tuple = systable_getnext(scan);
2863 :
2864 948 : if (!HeapTupleIsValid(tuple))
2865 : {
2866 : /* definitely no database of that name */
2867 32 : systable_endscan(scan);
2868 32 : break;
2869 : }
2870 :
2871 916 : dbOid = ((Form_pg_database) GETSTRUCT(tuple))->oid;
2872 :
2873 916 : systable_endscan(scan);
2874 :
2875 : /*
2876 : * Now that we have a database OID, we can try to lock the DB.
2877 : */
2878 916 : if (lockmode != NoLock)
2879 916 : LockSharedObject(DatabaseRelationId, dbOid, 0, lockmode);
2880 :
2881 : /*
2882 : * And now, re-fetch the tuple by OID. If it's still there and still
2883 : * the same name, we win; else, drop the lock and loop back to try
2884 : * again.
2885 : */
2886 916 : tuple = SearchSysCache1(DATABASEOID, ObjectIdGetDatum(dbOid));
2887 916 : if (HeapTupleIsValid(tuple))
2888 : {
2889 916 : Form_pg_database dbform = (Form_pg_database) GETSTRUCT(tuple);
2890 :
2891 916 : if (strcmp(name, NameStr(dbform->datname)) == 0)
2892 : {
2893 : Datum datum;
2894 : bool isnull;
2895 :
2896 : /* oid of the database */
2897 916 : if (dbIdP)
2898 916 : *dbIdP = dbOid;
2899 : /* oid of the owner */
2900 916 : if (ownerIdP)
2901 790 : *ownerIdP = dbform->datdba;
2902 : /* character encoding */
2903 916 : if (encodingP)
2904 790 : *encodingP = dbform->encoding;
2905 : /* allowed as template? */
2906 916 : if (dbIsTemplateP)
2907 882 : *dbIsTemplateP = dbform->datistemplate;
2908 : /* Has on login event trigger? */
2909 916 : if (dbHasLoginEvtP)
2910 790 : *dbHasLoginEvtP = dbform->dathasloginevt;
2911 : /* allowing connections? */
2912 916 : if (dbAllowConnP)
2913 790 : *dbAllowConnP = dbform->datallowconn;
2914 : /* limit of frozen XIDs */
2915 916 : if (dbFrozenXidP)
2916 790 : *dbFrozenXidP = dbform->datfrozenxid;
2917 : /* minimum MultiXactId */
2918 916 : if (dbMinMultiP)
2919 790 : *dbMinMultiP = dbform->datminmxid;
2920 : /* default tablespace for this database */
2921 916 : if (dbTablespace)
2922 812 : *dbTablespace = dbform->dattablespace;
2923 : /* default locale settings for this database */
2924 916 : if (dbLocProvider)
2925 790 : *dbLocProvider = dbform->datlocprovider;
2926 916 : if (dbCollate)
2927 : {
2928 790 : datum = SysCacheGetAttrNotNull(DATABASEOID, tuple, Anum_pg_database_datcollate);
2929 790 : *dbCollate = TextDatumGetCString(datum);
2930 : }
2931 916 : if (dbCtype)
2932 : {
2933 790 : datum = SysCacheGetAttrNotNull(DATABASEOID, tuple, Anum_pg_database_datctype);
2934 790 : *dbCtype = TextDatumGetCString(datum);
2935 : }
2936 916 : if (dbLocale)
2937 : {
2938 790 : datum = SysCacheGetAttr(DATABASEOID, tuple, Anum_pg_database_datlocale, &isnull);
2939 790 : if (isnull)
2940 732 : *dbLocale = NULL;
2941 : else
2942 58 : *dbLocale = TextDatumGetCString(datum);
2943 : }
2944 916 : if (dbIcurules)
2945 : {
2946 790 : datum = SysCacheGetAttr(DATABASEOID, tuple, Anum_pg_database_daticurules, &isnull);
2947 790 : if (isnull)
2948 790 : *dbIcurules = NULL;
2949 : else
2950 0 : *dbIcurules = TextDatumGetCString(datum);
2951 : }
2952 916 : if (dbCollversion)
2953 : {
2954 790 : datum = SysCacheGetAttr(DATABASEOID, tuple, Anum_pg_database_datcollversion, &isnull);
2955 790 : if (isnull)
2956 476 : *dbCollversion = NULL;
2957 : else
2958 314 : *dbCollversion = TextDatumGetCString(datum);
2959 : }
2960 916 : ReleaseSysCache(tuple);
2961 916 : result = true;
2962 916 : break;
2963 : }
2964 : /* can only get here if it was just renamed */
2965 0 : ReleaseSysCache(tuple);
2966 : }
2967 :
2968 0 : if (lockmode != NoLock)
2969 0 : UnlockSharedObject(DatabaseRelationId, dbOid, 0, lockmode);
2970 : }
2971 :
2972 948 : table_close(relation, AccessShareLock);
2973 :
2974 948 : return result;
2975 : }
2976 :
2977 : /* Check if current user has createdb privileges */
2978 : bool
2979 868 : have_createdb_privilege(void)
2980 : {
2981 868 : bool result = false;
2982 : HeapTuple utup;
2983 :
2984 : /* Superusers can always do everything */
2985 868 : if (superuser())
2986 832 : return true;
2987 :
2988 36 : utup = SearchSysCache1(AUTHOID, ObjectIdGetDatum(GetUserId()));
2989 36 : if (HeapTupleIsValid(utup))
2990 : {
2991 36 : result = ((Form_pg_authid) GETSTRUCT(utup))->rolcreatedb;
2992 36 : ReleaseSysCache(utup);
2993 : }
2994 36 : return result;
2995 : }
2996 :
2997 : /*
2998 : * Remove tablespace directories
2999 : *
3000 : * We don't know what tablespaces db_id is using, so iterate through all
3001 : * tablespaces removing <tablespace>/db_id
3002 : */
3003 : static void
3004 94 : remove_dbtablespaces(Oid db_id)
3005 : {
3006 : Relation rel;
3007 : TableScanDesc scan;
3008 : HeapTuple tuple;
3009 94 : List *ltblspc = NIL;
3010 : ListCell *cell;
3011 : int ntblspc;
3012 : int i;
3013 : Oid *tablespace_ids;
3014 :
3015 94 : rel = table_open(TableSpaceRelationId, AccessShareLock);
3016 94 : scan = table_beginscan_catalog(rel, 0, NULL);
3017 348 : while ((tuple = heap_getnext(scan, ForwardScanDirection)) != NULL)
3018 : {
3019 254 : Form_pg_tablespace spcform = (Form_pg_tablespace) GETSTRUCT(tuple);
3020 254 : Oid dsttablespace = spcform->oid;
3021 : char *dstpath;
3022 : struct stat st;
3023 :
3024 : /* Don't mess with the global tablespace */
3025 254 : if (dsttablespace == GLOBALTABLESPACE_OID)
3026 160 : continue;
3027 :
3028 160 : dstpath = GetDatabasePath(db_id, dsttablespace);
3029 :
3030 160 : if (lstat(dstpath, &st) < 0 || !S_ISDIR(st.st_mode))
3031 : {
3032 : /* Assume we can ignore it */
3033 66 : pfree(dstpath);
3034 66 : continue;
3035 : }
3036 :
3037 94 : if (!rmtree(dstpath, true))
3038 0 : ereport(WARNING,
3039 : (errmsg("some useless files may be left behind in old database directory \"%s\"",
3040 : dstpath)));
3041 :
3042 94 : ltblspc = lappend_oid(ltblspc, dsttablespace);
3043 94 : pfree(dstpath);
3044 : }
3045 :
3046 94 : ntblspc = list_length(ltblspc);
3047 94 : if (ntblspc == 0)
3048 : {
3049 0 : table_endscan(scan);
3050 0 : table_close(rel, AccessShareLock);
3051 0 : return;
3052 : }
3053 :
3054 94 : tablespace_ids = (Oid *) palloc(ntblspc * sizeof(Oid));
3055 94 : i = 0;
3056 188 : foreach(cell, ltblspc)
3057 94 : tablespace_ids[i++] = lfirst_oid(cell);
3058 :
3059 : /* Record the filesystem change in XLOG */
3060 : {
3061 : xl_dbase_drop_rec xlrec;
3062 :
3063 94 : xlrec.db_id = db_id;
3064 94 : xlrec.ntablespaces = ntblspc;
3065 :
3066 94 : XLogBeginInsert();
3067 94 : XLogRegisterData(&xlrec, MinSizeOfDbaseDropRec);
3068 94 : XLogRegisterData(tablespace_ids, ntblspc * sizeof(Oid));
3069 :
3070 94 : (void) XLogInsert(RM_DBASE_ID,
3071 : XLOG_DBASE_DROP | XLR_SPECIAL_REL_UPDATE);
3072 : }
3073 :
3074 94 : list_free(ltblspc);
3075 94 : pfree(tablespace_ids);
3076 :
3077 94 : table_endscan(scan);
3078 94 : table_close(rel, AccessShareLock);
3079 : }
3080 :
3081 : /*
3082 : * Check for existing files that conflict with a proposed new DB OID;
3083 : * return true if there are any
3084 : *
3085 : * If there were a subdirectory in any tablespace matching the proposed new
3086 : * OID, we'd get a create failure due to the duplicate name ... and then we'd
3087 : * try to remove that already-existing subdirectory during the cleanup in
3088 : * remove_dbtablespaces. Nuking existing files seems like a bad idea, so
3089 : * instead we make this extra check before settling on the OID of the new
3090 : * database. This exactly parallels what GetNewRelFileNumber() does for table
3091 : * relfilenumber values.
3092 : */
3093 : static bool
3094 764 : check_db_file_conflict(Oid db_id)
3095 : {
3096 764 : bool result = false;
3097 : Relation rel;
3098 : TableScanDesc scan;
3099 : HeapTuple tuple;
3100 :
3101 764 : rel = table_open(TableSpaceRelationId, AccessShareLock);
3102 764 : scan = table_beginscan_catalog(rel, 0, NULL);
3103 2432 : while ((tuple = heap_getnext(scan, ForwardScanDirection)) != NULL)
3104 : {
3105 1668 : Form_pg_tablespace spcform = (Form_pg_tablespace) GETSTRUCT(tuple);
3106 1668 : Oid dsttablespace = spcform->oid;
3107 : char *dstpath;
3108 : struct stat st;
3109 :
3110 : /* Don't mess with the global tablespace */
3111 1668 : if (dsttablespace == GLOBALTABLESPACE_OID)
3112 764 : continue;
3113 :
3114 904 : dstpath = GetDatabasePath(db_id, dsttablespace);
3115 :
3116 904 : if (lstat(dstpath, &st) == 0)
3117 : {
3118 : /* Found a conflicting file (or directory, whatever) */
3119 0 : pfree(dstpath);
3120 0 : result = true;
3121 0 : break;
3122 : }
3123 :
3124 904 : pfree(dstpath);
3125 : }
3126 :
3127 764 : table_endscan(scan);
3128 764 : table_close(rel, AccessShareLock);
3129 :
3130 764 : return result;
3131 : }
3132 :
3133 : /*
3134 : * Issue a suitable errdetail message for a busy database
3135 : */
3136 : static int
3137 0 : errdetail_busy_db(int notherbackends, int npreparedxacts)
3138 : {
3139 0 : if (notherbackends > 0 && npreparedxacts > 0)
3140 :
3141 : /*
3142 : * We don't deal with singular versus plural here, since gettext
3143 : * doesn't support multiple plurals in one string.
3144 : */
3145 0 : errdetail("There are %d other session(s) and %d prepared transaction(s) using the database.",
3146 : notherbackends, npreparedxacts);
3147 0 : else if (notherbackends > 0)
3148 0 : errdetail_plural("There is %d other session using the database.",
3149 : "There are %d other sessions using the database.",
3150 : notherbackends,
3151 : notherbackends);
3152 : else
3153 0 : errdetail_plural("There is %d prepared transaction using the database.",
3154 : "There are %d prepared transactions using the database.",
3155 : npreparedxacts,
3156 : npreparedxacts);
3157 0 : return 0; /* just to keep ereport macro happy */
3158 : }
3159 :
3160 : /*
3161 : * get_database_oid - given a database name, look up the OID
3162 : *
3163 : * If missing_ok is false, throw an error if database name not found. If
3164 : * true, just return InvalidOid.
3165 : */
3166 : Oid
3167 2966 : get_database_oid(const char *dbname, bool missing_ok)
3168 : {
3169 : Relation pg_database;
3170 : ScanKeyData entry[1];
3171 : SysScanDesc scan;
3172 : HeapTuple dbtuple;
3173 : Oid oid;
3174 :
3175 : /*
3176 : * There's no syscache for pg_database indexed by name, so we must look
3177 : * the hard way.
3178 : */
3179 2966 : pg_database = table_open(DatabaseRelationId, AccessShareLock);
3180 2966 : ScanKeyInit(&entry[0],
3181 : Anum_pg_database_datname,
3182 : BTEqualStrategyNumber, F_NAMEEQ,
3183 : CStringGetDatum(dbname));
3184 2966 : scan = systable_beginscan(pg_database, DatabaseNameIndexId, true,
3185 : NULL, 1, entry);
3186 :
3187 2966 : dbtuple = systable_getnext(scan);
3188 :
3189 : /* We assume that there can be at most one matching tuple */
3190 2966 : if (HeapTupleIsValid(dbtuple))
3191 2154 : oid = ((Form_pg_database) GETSTRUCT(dbtuple))->oid;
3192 : else
3193 812 : oid = InvalidOid;
3194 :
3195 2966 : systable_endscan(scan);
3196 2966 : table_close(pg_database, AccessShareLock);
3197 :
3198 2966 : if (!OidIsValid(oid) && !missing_ok)
3199 6 : ereport(ERROR,
3200 : (errcode(ERRCODE_UNDEFINED_DATABASE),
3201 : errmsg("database \"%s\" does not exist",
3202 : dbname)));
3203 :
3204 2960 : return oid;
3205 : }
3206 :
3207 :
3208 : /*
3209 : * While dropping a database the pg_database row is marked invalid, but the
3210 : * catalog contents still exist. Connections to such a database are not
3211 : * allowed.
3212 : */
3213 : bool
3214 55274 : database_is_invalid_form(Form_pg_database datform)
3215 : {
3216 55274 : return datform->datconnlimit == DATCONNLIMIT_INVALID_DB;
3217 : }
3218 :
3219 :
3220 : /*
3221 : * Convenience wrapper around database_is_invalid_form()
3222 : */
3223 : bool
3224 790 : database_is_invalid_oid(Oid dboid)
3225 : {
3226 : HeapTuple dbtup;
3227 : Form_pg_database dbform;
3228 : bool invalid;
3229 :
3230 790 : dbtup = SearchSysCache1(DATABASEOID, ObjectIdGetDatum(dboid));
3231 790 : if (!HeapTupleIsValid(dbtup))
3232 0 : elog(ERROR, "cache lookup failed for database %u", dboid);
3233 790 : dbform = (Form_pg_database) GETSTRUCT(dbtup);
3234 :
3235 790 : invalid = database_is_invalid_form(dbform);
3236 :
3237 790 : ReleaseSysCache(dbtup);
3238 :
3239 790 : return invalid;
3240 : }
3241 :
3242 :
3243 : /*
3244 : * recovery_create_dbdir()
3245 : *
3246 : * During recovery, there's a case where we validly need to recover a missing
3247 : * tablespace directory so that recovery can continue. This happens when
3248 : * recovery wants to create a database but the holding tablespace has been
3249 : * removed before the server stopped. Since we expect that the directory will
3250 : * be gone before reaching recovery consistency, and we have no knowledge about
3251 : * the tablespace other than its OID here, we create a real directory under
3252 : * pg_tblspc here instead of restoring the symlink.
3253 : *
3254 : * If only_tblspc is true, then the requested directory must be in pg_tblspc/
3255 : */
3256 : static void
3257 46 : recovery_create_dbdir(char *path, bool only_tblspc)
3258 : {
3259 : struct stat st;
3260 :
3261 : Assert(RecoveryInProgress());
3262 :
3263 46 : if (stat(path, &st) == 0)
3264 46 : return;
3265 :
3266 0 : if (only_tblspc && strstr(path, PG_TBLSPC_DIR_SLASH) == NULL)
3267 0 : elog(PANIC, "requested to created invalid directory: %s", path);
3268 :
3269 0 : if (reachedConsistency && !allow_in_place_tablespaces)
3270 0 : ereport(PANIC,
3271 : errmsg("missing directory \"%s\"", path));
3272 :
3273 0 : elog(reachedConsistency ? WARNING : DEBUG1,
3274 : "creating missing directory: %s", path);
3275 :
3276 0 : if (pg_mkdir_p(path, pg_dir_create_mode) != 0)
3277 0 : ereport(PANIC,
3278 : errmsg("could not create missing directory \"%s\": %m", path));
3279 : }
3280 :
3281 :
3282 : /*
3283 : * DATABASE resource manager's routines
3284 : */
3285 : void
3286 84 : dbase_redo(XLogReaderState *record)
3287 : {
3288 84 : uint8 info = XLogRecGetInfo(record) & ~XLR_INFO_MASK;
3289 :
3290 : /* Backup blocks are not used in dbase records */
3291 : Assert(!XLogRecHasAnyBlockRefs(record));
3292 :
3293 84 : if (info == XLOG_DBASE_CREATE_FILE_COPY)
3294 : {
3295 10 : xl_dbase_create_file_copy_rec *xlrec =
3296 10 : (xl_dbase_create_file_copy_rec *) XLogRecGetData(record);
3297 : char *src_path;
3298 : char *dst_path;
3299 : char *parent_path;
3300 : struct stat st;
3301 :
3302 10 : src_path = GetDatabasePath(xlrec->src_db_id, xlrec->src_tablespace_id);
3303 10 : dst_path = GetDatabasePath(xlrec->db_id, xlrec->tablespace_id);
3304 :
3305 : /*
3306 : * Our theory for replaying a CREATE is to forcibly drop the target
3307 : * subdirectory if present, then re-copy the source data. This may be
3308 : * more work than needed, but it is simple to implement.
3309 : */
3310 10 : if (stat(dst_path, &st) == 0 && S_ISDIR(st.st_mode))
3311 : {
3312 0 : if (!rmtree(dst_path, true))
3313 : /* If this failed, copydir() below is going to error. */
3314 0 : ereport(WARNING,
3315 : (errmsg("some useless files may be left behind in old database directory \"%s\"",
3316 : dst_path)));
3317 : }
3318 :
3319 : /*
3320 : * If the parent of the target path doesn't exist, create it now. This
3321 : * enables us to create the target underneath later.
3322 : */
3323 10 : parent_path = pstrdup(dst_path);
3324 10 : get_parent_directory(parent_path);
3325 10 : if (stat(parent_path, &st) < 0)
3326 : {
3327 0 : if (errno != ENOENT)
3328 0 : ereport(FATAL,
3329 : errmsg("could not stat directory \"%s\": %m",
3330 : dst_path));
3331 :
3332 : /* create the parent directory if needed and valid */
3333 0 : recovery_create_dbdir(parent_path, true);
3334 : }
3335 10 : pfree(parent_path);
3336 :
3337 : /*
3338 : * There's a case where the copy source directory is missing for the
3339 : * same reason above. Create the empty source directory so that
3340 : * copydir below doesn't fail. The directory will be dropped soon by
3341 : * recovery.
3342 : */
3343 10 : if (stat(src_path, &st) < 0 && errno == ENOENT)
3344 0 : recovery_create_dbdir(src_path, false);
3345 :
3346 : /*
3347 : * Force dirty buffers out to disk, to ensure source database is
3348 : * up-to-date for the copy.
3349 : */
3350 10 : FlushDatabaseBuffers(xlrec->src_db_id);
3351 :
3352 : /* Close all smgr fds in all backends. */
3353 10 : WaitForProcSignalBarrier(EmitProcSignalBarrier(PROCSIGNAL_BARRIER_SMGRRELEASE));
3354 :
3355 : /*
3356 : * Copy this subdirectory to the new location
3357 : *
3358 : * We don't need to copy subdirectories
3359 : */
3360 10 : copydir(src_path, dst_path, false);
3361 :
3362 10 : pfree(src_path);
3363 10 : pfree(dst_path);
3364 : }
3365 74 : else if (info == XLOG_DBASE_CREATE_WAL_LOG)
3366 : {
3367 46 : xl_dbase_create_wal_log_rec *xlrec =
3368 46 : (xl_dbase_create_wal_log_rec *) XLogRecGetData(record);
3369 : char *dbpath;
3370 : char *parent_path;
3371 :
3372 46 : dbpath = GetDatabasePath(xlrec->db_id, xlrec->tablespace_id);
3373 :
3374 : /* create the parent directory if needed and valid */
3375 46 : parent_path = pstrdup(dbpath);
3376 46 : get_parent_directory(parent_path);
3377 46 : recovery_create_dbdir(parent_path, true);
3378 :
3379 : /* Create the database directory with the version file. */
3380 46 : CreateDirAndVersionFile(dbpath, xlrec->db_id, xlrec->tablespace_id,
3381 : true);
3382 46 : pfree(dbpath);
3383 : }
3384 28 : else if (info == XLOG_DBASE_DROP)
3385 : {
3386 28 : xl_dbase_drop_rec *xlrec = (xl_dbase_drop_rec *) XLogRecGetData(record);
3387 : char *dst_path;
3388 : int i;
3389 :
3390 28 : if (InHotStandby)
3391 : {
3392 : /*
3393 : * Lock database while we resolve conflicts to ensure that
3394 : * InitPostgres() cannot fully re-execute concurrently. This
3395 : * avoids backends re-connecting automatically to same database,
3396 : * which can happen in some cases.
3397 : *
3398 : * This will lock out walsenders trying to connect to db-specific
3399 : * slots for logical decoding too, so it's safe for us to drop
3400 : * slots.
3401 : */
3402 28 : LockSharedObjectForSession(DatabaseRelationId, xlrec->db_id, 0, AccessExclusiveLock);
3403 28 : ResolveRecoveryConflictWithDatabase(xlrec->db_id);
3404 : }
3405 :
3406 : /* Drop any database-specific replication slots */
3407 28 : ReplicationSlotsDropDBSlots(xlrec->db_id);
3408 :
3409 : /* Drop pages for this database that are in the shared buffer cache */
3410 28 : DropDatabaseBuffers(xlrec->db_id);
3411 :
3412 : /* Also, clean out any fsync requests that might be pending in md.c */
3413 28 : ForgetDatabaseSyncRequests(xlrec->db_id);
3414 :
3415 : /* Clean out the xlog relcache too */
3416 28 : XLogDropDatabase(xlrec->db_id);
3417 :
3418 : /* Close all smgr fds in all backends. */
3419 28 : WaitForProcSignalBarrier(EmitProcSignalBarrier(PROCSIGNAL_BARRIER_SMGRRELEASE));
3420 :
3421 56 : for (i = 0; i < xlrec->ntablespaces; i++)
3422 : {
3423 28 : dst_path = GetDatabasePath(xlrec->db_id, xlrec->tablespace_ids[i]);
3424 :
3425 : /* And remove the physical files */
3426 28 : if (!rmtree(dst_path, true))
3427 0 : ereport(WARNING,
3428 : (errmsg("some useless files may be left behind in old database directory \"%s\"",
3429 : dst_path)));
3430 28 : pfree(dst_path);
3431 : }
3432 :
3433 28 : if (InHotStandby)
3434 : {
3435 : /*
3436 : * Release locks prior to commit. XXX There is a race condition
3437 : * here that may allow backends to reconnect, but the window for
3438 : * this is small because the gap between here and commit is mostly
3439 : * fairly small and it is unlikely that people will be dropping
3440 : * databases that we are trying to connect to anyway.
3441 : */
3442 28 : UnlockSharedObjectForSession(DatabaseRelationId, xlrec->db_id, 0, AccessExclusiveLock);
3443 : }
3444 : }
3445 : else
3446 0 : elog(PANIC, "dbase_redo: unknown op code %u", info);
3447 84 : }
|