Line data Source code
1 : /*-------------------------------------------------------------------------
2 : *
3 : * repack.c
4 : * REPACK a table; formerly known as CLUSTER. VACUUM FULL also uses
5 : * parts of this code.
6 : *
7 : * There are two somewhat different ways to rewrite a table. In non-
8 : * concurrent mode, it's easy: take AccessExclusiveLock, create a new
9 : * transient relation, copy the tuples over to the relfilenode of the new
10 : * relation, swap the relfilenodes, then drop the old relation.
11 : *
12 : * In concurrent mode, we lock the table with only ShareUpdateExclusiveLock,
13 : * then do an initial copy as above. However, while the tuples are being
14 : * copied, concurrent transactions could modify the table. To cope with those
15 : * changes, we rely on logical decoding to obtain them from WAL. A bgworker
16 : * consumes WAL while the initial copy is ongoing (to prevent excessive WAL
17 : * from being reserved), and accumulates the changes in a file. Once the
18 : * initial copy is complete, we read the changes from the file and re-apply
19 : * them on the new heap. Then we upgrade our ShareUpdateExclusiveLock to
20 : * AccessExclusiveLock and swap the relfilenodes. This way, the time we hold
21 : * a strong lock on the table is much reduced, and the bloat is eliminated.
22 : *
23 : *
24 : * Portions Copyright (c) 1996-2026, PostgreSQL Global Development Group
25 : * Portions Copyright (c) 1994-5, Regents of the University of California
26 : *
27 : *
28 : * IDENTIFICATION
29 : * src/backend/commands/repack.c
30 : *
31 : *-------------------------------------------------------------------------
32 : */
33 : #include "postgres.h"
34 :
35 : #include "access/amapi.h"
36 : #include "access/heapam.h"
37 : #include "access/multixact.h"
38 : #include "access/relscan.h"
39 : #include "access/tableam.h"
40 : #include "access/toast_internals.h"
41 : #include "access/transam.h"
42 : #include "access/xact.h"
43 : #include "catalog/catalog.h"
44 : #include "catalog/dependency.h"
45 : #include "catalog/heap.h"
46 : #include "catalog/index.h"
47 : #include "catalog/namespace.h"
48 : #include "catalog/objectaccess.h"
49 : #include "catalog/pg_am.h"
50 : #include "catalog/pg_constraint.h"
51 : #include "catalog/pg_inherits.h"
52 : #include "catalog/toasting.h"
53 : #include "commands/defrem.h"
54 : #include "commands/progress.h"
55 : #include "commands/repack.h"
56 : #include "commands/repack_internal.h"
57 : #include "commands/tablecmds.h"
58 : #include "commands/vacuum.h"
59 : #include "executor/executor.h"
60 : #include "libpq/pqformat.h"
61 : #include "libpq/pqmq.h"
62 : #include "miscadmin.h"
63 : #include "optimizer/optimizer.h"
64 : #include "pgstat.h"
65 : #include "replication/logicalrelation.h"
66 : #include "storage/bufmgr.h"
67 : #include "storage/ipc.h"
68 : #include "storage/lmgr.h"
69 : #include "storage/predicate.h"
70 : #include "storage/proc.h"
71 : #include "utils/acl.h"
72 : #include "utils/fmgroids.h"
73 : #include "utils/guc.h"
74 : #include "utils/injection_point.h"
75 : #include "utils/inval.h"
76 : #include "utils/lsyscache.h"
77 : #include "utils/memutils.h"
78 : #include "utils/pg_rusage.h"
79 : #include "utils/relmapper.h"
80 : #include "utils/snapmgr.h"
81 : #include "utils/syscache.h"
82 : #include "utils/wait_event_types.h"
83 :
/*
 * This struct is used to pass around the information on tables to be
 * clustered.  We need this so we can make a list of them when invoked without
 * a specific table/index pair.
 *
 * ExecRepack() builds a list of these and hands each entry to cluster_rel(),
 * one transaction per entry.
 */
typedef struct
{
	Oid			tableOid;		/* table to rewrite */
	Oid			indexOid;		/* index to order by; InvalidOid means the
								 * table is rewritten in physical order */
} RelToCluster;
94 :
/*
 * The first file exported by the decoding worker must contain a snapshot, the
 * following ones contain the data changes.
 *
 * WORKER_FILE_SNAPSHOT identifies that first (snapshot) file; presumably it
 * is compared against a file sequence number such as
 * ChangeContext.cc_file_seq -- NOTE(review): confirm at the use sites.
 */
#define WORKER_FILE_SNAPSHOT 0
100 :
/*
 * Information needed to apply concurrent data changes.
 *
 * In REPACK (CONCURRENTLY), changes made by other transactions during the
 * initial copy are decoded from WAL and spooled to files; this struct holds
 * the state used while re-applying those changes to the new heap.
 */
typedef struct ChangeContext
{
	/* The relation the changes are applied to. */
	Relation	cc_rel;

	/* Needed to update indexes of cc_rel. */
	ResultRelInfo *cc_rri;
	EState	   *cc_estate;

	/*
	 * Existing tuples to UPDATE and DELETE are located via this index.  We
	 * keep the scankey in partially initialized state to avoid repeated work.
	 * sk_argument is completed on the fly.
	 */
	Relation	cc_ident_index;
	ScanKey		cc_ident_key;
	/* presumably the number of entries in cc_ident_key -- confirm */
	int			cc_ident_key_nentries;

	/* The latest column we need to deform to have the tuple identity */
	AttrNumber	cc_last_key_attno;

	/* Sequential number of the file containing the changes. */
	int			cc_file_seq;
} ChangeContext;
128 :
/*
 * Backend-local information to control the decoding worker.
 *
 * The decoding worker is the background worker that consumes WAL via logical
 * decoding while REPACK (CONCURRENTLY) performs the initial copy.
 */
typedef struct DecodingWorker
{
	/* The worker. */
	BackgroundWorkerHandle *handle;

	/* DecodingWorkerShared is in this segment. */
	dsm_segment *seg;

	/* Handle of the error queue. */
	shm_mq_handle *error_mqh;
} DecodingWorker;
143 :
/*
 * Pointer to currently running decoding worker.  Starts out NULL; presumably
 * it is reset to NULL once the worker is stopped -- NOTE(review): confirm in
 * stop_repack_decoding_worker().
 */
static DecodingWorker *decoding_worker = NULL;

/*
 * Is there a message sent by a repack worker that the backend needs to
 * receive?
 *
 * The volatile sig_atomic_t type suggests it is set from an interrupt/signal
 * handler and polled later -- NOTE(review): the setter is not visible here.
 */
volatile sig_atomic_t RepackMessagePending = false;
152 :
/* Lock selection, validation, and the main (non-concurrent) rewrite path. */
static LOCKMODE RepackLockLevel(bool concurrent);
static bool cluster_rel_recheck(RepackCommand cmd, Relation OldHeap,
								Oid indexOid, Oid userid, LOCKMODE lmode,
								int options);
static void check_concurrent_repack_requirements(Relation rel,
												 Oid *ident_idx_p);
static void rebuild_relation(Relation OldHeap, Relation index, bool verbose,
							 Oid ident_idx);
static void copy_table_data(Relation NewHeap, Relation OldHeap, Relation OldIndex,
							Snapshot snapshot,
							bool verbose,
							bool *pSwapToastByContent,
							TransactionId *pFreezeXid,
							MultiXactId *pCutoffMulti);
/* Building the list of tables to process in the multi-relation case. */
static List *get_tables_to_repack(RepackCommand cmd, bool usingindex,
								  MemoryContext permcxt);
static List *get_tables_to_repack_partitioned(RepackCommand cmd,
											  Oid relid, bool rel_is_index,
											  MemoryContext permcxt);
static bool repack_is_permitted_for_relation(RepackCommand cmd,
											 Oid relid, Oid userid);

/* Re-applying concurrently decoded data changes (REPACK CONCURRENTLY). */
static void apply_concurrent_changes(BufFile *file, ChangeContext *chgcxt);
static void apply_concurrent_insert(Relation rel, TupleTableSlot *slot,
									ChangeContext *chgcxt);
static void apply_concurrent_update(Relation rel, TupleTableSlot *spilled_tuple,
									TupleTableSlot *ondisk_tuple,
									ChangeContext *chgcxt);
static void apply_concurrent_delete(Relation rel, TupleTableSlot *slot);
static void restore_tuple(BufFile *file, Relation relation,
						  TupleTableSlot *slot);
static void adjust_toast_pointers(Relation relation, TupleTableSlot *dest,
								  TupleTableSlot *src);
static bool find_target_tuple(Relation rel, ChangeContext *chgcxt,
							  TupleTableSlot *locator,
							  TupleTableSlot *retrieved);
static bool identity_key_equal(ChangeContext *chgcxt,
							   TupleTableSlot *locator,
							   TupleTableSlot *candidate);
static void process_concurrent_changes(XLogRecPtr end_of_wal,
									   ChangeContext *chgcxt,
									   bool done);
static void initialize_change_context(ChangeContext *chgcxt,
									  Relation relation,
									  Oid ident_index_id);
static void release_change_context(ChangeContext *chgcxt);
static void rebuild_relation_finish_concurrent(Relation NewHeap, Relation OldHeap,
											   Oid identIdx,
											   TransactionId frozenXid,
											   MultiXactId cutoffMulti);
/* Index rebuilding and command-level helpers. */
static List *build_new_indexes(Relation NewHeap, Relation OldHeap, List *OldIndexes);
static void copy_index_constraints(Relation old_index, Oid new_index_id,
								   Oid new_heap_id);
static Relation process_single_relation(RepackStmt *stmt,
										LOCKMODE lockmode,
										bool isTopLevel,
										ClusterParams *params);
static Oid	determine_clustered_index(Relation rel, bool usingindex,
									  const char *indexname);

/* Decoding worker lifecycle. */
static void start_repack_decoding_worker(Oid relid);
static void stop_repack_decoding_worker(void);
static void stop_repack_decoding_worker_cb(int code, Datum arg);
static Snapshot get_initial_snapshot(DecodingWorker *worker);

/* Miscellaneous. */
static void ProcessRepackMessage(StringInfo msg);
static const char *RepackCommandAsString(RepackCommand cmd);
220 :
221 :
222 : /*
223 : * The repack code allows for processing multiple tables at once. Because
224 : * of this, we cannot just run everything on a single transaction, or we
225 : * would be forced to acquire exclusive locks on all the tables being
226 : * clustered, simultaneously --- very likely leading to deadlock.
227 : *
228 : * To solve this we follow a similar strategy to VACUUM code, processing each
229 : * relation in a separate transaction. For this to work, we need to:
230 : *
231 : * - provide a separate memory context so that we can pass information in
232 : * a way that survives across transactions
233 : * - start a new transaction every time a new relation is clustered
234 : * - check for validity of the information on to-be-clustered relations,
235 : * as someone might have deleted a relation behind our back, or
236 : * clustered one on a different index
237 : * - end the transaction
238 : *
239 : * The single-relation case does not have any such overhead.
240 : *
241 : * We also allow a relation to be repacked following an index, but without
242 : * naming a specific one. In that case, the indisclustered bit will be
243 : * looked up, and an ERROR will be thrown if no so-marked index is found.
244 : */
245 : void
246 231 : ExecRepack(ParseState *pstate, RepackStmt *stmt, bool isTopLevel)
247 : {
248 231 : ClusterParams params = {0};
249 231 : Relation rel = NULL;
250 : MemoryContext repack_context;
251 : LOCKMODE lockmode;
252 : List *rtcs;
253 231 : bool verbose = false;
254 231 : bool analyze = false;
255 231 : bool concurrently = false;
256 :
257 : /* Parse option list */
258 519 : foreach_node(DefElem, opt, stmt->params)
259 : {
260 57 : if (strcmp(opt->defname, "verbose") == 0)
261 7 : verbose = defGetBoolean(opt);
262 50 : else if (strcmp(opt->defname, "analyze") == 0 ||
263 42 : strcmp(opt->defname, "analyse") == 0)
264 8 : analyze = defGetBoolean(opt);
265 42 : else if (strcmp(opt->defname, "concurrently") == 0)
266 : {
267 42 : if (stmt->command != REPACK_COMMAND_REPACK)
268 0 : ereport(ERROR,
269 : errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
270 : errmsg("CONCURRENTLY option not supported for %s",
271 : RepackCommandAsString(stmt->command)));
272 42 : concurrently = defGetBoolean(opt);
273 : }
274 : else
275 0 : ereport(ERROR,
276 : errcode(ERRCODE_SYNTAX_ERROR),
277 : errmsg("unrecognized %s option \"%s\"",
278 : RepackCommandAsString(stmt->command),
279 : opt->defname),
280 : parser_errposition(pstate, opt->location));
281 : }
282 :
283 462 : params.options |=
284 462 : (verbose ? CLUOPT_VERBOSE : 0) |
285 231 : (analyze ? CLUOPT_ANALYZE : 0) |
286 231 : (concurrently ? CLUOPT_CONCURRENT : 0);
287 :
288 : /* Determine the lock mode to use. */
289 231 : lockmode = RepackLockLevel((params.options & CLUOPT_CONCURRENT) != 0);
290 :
291 231 : if ((params.options & CLUOPT_CONCURRENT) != 0)
292 : {
293 : /*
294 : * Make sure we're not in a transaction block.
295 : *
296 : * The reason is that repack_setup_logical_decoding() could wait
297 : * indefinitely for our XID to complete. (The deadlock detector would
298 : * not recognize it because we'd be waiting for ourselves, i.e. no
299 : * real lock conflict.) It would be possible to run in a transaction
300 : * block if we had no XID, but this restriction is simpler for users
301 : * to understand and we don't lose any functionality.
302 : */
303 42 : PreventInTransactionBlock(isTopLevel, "REPACK (CONCURRENTLY)");
304 : }
305 :
306 : /*
307 : * If a single relation is specified, process it and we're done ... unless
308 : * the relation is a partitioned table, in which case we fall through.
309 : */
310 231 : if (stmt->relation != NULL)
311 : {
312 215 : rel = process_single_relation(stmt, lockmode, isTopLevel, ¶ms);
313 166 : if (rel == NULL)
314 130 : return; /* all done */
315 : }
316 :
317 : /*
318 : * Don't allow ANALYZE in the multiple-relation case for now. Maybe we
319 : * can add support for this later.
320 : */
321 52 : if (params.options & CLUOPT_ANALYZE)
322 0 : ereport(ERROR,
323 : errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
324 : errmsg("cannot execute %s on multiple tables",
325 : "REPACK (ANALYZE)"));
326 :
327 : /*
328 : * By here, we know we are in a multi-table situation.
329 : *
330 : * Concurrent processing is currently considered rather special (e.g. in
331 : * terms of resources consumed) so it is not performed in bulk.
332 : */
333 52 : if (params.options & CLUOPT_CONCURRENT)
334 : {
335 4 : if (rel != NULL)
336 : {
337 : Assert(rel->rd_rel->relkind == RELKIND_PARTITIONED_TABLE);
338 4 : ereport(ERROR,
339 : errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
340 : errmsg("%s is not supported for partitioned tables",
341 : "REPACK (CONCURRENTLY)"),
342 : errhint("Consider running the command on individual partitions."));
343 : }
344 : else
345 0 : ereport(ERROR,
346 : errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
347 : errmsg("%s requires an explicit table name",
348 : "REPACK (CONCURRENTLY)"));
349 : }
350 :
351 : /*
352 : * In order to avoid holding locks for too long, we want to process each
353 : * table in its own transaction. This forces us to disallow running
354 : * inside a user transaction block.
355 : */
356 48 : PreventInTransactionBlock(isTopLevel, RepackCommandAsString(stmt->command));
357 :
358 : /* Also, we need a memory context to hold our list of relations */
359 48 : repack_context = AllocSetContextCreate(PortalContext,
360 : "Repack",
361 : ALLOCSET_DEFAULT_SIZES);
362 :
363 : /*
364 : * Since we open a new transaction for each relation, we have to check
365 : * that the relation still is what we think it is.
366 : *
367 : * In single-transaction CLUSTER, we don't need the overhead.
368 : */
369 48 : params.options |= CLUOPT_RECHECK;
370 :
371 : /*
372 : * If we don't have a relation yet, determine a relation list. If we do,
373 : * then it must be a partitioned table, and we want to process its
374 : * partitions.
375 : */
376 48 : if (rel == NULL)
377 : {
378 : Assert(stmt->indexname == NULL);
379 16 : rtcs = get_tables_to_repack(stmt->command, stmt->usingindex,
380 : repack_context);
381 16 : params.options |= CLUOPT_RECHECK_ISCLUSTERED;
382 : }
383 : else
384 : {
385 : Oid relid;
386 : bool rel_is_index;
387 :
388 : Assert(rel->rd_rel->relkind == RELKIND_PARTITIONED_TABLE);
389 :
390 : /*
391 : * If USING INDEX was specified, resolve the index name now and pass
392 : * it down.
393 : */
394 32 : if (stmt->usingindex)
395 : {
396 : /*
397 : * If no index name was specified when repacking a partitioned
398 : * table, punt for now. Maybe we can improve this later.
399 : */
400 28 : if (!stmt->indexname)
401 : {
402 8 : if (stmt->command == REPACK_COMMAND_CLUSTER)
403 4 : ereport(ERROR,
404 : errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
405 : errmsg("there is no previously clustered index for table \"%s\"",
406 : RelationGetRelationName(rel)));
407 : else
408 4 : ereport(ERROR,
409 : errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
410 : /*- translator: first %s is name of a SQL command, eg. REPACK */
411 : errmsg("cannot execute %s on partitioned table \"%s\" USING INDEX with no index name",
412 : RepackCommandAsString(stmt->command),
413 : RelationGetRelationName(rel)));
414 : }
415 :
416 20 : relid = determine_clustered_index(rel, stmt->usingindex,
417 20 : stmt->indexname);
418 20 : if (!OidIsValid(relid))
419 0 : elog(ERROR, "unable to determine index to cluster on");
420 20 : check_index_is_clusterable(rel, relid, AccessExclusiveLock);
421 :
422 16 : rel_is_index = true;
423 : }
424 : else
425 : {
426 4 : relid = RelationGetRelid(rel);
427 4 : rel_is_index = false;
428 : }
429 :
430 20 : rtcs = get_tables_to_repack_partitioned(stmt->command,
431 : relid, rel_is_index,
432 : repack_context);
433 :
434 : /* close parent relation, releasing lock on it */
435 20 : table_close(rel, AccessExclusiveLock);
436 20 : rel = NULL;
437 : }
438 :
439 : /* Commit to get out of starting transaction */
440 36 : PopActiveSnapshot();
441 36 : CommitTransactionCommand();
442 :
443 : /* Cluster the tables, each in a separate transaction */
444 : Assert(rel == NULL);
445 124 : foreach_ptr(RelToCluster, rtc, rtcs)
446 : {
447 : /* Start a new transaction for each relation. */
448 52 : StartTransactionCommand();
449 :
450 : /*
451 : * Open the target table, coping with the case where it has been
452 : * dropped.
453 : */
454 52 : rel = try_table_open(rtc->tableOid, lockmode);
455 52 : if (rel == NULL)
456 : {
457 0 : CommitTransactionCommand();
458 0 : continue;
459 : }
460 :
461 : /* functions in indexes may want a snapshot set */
462 52 : PushActiveSnapshot(GetTransactionSnapshot());
463 :
464 : /* Process this table */
465 52 : cluster_rel(stmt->command, rel, rtc->indexOid, ¶ms, isTopLevel);
466 : /* cluster_rel closes the relation, but keeps lock */
467 :
468 52 : PopActiveSnapshot();
469 52 : CommitTransactionCommand();
470 : }
471 :
472 : /* Start a new transaction for the cleanup work. */
473 36 : StartTransactionCommand();
474 :
475 : /* Clean up working storage */
476 36 : MemoryContextDelete(repack_context);
477 : }
478 :
479 : /*
480 : * In the non-concurrent case, we obtain AccessExclusiveLock throughout the
481 : * operation to avoid any lock-upgrade hazards. In the concurrent case, we
482 : * grab ShareUpdateExclusiveLock (just like VACUUM) for most of the
483 : * processing and only acquire AccessExclusiveLock at the end, to swap the
484 : * relation -- supposedly for a short time.
485 : */
486 : static LOCKMODE
487 1079 : RepackLockLevel(bool concurrent)
488 : {
489 1079 : if (concurrent)
490 86 : return ShareUpdateExclusiveLock;
491 : else
492 993 : return AccessExclusiveLock;
493 : }
494 :
/*
 * cluster_rel
 *
 * This clusters the table by creating a new, clustered table and
 * swapping the relfilenumbers of the new table and the old table, so
 * the OID of the original table is preserved.  Thus we do not lose
 * GRANT, inheritance nor references to this table.
 *
 * Indexes are rebuilt too, via REINDEX.  Since we are effectively bulk-loading
 * the new table, it's better to create the indexes afterwards than to fill
 * them incrementally while we load the table.
 *
 * If indexOid is InvalidOid, the table will be rewritten in physical order
 * instead of index order.
 *
 * Note that, in the concurrent case, the function releases the lock at some
 * point, in order to get AccessExclusiveLock for the final steps (i.e. to
 * swap the relation files).  To make things simpler, the caller should expect
 * OldHeap to be closed on return, regardless of CLUOPT_CONCURRENT.  (The
 * AccessExclusiveLock is kept till the end of the transaction.)
 *
 * 'cmd' indicates which command is being executed, to be used for error
 * messages.
 * 'params->options' is a mask of CLUOPT_* flags (VERBOSE, RECHECK,
 * RECHECK_ISCLUSTERED, CONCURRENT are consulted here or by callees).
 * 'isTopLevel' is accepted for the caller's convenience but is not referenced
 * in this function body.
 */
void
cluster_rel(RepackCommand cmd, Relation OldHeap, Oid indexOid,
			ClusterParams *params, bool isTopLevel)
{
	Oid			tableOid = RelationGetRelid(OldHeap);
	Relation	index;
	LOCKMODE	lmode;
	Oid			save_userid;
	int			save_sec_context;
	int			save_nestlevel;
	bool		verbose = ((params->options & CLUOPT_VERBOSE) != 0);
	bool		recheck = ((params->options & CLUOPT_RECHECK) != 0);
	bool		concurrent = ((params->options & CLUOPT_CONCURRENT) != 0);
	Oid			ident_idx = InvalidOid;

	/* Determine the lock mode to use. */
	lmode = RepackLockLevel(concurrent);

	/*
	 * Check some preconditions in the concurrent case.  This also obtains the
	 * replica index OID.  (This runs before progress reporting starts, so an
	 * error here leaves no progress entry behind.)
	 */
	if (concurrent)
		check_concurrent_repack_requirements(OldHeap, &ident_idx);

	/* Check for user-requested abort. */
	CHECK_FOR_INTERRUPTS();

	pgstat_progress_start_command(PROGRESS_COMMAND_REPACK, tableOid);
	pgstat_progress_update_param(PROGRESS_REPACK_COMMAND, cmd);

	/*
	 * Switch to the table owner's userid, so that any index functions are run
	 * as that user.  Also lock down security-restricted operations and
	 * arrange to make GUC variable changes local to this command.  Both are
	 * undone at the "out" label below.
	 */
	GetUserIdAndSecContext(&save_userid, &save_sec_context);
	SetUserIdAndSecContext(OldHeap->rd_rel->relowner,
						   save_sec_context | SECURITY_RESTRICTED_OPERATION);
	save_nestlevel = NewGUCNestLevel();
	RestrictSearchPath();

	/*
	 * Recheck that the relation is still what it was when we started.
	 *
	 * Note that it's critical to skip this in single-relation CLUSTER;
	 * otherwise, we would reject an attempt to cluster using a
	 * not-previously-clustered index.
	 *
	 * The permission recheck is done against the original (saved) userid,
	 * not the table owner we just switched to.  On failure,
	 * cluster_rel_recheck has already closed OldHeap.
	 */
	if (recheck &&
		!cluster_rel_recheck(cmd, OldHeap, indexOid, save_userid,
							 lmode, params->options))
		goto out;

	/*
	 * We allow repacking shared catalogs only when not using an index.  It
	 * would work to use an index in most respects, but the index would only
	 * get marked as indisclustered in the current database, leading to
	 * unexpected behavior if CLUSTER were later invoked in another database.
	 */
	if (OidIsValid(indexOid) && OldHeap->rd_rel->relisshared)
		ereport(ERROR,
				errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
		/*- translator: first %s is name of a SQL command, eg. REPACK */
				errmsg("cannot execute %s on a shared catalog",
					   RepackCommandAsString(cmd)));

	/*
	 * The CONCURRENTLY case should have been rejected earlier because it does
	 * not support system catalogs.
	 */
	Assert(!(OldHeap->rd_rel->relisshared && concurrent));

	/*
	 * Don't process temp tables of other backends ... their local buffer
	 * manager is not going to cope.
	 */
	if (RELATION_IS_OTHER_TEMP(OldHeap))
		ereport(ERROR,
				errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
		/*- translator: first %s is name of a SQL command, eg. REPACK */
				errmsg("cannot execute %s on temporary tables of other sessions",
					   RepackCommandAsString(cmd)));

	/*
	 * Also check for active uses of the relation in the current transaction,
	 * including open scans and pending AFTER trigger events.
	 */
	CheckTableNotInUse(OldHeap, RepackCommandAsString(cmd));

	/* Check heap and index are valid to cluster on */
	if (OidIsValid(indexOid))
	{
		/* verify the index is good and lock it */
		check_index_is_clusterable(OldHeap, indexOid, lmode);
		/* also open it; the lock was already taken just above */
		index = index_open(indexOid, NoLock);
	}
	else
		index = NULL;

	/*
	 * When allow_system_table_mods is turned off, we disallow repacking a
	 * catalog on a particular index unless that's already the clustered index
	 * for that catalog.
	 *
	 * XXX We don't check for this in CLUSTER, because it's historically been
	 * allowed.
	 *
	 * (index is non-NULL here whenever OidIsValid(indexOid) holds.)
	 */
	if (cmd != REPACK_COMMAND_CLUSTER &&
		!allowSystemTableMods && OidIsValid(indexOid) &&
		IsCatalogRelation(OldHeap) && !index->rd_index->indisclustered)
		ereport(ERROR,
				errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
				errmsg("permission denied: \"%s\" is a system catalog",
					   RelationGetRelationName(OldHeap)),
				errdetail("System catalogs can only be clustered by the index they're already clustered on, if any, unless \"%s\" is enabled.",
						  "allow_system_table_mods"));

	/*
	 * Quietly ignore the request if this is a materialized view which has not
	 * been populated from its query.  No harm is done because there is no data
	 * to deal with, and we don't want to throw an error if this is part of a
	 * multi-relation request -- for example, CLUSTER was run on the entire
	 * database.
	 *
	 * Note that the relations are closed with lmode (not NoLock) here, so the
	 * locks are released rather than kept until end of transaction.
	 */
	if (OldHeap->rd_rel->relkind == RELKIND_MATVIEW &&
		!RelationIsPopulated(OldHeap))
	{
		if (index)
			index_close(index, lmode);
		relation_close(OldHeap, lmode);
		goto out;
	}

	Assert(OldHeap->rd_rel->relkind == RELKIND_RELATION ||
		   OldHeap->rd_rel->relkind == RELKIND_MATVIEW ||
		   OldHeap->rd_rel->relkind == RELKIND_TOASTVALUE);

	/*
	 * All predicate locks on the tuples or pages are about to be made
	 * invalid, because we move tuples around.  Promote them to relation
	 * locks.  Predicate locks on indexes will be promoted when they are
	 * reindexed.
	 *
	 * During concurrent processing, the heap as well as its indexes stay in
	 * operation, so we postpone this step until they are locked using
	 * AccessExclusiveLock near the end of the processing.
	 */
	if (!concurrent)
		TransferPredicateLocksToHeapRelation(OldHeap);

	/*
	 * rebuild_relation does all the dirty work, and closes OldHeap and index,
	 * if valid.
	 *
	 * In concurrent mode, make sure the worker terminates; normally it does
	 * so by itself, but a PG_ENSURE_ERROR_CLEANUP callback ensures that this
	 * happens even in case this backend dies early on a FATAL exit.  Normal
	 * mode doesn't need that overhead.
	 */
	if (concurrent)
	{
		PG_ENSURE_ERROR_CLEANUP(stop_repack_decoding_worker_cb, 0);
		{
			rebuild_relation(OldHeap, index, verbose, ident_idx);
		}
		PG_END_ENSURE_ERROR_CLEANUP(stop_repack_decoding_worker_cb, 0);
		/* On the success path the worker is stopped explicitly. */
		stop_repack_decoding_worker();
	}
	else
		rebuild_relation(OldHeap, index, verbose, ident_idx);

out:
	/* Roll back any GUC changes executed by index functions */
	AtEOXact_GUC(false, save_nestlevel);

	/* Restore userid and security context */
	SetUserIdAndSecContext(save_userid, save_sec_context);

	pgstat_progress_end_command();
}
701 :
702 : /*
703 : * Check if the table (and its index) still meets the requirements of
704 : * cluster_rel().
705 : */
706 : static bool
707 52 : cluster_rel_recheck(RepackCommand cmd, Relation OldHeap, Oid indexOid,
708 : Oid userid, LOCKMODE lmode, int options)
709 : {
710 52 : Oid tableOid = RelationGetRelid(OldHeap);
711 :
712 : /* Check that the user still has privileges for the relation */
713 52 : if (!repack_is_permitted_for_relation(cmd, tableOid, userid))
714 : {
715 0 : relation_close(OldHeap, lmode);
716 0 : return false;
717 : }
718 :
719 : /*
720 : * Silently skip a temp table for a remote session. Only doing this check
721 : * in the "recheck" case is appropriate (which currently means somebody is
722 : * executing a database-wide CLUSTER or on a partitioned table), because
723 : * there is another check in cluster() which will stop any attempt to
724 : * cluster remote temp tables by name. There is another check in
725 : * cluster_rel which is redundant, but we leave it for extra safety.
726 : */
727 52 : if (RELATION_IS_OTHER_TEMP(OldHeap))
728 : {
729 0 : relation_close(OldHeap, lmode);
730 0 : return false;
731 : }
732 :
733 52 : if (OidIsValid(indexOid))
734 : {
735 : /*
736 : * Check that the index still exists
737 : */
738 32 : if (!SearchSysCacheExists1(RELOID, ObjectIdGetDatum(indexOid)))
739 : {
740 0 : relation_close(OldHeap, lmode);
741 0 : return false;
742 : }
743 :
744 : /*
745 : * Check that the index is still the one with indisclustered set, if
746 : * needed.
747 : */
748 32 : if ((options & CLUOPT_RECHECK_ISCLUSTERED) != 0 &&
749 4 : !get_index_isclustered(indexOid))
750 : {
751 0 : relation_close(OldHeap, lmode);
752 0 : return false;
753 : }
754 : }
755 :
756 52 : return true;
757 : }
758 :
759 : /*
760 : * Verify that the specified heap and index are valid to cluster on
761 : *
762 : * Side effect: obtains lock on the index. The caller may
763 : * in some cases already have a lock of the same strength on the table, but
764 : * not in all cases so we can't rely on the table-level lock for
765 : * protection here.
766 : */
767 : void
768 321 : check_index_is_clusterable(Relation OldHeap, Oid indexOid, LOCKMODE lockmode)
769 : {
770 : Relation OldIndex;
771 :
772 321 : OldIndex = index_open(indexOid, lockmode);
773 :
774 : /*
775 : * Check that index is in fact an index on the given relation
776 : */
777 321 : if (OldIndex->rd_index == NULL ||
778 321 : OldIndex->rd_index->indrelid != RelationGetRelid(OldHeap))
779 0 : ereport(ERROR,
780 : (errcode(ERRCODE_WRONG_OBJECT_TYPE),
781 : errmsg("\"%s\" is not an index for table \"%s\"",
782 : RelationGetRelationName(OldIndex),
783 : RelationGetRelationName(OldHeap))));
784 :
785 : /* Index AM must allow clustering */
786 321 : if (!OldIndex->rd_indam->amclusterable)
787 0 : ereport(ERROR,
788 : (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
789 : errmsg("cannot cluster on index \"%s\" because access method does not support clustering",
790 : RelationGetRelationName(OldIndex))));
791 :
792 : /*
793 : * Disallow clustering on incomplete indexes (those that might not index
794 : * every row of the relation). We could relax this by making a separate
795 : * seqscan pass over the table to copy the missing rows, but that seems
796 : * expensive and tedious.
797 : */
798 321 : if (!heap_attisnull(OldIndex->rd_indextuple, Anum_pg_index_indpred, NULL))
799 0 : ereport(ERROR,
800 : (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
801 : errmsg("cannot cluster on partial index \"%s\"",
802 : RelationGetRelationName(OldIndex))));
803 :
804 : /*
805 : * Disallow if index is left over from a failed CREATE INDEX CONCURRENTLY;
806 : * it might well not contain entries for every heap row, or might not even
807 : * be internally consistent. (But note that we don't check indcheckxmin;
808 : * the worst consequence of following broken HOT chains would be that we
809 : * might put recently-dead tuples out-of-order in the new table, and there
810 : * is little harm in that.)
811 : */
812 321 : if (!OldIndex->rd_index->indisvalid)
813 4 : ereport(ERROR,
814 : (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
815 : errmsg("cannot cluster on invalid index \"%s\"",
816 : RelationGetRelationName(OldIndex))));
817 :
818 : /* Drop relcache refcnt on OldIndex, but keep lock */
819 317 : index_close(OldIndex, NoLock);
820 317 : }
821 :
822 : /*
823 : * mark_index_clustered: mark the specified index as the one clustered on
824 : *
825 : * With indexOid == InvalidOid, will mark all indexes of rel not-clustered.
826 : */
827 : void
828 194 : mark_index_clustered(Relation rel, Oid indexOid, bool is_internal)
829 : {
830 : HeapTuple indexTuple;
831 : Form_pg_index indexForm;
832 : Relation pg_index;
833 : ListCell *index;
834 :
835 : Assert(rel->rd_rel->relkind != RELKIND_PARTITIONED_TABLE);
836 :
837 : /*
838 : * If the index is already marked clustered, no need to do anything.
839 : */
840 194 : if (OidIsValid(indexOid))
841 : {
842 186 : if (get_index_isclustered(indexOid))
843 39 : return;
844 : }
845 :
846 : /*
847 : * Check each index of the relation and set/clear the bit as needed.
848 : */
849 155 : pg_index = table_open(IndexRelationId, RowExclusiveLock);
850 :
851 464 : foreach(index, RelationGetIndexList(rel))
852 : {
853 309 : Oid thisIndexOid = lfirst_oid(index);
854 :
855 309 : indexTuple = SearchSysCacheCopy1(INDEXRELID,
856 : ObjectIdGetDatum(thisIndexOid));
857 309 : if (!HeapTupleIsValid(indexTuple))
858 0 : elog(ERROR, "cache lookup failed for index %u", thisIndexOid);
859 309 : indexForm = (Form_pg_index) GETSTRUCT(indexTuple);
860 :
861 : /*
862 : * Unset the bit if set. We know it's wrong because we checked this
863 : * earlier.
864 : */
865 309 : if (indexForm->indisclustered)
866 : {
867 20 : indexForm->indisclustered = false;
868 20 : CatalogTupleUpdate(pg_index, &indexTuple->t_self, indexTuple);
869 : }
870 289 : else if (thisIndexOid == indexOid)
871 : {
872 : /* this was checked earlier, but let's be real sure */
873 147 : if (!indexForm->indisvalid)
874 0 : elog(ERROR, "cannot cluster on invalid index %u", indexOid);
875 147 : indexForm->indisclustered = true;
876 147 : CatalogTupleUpdate(pg_index, &indexTuple->t_self, indexTuple);
877 : }
878 :
879 309 : InvokeObjectPostAlterHookArg(IndexRelationId, thisIndexOid, 0,
880 : InvalidOid, is_internal);
881 :
882 309 : heap_freetuple(indexTuple);
883 : }
884 :
885 155 : table_close(pg_index, RowExclusiveLock);
886 : }
887 :
888 : /*
889 : * Check if the CONCURRENTLY option is legal for the relation.
890 : *
891 : * *Ident_idx_p receives OID of the identity index.
892 : */
893 : static void
894 38 : check_concurrent_repack_requirements(Relation rel, Oid *ident_idx_p)
895 : {
896 : char relpersistence,
897 : replident;
898 : Oid ident_idx;
899 :
900 : /* Data changes in system relations are not logically decoded. */
901 38 : if (IsCatalogRelation(rel))
902 8 : ereport(ERROR,
903 : errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
904 : errmsg("cannot repack relation \"%s\"",
905 : RelationGetRelationName(rel)),
906 : errhint("%s is not supported for catalog relations.",
907 : "REPACK (CONCURRENTLY)"));
908 :
909 : /*
910 : * reorderbuffer.c does not seem to handle processing of TOAST relation
911 : * alone.
912 : */
913 30 : if (IsToastRelation(rel))
914 4 : ereport(ERROR,
915 : errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
916 : errmsg("cannot repack relation \"%s\"",
917 : RelationGetRelationName(rel)),
918 : errhint("%s is not supported for TOAST relations.",
919 : "REPACK (CONCURRENTLY)"));
920 :
921 26 : relpersistence = rel->rd_rel->relpersistence;
922 26 : if (relpersistence != RELPERSISTENCE_PERMANENT)
923 8 : ereport(ERROR,
924 : errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
925 : errmsg("cannot repack relation \"%s\"",
926 : RelationGetRelationName(rel)),
927 : errhint("%s is only allowed for permanent relations.",
928 : "REPACK (CONCURRENTLY)"));
929 :
930 : /* With NOTHING, WAL does not contain the old tuple. */
931 18 : replident = rel->rd_rel->relreplident;
932 18 : if (replident == REPLICA_IDENTITY_NOTHING)
933 4 : ereport(ERROR,
934 : errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
935 : errmsg("cannot repack relation \"%s\"",
936 : RelationGetRelationName(rel)),
937 : errhint("Relation \"%s\" has insufficient replication identity.",
938 : RelationGetRelationName(rel)));
939 :
940 : /*
941 : * Obtain the replica identity index -- either one that has been set
942 : * explicitly, or a non-deferrable primary key. If none of these cases
943 : * apply, the table cannot be repacked concurrently. It might be possible
944 : * to have repack work with a FULL replica identity; however that requires
945 : * more work and is not implemented yet.
946 : */
947 14 : ident_idx = GetRelationIdentityOrPK(rel);
948 14 : if (!OidIsValid(ident_idx))
949 8 : ereport(ERROR,
950 : errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
951 : errmsg("cannot process relation \"%s\"",
952 : RelationGetRelationName(rel)),
953 : errhint("Relation \"%s\" has no identity index.",
954 : RelationGetRelationName(rel)));
955 :
956 6 : *ident_idx_p = ident_idx;
957 6 : }
958 :
959 :
960 : /*
961 : * rebuild_relation: rebuild an existing relation in index or physical order
962 : *
963 : * OldHeap: table to rebuild. See cluster_rel() for comments on the required
964 : * lock strength.
965 : *
966 : * index: index to cluster by, or NULL to rewrite in physical order.
967 : *
968 : * ident_idx: identity index, to handle replaying of concurrent data changes
969 : * to the new heap. InvalidOid if there's no CONCURRENTLY option.
970 : *
971 : * On entry, heap and index (if one is given) must be open, and the
972 : * appropriate lock held on them -- AccessExclusiveLock for exclusive
973 : * processing and ShareUpdateExclusiveLock for concurrent processing.
974 : *
975 : * On exit, they are closed, but still locked with AccessExclusiveLock.
976 : * (The function handles the lock upgrade if 'concurrent' is true.)
977 : */
978 : static void
979 408 : rebuild_relation(Relation OldHeap, Relation index, bool verbose,
980 : Oid ident_idx)
981 : {
982 408 : Oid tableOid = RelationGetRelid(OldHeap);
983 408 : Oid accessMethod = OldHeap->rd_rel->relam;
984 408 : Oid tableSpace = OldHeap->rd_rel->reltablespace;
985 : Oid OIDNewHeap;
986 : Relation NewHeap;
987 : char relpersistence;
988 : bool swap_toast_by_content;
989 : TransactionId frozenXid;
990 : MultiXactId cutoffMulti;
991 408 : bool concurrent = OidIsValid(ident_idx);
992 408 : Snapshot snapshot = NULL;
993 : #if USE_ASSERT_CHECKING
994 : LOCKMODE lmode;
995 :
996 : lmode = RepackLockLevel(concurrent);
997 :
998 : Assert(CheckRelationLockedByMe(OldHeap, lmode, false));
999 : Assert(index == NULL || CheckRelationLockedByMe(index, lmode, false));
1000 : #endif
1001 :
1002 408 : if (concurrent)
1003 : {
1004 : /*
1005 : * The worker needs to be member of the locking group we're the leader
1006 : * of. We ought to become the leader before the worker starts. The
1007 : * worker will join the group as soon as it starts.
1008 : *
1009 : * This is to make sure that the deadlock described below is
1010 : * detectable by deadlock.c: if the worker waits for a transaction to
1011 : * complete and we are waiting for the worker output, then effectively
1012 : * we (i.e. this backend) are waiting for that transaction.
1013 : */
1014 6 : BecomeLockGroupLeader();
1015 :
1016 : /*
1017 : * Start the worker that decodes data changes applied while we're
1018 : * copying the table contents.
1019 : *
1020 : * Note that the worker has to wait for all transactions with XID
1021 : * already assigned to finish. If some of those transactions is
1022 : * waiting for a lock conflicting with ShareUpdateExclusiveLock on our
1023 : * table (e.g. it runs CREATE INDEX), we can end up in a deadlock.
1024 : * Not sure this risk is worth unlocking/locking the table (and its
1025 : * clustering index) and checking again if it's still eligible for
1026 : * REPACK CONCURRENTLY.
1027 : */
1028 6 : start_repack_decoding_worker(tableOid);
1029 :
1030 : /*
1031 : * Wait until the worker has the initial snapshot and retrieve it.
1032 : */
1033 6 : snapshot = get_initial_snapshot(decoding_worker);
1034 :
1035 6 : PushActiveSnapshot(snapshot);
1036 : }
1037 :
1038 : /* for CLUSTER or REPACK USING INDEX, mark the index as the one to use */
1039 408 : if (index != NULL)
1040 147 : mark_index_clustered(OldHeap, RelationGetRelid(index), true);
1041 :
1042 : /* Remember info about rel before closing OldHeap */
1043 408 : relpersistence = OldHeap->rd_rel->relpersistence;
1044 :
1045 : /*
1046 : * Create the transient table that will receive the re-ordered data.
1047 : *
1048 : * OldHeap is already locked, so no need to lock it again. make_new_heap
1049 : * obtains AccessExclusiveLock on the new heap and its toast table.
1050 : */
1051 408 : OIDNewHeap = make_new_heap(tableOid, tableSpace,
1052 : accessMethod,
1053 : relpersistence,
1054 : NoLock);
1055 : Assert(CheckRelationOidLockedByMe(OIDNewHeap, AccessExclusiveLock, false));
1056 408 : NewHeap = table_open(OIDNewHeap, NoLock);
1057 :
1058 : /* Copy the heap data into the new table in the desired order */
1059 408 : copy_table_data(NewHeap, OldHeap, index, snapshot, verbose,
1060 : &swap_toast_by_content, &frozenXid, &cutoffMulti);
1061 :
1062 : /* The historic snapshot won't be needed anymore. */
1063 408 : if (snapshot)
1064 : {
1065 6 : PopActiveSnapshot();
1066 6 : UpdateActiveSnapshotCommandId();
1067 : }
1068 :
1069 408 : if (concurrent)
1070 : {
1071 : Assert(!swap_toast_by_content);
1072 :
1073 : /*
1074 : * Close the index, but keep the lock. Both heaps will be closed by
1075 : * the following call.
1076 : */
1077 6 : if (index)
1078 3 : index_close(index, NoLock);
1079 :
1080 6 : rebuild_relation_finish_concurrent(NewHeap, OldHeap, ident_idx,
1081 : frozenXid, cutoffMulti);
1082 :
1083 6 : pgstat_progress_update_param(PROGRESS_REPACK_PHASE,
1084 : PROGRESS_REPACK_PHASE_FINAL_CLEANUP);
1085 : }
1086 : else
1087 : {
1088 402 : bool is_system_catalog = IsSystemRelation(OldHeap);
1089 :
1090 : /* Close relcache entries, but keep lock until transaction commit */
1091 402 : table_close(OldHeap, NoLock);
1092 402 : if (index)
1093 144 : index_close(index, NoLock);
1094 :
1095 : /*
1096 : * Close the new relation so it can be dropped as soon as the storage
1097 : * is swapped. The relation is not visible to others, so no need to
1098 : * unlock it explicitly.
1099 : */
1100 402 : table_close(NewHeap, NoLock);
1101 :
1102 : /*
1103 : * Swap the physical files of the target and transient tables, then
1104 : * rebuild the target's indexes and throw away the transient table.
1105 : */
1106 402 : finish_heap_swap(tableOid, OIDNewHeap, is_system_catalog,
1107 : swap_toast_by_content, false, true,
1108 : true, /* reindex */
1109 : frozenXid, cutoffMulti,
1110 : relpersistence);
1111 : }
1112 404 : }
1113 :
1114 :
1115 : /*
1116 : * Create the transient table that will be filled with new data during
1117 : * CLUSTER, ALTER TABLE, and similar operations. The transient table
1118 : * duplicates the logical structure of the OldHeap; but will have the
1119 : * specified physical storage properties NewTableSpace, NewAccessMethod, and
1120 : * relpersistence.
1121 : *
1122 : * After this, the caller should load the new heap with transferred/modified
1123 : * data, then call finish_heap_swap to complete the operation.
1124 : */
1125 : Oid
1126 1581 : make_new_heap(Oid OIDOldHeap, Oid NewTableSpace, Oid NewAccessMethod,
1127 : char relpersistence, LOCKMODE lockmode)
1128 : {
1129 : TupleDesc OldHeapDesc;
1130 : char NewHeapName[NAMEDATALEN];
1131 : Oid OIDNewHeap;
1132 : Oid toastid;
1133 : Relation OldHeap;
1134 : HeapTuple tuple;
1135 : Datum reloptions;
1136 : bool isNull;
1137 : Oid namespaceid;
1138 :
1139 1581 : OldHeap = table_open(OIDOldHeap, lockmode);
1140 1581 : OldHeapDesc = RelationGetDescr(OldHeap);
1141 :
1142 : /*
1143 : * Note that the NewHeap will not receive any of the defaults or
1144 : * constraints associated with the OldHeap; we don't need 'em, and there's
1145 : * no reason to spend cycles inserting them into the catalogs only to
1146 : * delete them.
1147 : */
1148 :
1149 : /*
1150 : * But we do want to use reloptions of the old heap for new heap.
1151 : */
1152 1581 : tuple = SearchSysCache1(RELOID, ObjectIdGetDatum(OIDOldHeap));
1153 1581 : if (!HeapTupleIsValid(tuple))
1154 0 : elog(ERROR, "cache lookup failed for relation %u", OIDOldHeap);
1155 1581 : reloptions = SysCacheGetAttr(RELOID, tuple, Anum_pg_class_reloptions,
1156 : &isNull);
1157 1581 : if (isNull)
1158 1488 : reloptions = (Datum) 0;
1159 :
1160 1581 : if (relpersistence == RELPERSISTENCE_TEMP)
1161 98 : namespaceid = LookupCreationNamespace("pg_temp");
1162 : else
1163 1483 : namespaceid = RelationGetNamespace(OldHeap);
1164 :
1165 : /*
1166 : * Create the new heap, using a temporary name in the same namespace as
1167 : * the existing table. NOTE: there is some risk of collision with user
1168 : * relnames. Working around this seems more trouble than it's worth; in
1169 : * particular, we can't create the new heap in a different namespace from
1170 : * the old, or we will have problems with the TEMP status of temp tables.
1171 : *
1172 : * Note: the new heap is not a shared relation, even if we are rebuilding
1173 : * a shared rel. However, we do make the new heap mapped if the source is
1174 : * mapped. This simplifies swap_relation_files, and is absolutely
1175 : * necessary for rebuilding pg_class, for reasons explained there.
1176 : */
1177 1581 : snprintf(NewHeapName, sizeof(NewHeapName), "pg_temp_%u", OIDOldHeap);
1178 :
1179 1581 : OIDNewHeap = heap_create_with_catalog(NewHeapName,
1180 : namespaceid,
1181 : NewTableSpace,
1182 : InvalidOid,
1183 : InvalidOid,
1184 : InvalidOid,
1185 1581 : OldHeap->rd_rel->relowner,
1186 : NewAccessMethod,
1187 : OldHeapDesc,
1188 : NIL,
1189 : RELKIND_RELATION,
1190 : relpersistence,
1191 : false,
1192 1581 : RelationIsMapped(OldHeap),
1193 : ONCOMMIT_NOOP,
1194 : reloptions,
1195 : false,
1196 : true,
1197 : true,
1198 : OIDOldHeap,
1199 1581 : NULL);
1200 : Assert(OIDNewHeap != InvalidOid);
1201 :
1202 1581 : ReleaseSysCache(tuple);
1203 :
1204 : /*
1205 : * Advance command counter so that the newly-created relation's catalog
1206 : * tuples will be visible to table_open.
1207 : */
1208 1581 : CommandCounterIncrement();
1209 :
1210 : /*
1211 : * If necessary, create a TOAST table for the new relation.
1212 : *
1213 : * If the relation doesn't have a TOAST table already, we can't need one
1214 : * for the new relation. The other way around is possible though: if some
1215 : * wide columns have been dropped, NewHeapCreateToastTable can decide that
1216 : * no TOAST table is needed for the new table.
1217 : *
1218 : * Note that NewHeapCreateToastTable ends with CommandCounterIncrement, so
1219 : * that the TOAST table will be visible for insertion.
1220 : */
1221 1581 : toastid = OldHeap->rd_rel->reltoastrelid;
1222 1581 : if (OidIsValid(toastid))
1223 : {
1224 : /* keep the existing toast table's reloptions, if any */
1225 563 : tuple = SearchSysCache1(RELOID, ObjectIdGetDatum(toastid));
1226 563 : if (!HeapTupleIsValid(tuple))
1227 0 : elog(ERROR, "cache lookup failed for relation %u", toastid);
1228 563 : reloptions = SysCacheGetAttr(RELOID, tuple, Anum_pg_class_reloptions,
1229 : &isNull);
1230 563 : if (isNull)
1231 563 : reloptions = (Datum) 0;
1232 :
1233 563 : NewHeapCreateToastTable(OIDNewHeap, reloptions, lockmode, toastid);
1234 :
1235 563 : ReleaseSysCache(tuple);
1236 : }
1237 :
1238 1581 : table_close(OldHeap, NoLock);
1239 :
1240 1581 : return OIDNewHeap;
1241 : }
1242 :
1243 : /*
1244 : * Do the physical copying of table data.
1245 : *
1246 : * 'snapshot' and 'decoding_ctx': see table_relation_copy_for_cluster(). Pass
1247 : * iff concurrent processing is required.
1248 : *
1249 : * There are three output parameters:
1250 : * *pSwapToastByContent is set true if toast tables must be swapped by content.
1251 : * *pFreezeXid receives the TransactionId used as freeze cutoff point.
1252 : * *pCutoffMulti receives the MultiXactId used as a cutoff point.
1253 : */
1254 : static void
1255 408 : copy_table_data(Relation NewHeap, Relation OldHeap, Relation OldIndex,
1256 : Snapshot snapshot, bool verbose, bool *pSwapToastByContent,
1257 : TransactionId *pFreezeXid, MultiXactId *pCutoffMulti)
1258 : {
1259 : Relation relRelation;
1260 : HeapTuple reltup;
1261 : Form_pg_class relform;
1262 : TupleDesc oldTupDesc PG_USED_FOR_ASSERTS_ONLY;
1263 : TupleDesc newTupDesc PG_USED_FOR_ASSERTS_ONLY;
1264 : VacuumParams params;
1265 : struct VacuumCutoffs cutoffs;
1266 : bool use_sort;
1267 408 : double num_tuples = 0,
1268 408 : tups_vacuumed = 0,
1269 408 : tups_recently_dead = 0;
1270 : BlockNumber num_pages;
1271 408 : int elevel = verbose ? INFO : DEBUG2;
1272 : PGRUsage ru0;
1273 : char *nspname;
1274 408 : bool concurrent = snapshot != NULL;
1275 : LOCKMODE lmode;
1276 :
1277 408 : lmode = RepackLockLevel(concurrent);
1278 :
1279 408 : pg_rusage_init(&ru0);
1280 :
1281 : /* Store a copy of the namespace name for logging purposes */
1282 408 : nspname = get_namespace_name(RelationGetNamespace(OldHeap));
1283 :
1284 : /*
1285 : * Their tuple descriptors should be exactly alike, but here we only need
1286 : * assume that they have the same number of columns.
1287 : */
1288 408 : oldTupDesc = RelationGetDescr(OldHeap);
1289 408 : newTupDesc = RelationGetDescr(NewHeap);
1290 : Assert(newTupDesc->natts == oldTupDesc->natts);
1291 :
1292 : /*
1293 : * If the OldHeap has a toast table, get lock on the toast table to keep
1294 : * it from being vacuumed. This is needed because autovacuum processes
1295 : * toast tables independently of their main tables, with no lock on the
1296 : * latter. If an autovacuum were to start on the toast table after we
1297 : * compute our OldestXmin below, it would use a later OldestXmin, and then
1298 : * possibly remove as DEAD toast tuples belonging to main tuples we think
1299 : * are only RECENTLY_DEAD. Then we'd fail while trying to copy those
1300 : * tuples.
1301 : *
1302 : * We don't need to open the toast relation here, just lock it. The lock
1303 : * will be held till end of transaction.
1304 : */
1305 408 : if (OldHeap->rd_rel->reltoastrelid)
1306 135 : LockRelationOid(OldHeap->rd_rel->reltoastrelid, lmode);
1307 :
1308 : /*
1309 : * If both tables have TOAST tables, perform toast swap by content. It is
1310 : * possible that the old table has a toast table but the new one doesn't,
1311 : * if toastable columns have been dropped. In that case we have to do
1312 : * swap by links. This is okay because swap by content is only essential
1313 : * for system catalogs, and we don't support schema changes for them.
1314 : */
1315 408 : if (OldHeap->rd_rel->reltoastrelid && NewHeap->rd_rel->reltoastrelid &&
1316 135 : !concurrent)
1317 : {
1318 132 : *pSwapToastByContent = true;
1319 :
1320 : /*
1321 : * When doing swap by content, any toast pointers written into NewHeap
1322 : * must use the old toast table's OID, because that's where the toast
1323 : * data will eventually be found. Set this up by setting rd_toastoid.
1324 : * This also tells toast_save_datum() to preserve the toast value
1325 : * OIDs, which we want so as not to invalidate toast pointers in
1326 : * system catalog caches, and to avoid making multiple copies of a
1327 : * single toast value.
1328 : *
1329 : * Note that we must hold NewHeap open until we are done writing data,
1330 : * since the relcache will not guarantee to remember this setting once
1331 : * the relation is closed. Also, this technique depends on the fact
1332 : * that no one will try to read from the NewHeap until after we've
1333 : * finished writing it and swapping the rels --- otherwise they could
1334 : * follow the toast pointers to the wrong place. (It would actually
1335 : * work for values copied over from the old toast table, but not for
1336 : * any values that we toast which were previously not toasted.)
1337 : *
1338 : * This would not work with CONCURRENTLY because we may need to delete
1339 : * TOASTed tuples from the new heap. With this hack, we'd delete them
1340 : * from the old heap.
1341 : */
1342 132 : NewHeap->rd_toastoid = OldHeap->rd_rel->reltoastrelid;
1343 : }
1344 : else
1345 276 : *pSwapToastByContent = false;
1346 :
1347 : /*
1348 : * Compute xids used to freeze and weed out dead tuples and multixacts.
1349 : * Since we're going to rewrite the whole table anyway, there's no reason
1350 : * not to be aggressive about this.
1351 : */
1352 408 : memset(¶ms, 0, sizeof(VacuumParams));
1353 408 : vacuum_get_cutoffs(OldHeap, ¶ms, &cutoffs);
1354 :
1355 : /*
1356 : * FreezeXid will become the table's new relfrozenxid, and that mustn't go
1357 : * backwards, so take the max.
1358 : */
1359 : {
1360 408 : TransactionId relfrozenxid = OldHeap->rd_rel->relfrozenxid;
1361 :
1362 816 : if (TransactionIdIsValid(relfrozenxid) &&
1363 408 : TransactionIdPrecedes(cutoffs.FreezeLimit, relfrozenxid))
1364 10 : cutoffs.FreezeLimit = relfrozenxid;
1365 : }
1366 :
1367 : /*
1368 : * MultiXactCutoff, similarly, shouldn't go backwards either.
1369 : */
1370 : {
1371 408 : MultiXactId relminmxid = OldHeap->rd_rel->relminmxid;
1372 :
1373 816 : if (MultiXactIdIsValid(relminmxid) &&
1374 408 : MultiXactIdPrecedes(cutoffs.MultiXactCutoff, relminmxid))
1375 0 : cutoffs.MultiXactCutoff = relminmxid;
1376 : }
1377 :
1378 : /*
1379 : * Decide whether to use an indexscan or seqscan-and-optional-sort to scan
1380 : * the OldHeap. We know how to use a sort to duplicate the ordering of a
1381 : * btree index, and will use seqscan-and-sort for that case if the planner
1382 : * tells us it's cheaper. Otherwise, always indexscan if an index is
1383 : * provided, else plain seqscan.
1384 : */
1385 408 : if (OldIndex != NULL && OldIndex->rd_rel->relam == BTREE_AM_OID)
1386 145 : use_sort = plan_cluster_use_sort(RelationGetRelid(OldHeap),
1387 : RelationGetRelid(OldIndex));
1388 : else
1389 263 : use_sort = false;
1390 :
1391 : /* Log what we're doing */
1392 408 : if (OldIndex != NULL && !use_sort)
1393 62 : ereport(elevel,
1394 : errmsg("repacking \"%s.%s\" using index scan on \"%s\"",
1395 : nspname,
1396 : RelationGetRelationName(OldHeap),
1397 : RelationGetRelationName(OldIndex)));
1398 346 : else if (use_sort)
1399 85 : ereport(elevel,
1400 : errmsg("repacking \"%s.%s\" using sequential scan and sort",
1401 : nspname,
1402 : RelationGetRelationName(OldHeap)));
1403 : else
1404 261 : ereport(elevel,
1405 : errmsg("repacking \"%s.%s\" in physical order",
1406 : nspname,
1407 : RelationGetRelationName(OldHeap)));
1408 :
1409 : /*
1410 : * Hand off the actual copying to AM specific function, the generic code
1411 : * cannot know how to deal with visibility across AMs. Note that this
1412 : * routine is allowed to set FreezeXid / MultiXactCutoff to different
1413 : * values (e.g. because the AM doesn't use freezing).
1414 : */
1415 408 : table_relation_copy_for_cluster(OldHeap, NewHeap, OldIndex, use_sort,
1416 : cutoffs.OldestXmin, snapshot,
1417 : &cutoffs.FreezeLimit,
1418 : &cutoffs.MultiXactCutoff,
1419 : &num_tuples, &tups_vacuumed,
1420 : &tups_recently_dead);
1421 :
1422 : /* return selected values to caller, get set as relfrozenxid/minmxid */
1423 408 : *pFreezeXid = cutoffs.FreezeLimit;
1424 408 : *pCutoffMulti = cutoffs.MultiXactCutoff;
1425 :
1426 : /*
1427 : * Reset rd_toastoid just to be tidy --- it shouldn't be looked at again.
1428 : * In the CONCURRENTLY case, we need to set it again before applying the
1429 : * concurrent changes.
1430 : */
1431 408 : NewHeap->rd_toastoid = InvalidOid;
1432 :
1433 408 : num_pages = RelationGetNumberOfBlocks(NewHeap);
1434 :
1435 : /* Log what we did */
1436 408 : ereport(elevel,
1437 : (errmsg("\"%s.%s\": found %.0f removable, %.0f nonremovable row versions in %u pages",
1438 : nspname,
1439 : RelationGetRelationName(OldHeap),
1440 : tups_vacuumed, num_tuples,
1441 : RelationGetNumberOfBlocks(OldHeap)),
1442 : errdetail("%.0f dead row versions cannot be removed yet.\n"
1443 : "%s.",
1444 : tups_recently_dead,
1445 : pg_rusage_show(&ru0))));
1446 :
1447 : /* Update pg_class to reflect the correct values of pages and tuples. */
1448 408 : relRelation = table_open(RelationRelationId, RowExclusiveLock);
1449 :
1450 408 : reltup = SearchSysCacheCopy1(RELOID,
1451 : ObjectIdGetDatum(RelationGetRelid(NewHeap)));
1452 408 : if (!HeapTupleIsValid(reltup))
1453 0 : elog(ERROR, "cache lookup failed for relation %u",
1454 : RelationGetRelid(NewHeap));
1455 408 : relform = (Form_pg_class) GETSTRUCT(reltup);
1456 :
1457 408 : relform->relpages = num_pages;
1458 408 : relform->reltuples = num_tuples;
1459 :
1460 : /* Don't update the stats for pg_class. See swap_relation_files. */
1461 408 : if (RelationGetRelid(OldHeap) != RelationRelationId)
1462 385 : CatalogTupleUpdate(relRelation, &reltup->t_self, reltup);
1463 : else
1464 23 : CacheInvalidateRelcacheByTuple(reltup);
1465 :
1466 : /* Clean up. */
1467 408 : heap_freetuple(reltup);
1468 408 : table_close(relRelation, RowExclusiveLock);
1469 :
1470 : /* Make the update visible */
1471 408 : CommandCounterIncrement();
1472 408 : }
1473 :
1474 : /*
1475 : * Swap the physical files of two given relations.
1476 : *
1477 : * We swap the physical identity (reltablespace, relfilenumber) while keeping
1478 : * the same logical identities of the two relations. relpersistence is also
1479 : * swapped, which is critical since it determines where buffers live for each
1480 : * relation.
1481 : *
1482 : * We can swap associated TOAST data in either of two ways: recursively swap
1483 : * the physical content of the toast tables (and their indexes), or swap the
1484 : * TOAST links in the given relations' pg_class entries. The former is needed
1485 : * to manage rewrites of shared catalogs (where we cannot change the pg_class
1486 : * links) while the latter is the only way to handle cases in which a toast
1487 : * table is added or removed altogether.
1488 : *
1489 : * Additionally, the first relation is marked with relfrozenxid set to
1490 : * frozenXid. It seems a bit ugly to have this here, but the caller would
1491 : * have to do it anyway, so having it here saves a heap_update. Note: in
1492 : * the swap-toast-links case, we assume we don't need to change the toast
1493 : * table's relfrozenxid: the new version of the toast table should already
1494 : * have relfrozenxid set to RecentXmin, which is good enough.
1495 : *
1496 : * Lastly, if r2 and its toast table and toast index (if any) are mapped,
1497 : * their OIDs are emitted into mapped_tables[]. This is hacky but beats
1498 : * having to look the information up again later in finish_heap_swap.
1499 : */
1500 : static void
1501 1719 : swap_relation_files(Oid r1, Oid r2, bool target_is_pg_class,
1502 : bool swap_toast_by_content,
1503 : bool is_internal,
1504 : TransactionId frozenXid,
1505 : MultiXactId cutoffMulti,
1506 : Oid *mapped_tables)
1507 : {
1508 : Relation relRelation;
1509 : HeapTuple reltup1,
1510 : reltup2;
1511 : Form_pg_class relform1,
1512 : relform2;
1513 : RelFileNumber relfilenumber1,
1514 : relfilenumber2;
1515 : RelFileNumber swaptemp;
1516 : char swptmpchr;
1517 : Oid relam1,
1518 : relam2;
1519 :
1520 : /* We need writable copies of both pg_class tuples. */
1521 1719 : relRelation = table_open(RelationRelationId, RowExclusiveLock);
1522 :
1523 1719 : reltup1 = SearchSysCacheCopy1(RELOID, ObjectIdGetDatum(r1));
1524 1719 : if (!HeapTupleIsValid(reltup1))
1525 0 : elog(ERROR, "cache lookup failed for relation %u", r1);
1526 1719 : relform1 = (Form_pg_class) GETSTRUCT(reltup1);
1527 :
1528 1719 : reltup2 = SearchSysCacheCopy1(RELOID, ObjectIdGetDatum(r2));
1529 1719 : if (!HeapTupleIsValid(reltup2))
1530 0 : elog(ERROR, "cache lookup failed for relation %u", r2);
1531 1719 : relform2 = (Form_pg_class) GETSTRUCT(reltup2);
1532 :
1533 1719 : relfilenumber1 = relform1->relfilenode;
1534 1719 : relfilenumber2 = relform2->relfilenode;
1535 1719 : relam1 = relform1->relam;
1536 1719 : relam2 = relform2->relam;
1537 :
1538 1719 : if (RelFileNumberIsValid(relfilenumber1) &&
1539 : RelFileNumberIsValid(relfilenumber2))
1540 : {
1541 : /*
1542 : * Normal non-mapped relations: swap relfilenumbers, reltablespaces,
1543 : * relpersistence
1544 : */
1545 : Assert(!target_is_pg_class);
1546 :
1547 1630 : swaptemp = relform1->relfilenode;
1548 1630 : relform1->relfilenode = relform2->relfilenode;
1549 1630 : relform2->relfilenode = swaptemp;
1550 :
1551 1630 : swaptemp = relform1->reltablespace;
1552 1630 : relform1->reltablespace = relform2->reltablespace;
1553 1630 : relform2->reltablespace = swaptemp;
1554 :
1555 1630 : swaptemp = relform1->relam;
1556 1630 : relform1->relam = relform2->relam;
1557 1630 : relform2->relam = swaptemp;
1558 :
1559 1630 : swptmpchr = relform1->relpersistence;
1560 1630 : relform1->relpersistence = relform2->relpersistence;
1561 1630 : relform2->relpersistence = swptmpchr;
1562 :
1563 : /* Also swap toast links, if we're swapping by links */
1564 1630 : if (!swap_toast_by_content)
1565 : {
1566 1294 : swaptemp = relform1->reltoastrelid;
1567 1294 : relform1->reltoastrelid = relform2->reltoastrelid;
1568 1294 : relform2->reltoastrelid = swaptemp;
1569 : }
1570 : }
1571 : else
1572 : {
1573 : /*
1574 : * Mapped-relation case. Here we have to swap the relation mappings
1575 : * instead of modifying the pg_class columns. Both must be mapped.
1576 : */
1577 89 : if (RelFileNumberIsValid(relfilenumber1) ||
1578 : RelFileNumberIsValid(relfilenumber2))
1579 0 : elog(ERROR, "cannot swap mapped relation \"%s\" with non-mapped relation",
1580 : NameStr(relform1->relname));
1581 :
1582 : /*
1583 : * We can't change the tablespace nor persistence of a mapped rel, and
1584 : * we can't handle toast link swapping for one either, because we must
1585 : * not apply any critical changes to its pg_class row. These cases
1586 : * should be prevented by upstream permissions tests, so these checks
1587 : * are non-user-facing emergency backstop.
1588 : */
1589 89 : if (relform1->reltablespace != relform2->reltablespace)
1590 0 : elog(ERROR, "cannot change tablespace of mapped relation \"%s\"",
1591 : NameStr(relform1->relname));
1592 89 : if (relform1->relpersistence != relform2->relpersistence)
1593 0 : elog(ERROR, "cannot change persistence of mapped relation \"%s\"",
1594 : NameStr(relform1->relname));
1595 89 : if (relform1->relam != relform2->relam)
1596 0 : elog(ERROR, "cannot change access method of mapped relation \"%s\"",
1597 : NameStr(relform1->relname));
1598 89 : if (!swap_toast_by_content &&
1599 29 : (relform1->reltoastrelid || relform2->reltoastrelid))
1600 0 : elog(ERROR, "cannot swap toast by links for mapped relation \"%s\"",
1601 : NameStr(relform1->relname));
1602 :
1603 : /*
1604 : * Fetch the mappings --- shouldn't fail, but be paranoid
1605 : */
1606 89 : relfilenumber1 = RelationMapOidToFilenumber(r1, relform1->relisshared);
1607 89 : if (!RelFileNumberIsValid(relfilenumber1))
1608 0 : elog(ERROR, "could not find relation mapping for relation \"%s\", OID %u",
1609 : NameStr(relform1->relname), r1);
1610 89 : relfilenumber2 = RelationMapOidToFilenumber(r2, relform2->relisshared);
1611 89 : if (!RelFileNumberIsValid(relfilenumber2))
1612 0 : elog(ERROR, "could not find relation mapping for relation \"%s\", OID %u",
1613 : NameStr(relform2->relname), r2);
1614 :
1615 : /*
1616 : * Send replacement mappings to relmapper. Note these won't actually
1617 : * take effect until CommandCounterIncrement.
1618 : */
1619 89 : RelationMapUpdateMap(r1, relfilenumber2, relform1->relisshared, false);
1620 89 : RelationMapUpdateMap(r2, relfilenumber1, relform2->relisshared, false);
1621 :
1622 : /* Pass OIDs of mapped r2 tables back to caller */
1623 89 : *mapped_tables++ = r2;
1624 : }
1625 :
1626 : /*
1627 : * Recognize that rel1's relfilenumber (swapped from rel2) is new in this
1628 : * subtransaction. The rel2 storage (swapped from rel1) may or may not be
1629 : * new.
1630 : */
1631 : {
1632 : Relation rel1,
1633 : rel2;
1634 :
1635 1719 : rel1 = relation_open(r1, NoLock);
1636 1719 : rel2 = relation_open(r2, NoLock);
1637 1719 : rel2->rd_createSubid = rel1->rd_createSubid;
1638 1719 : rel2->rd_newRelfilelocatorSubid = rel1->rd_newRelfilelocatorSubid;
1639 1719 : rel2->rd_firstRelfilelocatorSubid = rel1->rd_firstRelfilelocatorSubid;
1640 1719 : RelationAssumeNewRelfilelocator(rel1);
1641 1719 : relation_close(rel1, NoLock);
1642 1719 : relation_close(rel2, NoLock);
1643 : }
1644 :
1645 : /*
1646 : * In the case of a shared catalog, these next few steps will only affect
1647 : * our own database's pg_class row; but that's okay, because they are all
1648 : * noncritical updates. That's also an important fact for the case of a
1649 : * mapped catalog, because it's possible that we'll commit the map change
1650 : * and then fail to commit the pg_class update.
1651 : */
1652 :
1653 : /* set rel1's frozen Xid and minimum MultiXid */
1654 1719 : if (relform1->relkind != RELKIND_INDEX)
1655 : {
1656 : Assert(!TransactionIdIsValid(frozenXid) ||
1657 : TransactionIdIsNormal(frozenXid));
1658 1580 : relform1->relfrozenxid = frozenXid;
1659 1580 : relform1->relminmxid = cutoffMulti;
1660 : }
1661 :
1662 : /* swap size statistics too, since new rel has freshly-updated stats */
1663 : {
1664 : int32 swap_pages;
1665 : float4 swap_tuples;
1666 : int32 swap_allvisible;
1667 : int32 swap_allfrozen;
1668 :
1669 1719 : swap_pages = relform1->relpages;
1670 1719 : relform1->relpages = relform2->relpages;
1671 1719 : relform2->relpages = swap_pages;
1672 :
1673 1719 : swap_tuples = relform1->reltuples;
1674 1719 : relform1->reltuples = relform2->reltuples;
1675 1719 : relform2->reltuples = swap_tuples;
1676 :
1677 1719 : swap_allvisible = relform1->relallvisible;
1678 1719 : relform1->relallvisible = relform2->relallvisible;
1679 1719 : relform2->relallvisible = swap_allvisible;
1680 :
1681 1719 : swap_allfrozen = relform1->relallfrozen;
1682 1719 : relform1->relallfrozen = relform2->relallfrozen;
1683 1719 : relform2->relallfrozen = swap_allfrozen;
1684 : }
1685 :
1686 : /*
1687 : * Update the tuples in pg_class --- unless the target relation of the
1688 : * swap is pg_class itself. In that case, there is zero point in making
1689 : * changes because we'd be updating the old data that we're about to throw
1690 : * away. Because the real work being done here for a mapped relation is
1691 : * just to change the relation map settings, it's all right to not update
1692 : * the pg_class rows in this case. The most important changes will instead
1693 : * performed later, in finish_heap_swap() itself.
1694 : */
1695 1719 : if (!target_is_pg_class)
1696 : {
1697 : CatalogIndexState indstate;
1698 :
1699 1696 : indstate = CatalogOpenIndexes(relRelation);
1700 1696 : CatalogTupleUpdateWithInfo(relRelation, &reltup1->t_self, reltup1,
1701 : indstate);
1702 1696 : CatalogTupleUpdateWithInfo(relRelation, &reltup2->t_self, reltup2,
1703 : indstate);
1704 1696 : CatalogCloseIndexes(indstate);
1705 : }
1706 : else
1707 : {
1708 : /* no update ... but we do still need relcache inval */
1709 23 : CacheInvalidateRelcacheByTuple(reltup1);
1710 23 : CacheInvalidateRelcacheByTuple(reltup2);
1711 : }
1712 :
1713 : /*
1714 : * Now that pg_class has been updated with its relevant information for
1715 : * the swap, update the dependency of the relations to point to their new
1716 : * table AM, if it has changed.
1717 : */
1718 1719 : if (relam1 != relam2)
1719 : {
1720 24 : if (changeDependencyFor(RelationRelationId,
1721 : r1,
1722 : AccessMethodRelationId,
1723 : relam1,
1724 : relam2) != 1)
1725 0 : elog(ERROR, "could not change access method dependency for relation \"%s.%s\"",
1726 : get_namespace_name(get_rel_namespace(r1)),
1727 : get_rel_name(r1));
1728 24 : if (changeDependencyFor(RelationRelationId,
1729 : r2,
1730 : AccessMethodRelationId,
1731 : relam2,
1732 : relam1) != 1)
1733 0 : elog(ERROR, "could not change access method dependency for relation \"%s.%s\"",
1734 : get_namespace_name(get_rel_namespace(r2)),
1735 : get_rel_name(r2));
1736 : }
1737 :
1738 : /*
1739 : * Post alter hook for modified relations. The change to r2 is always
1740 : * internal, but r1 depends on the invocation context.
1741 : */
1742 1719 : InvokeObjectPostAlterHookArg(RelationRelationId, r1, 0,
1743 : InvalidOid, is_internal);
1744 1719 : InvokeObjectPostAlterHookArg(RelationRelationId, r2, 0,
1745 : InvalidOid, true);
1746 :
1747 : /*
1748 : * If we have toast tables associated with the relations being swapped,
1749 : * deal with them too.
1750 : */
1751 1719 : if (relform1->reltoastrelid || relform2->reltoastrelid)
1752 : {
1753 534 : if (swap_toast_by_content)
1754 : {
1755 132 : if (relform1->reltoastrelid && relform2->reltoastrelid)
1756 : {
1757 : /* Recursively swap the contents of the toast tables */
1758 132 : swap_relation_files(relform1->reltoastrelid,
1759 : relform2->reltoastrelid,
1760 : target_is_pg_class,
1761 : swap_toast_by_content,
1762 : is_internal,
1763 : frozenXid,
1764 : cutoffMulti,
1765 : mapped_tables);
1766 : }
1767 : else
1768 : {
1769 : /* caller messed up */
1770 0 : elog(ERROR, "cannot swap toast files by content when there's only one");
1771 : }
1772 : }
1773 : else
1774 : {
1775 : /*
1776 : * We swapped the ownership links, so we need to change dependency
1777 : * data to match.
1778 : *
1779 : * NOTE: it is possible that only one table has a toast table.
1780 : *
1781 : * NOTE: at present, a TOAST table's only dependency is the one on
1782 : * its owning table. If more are ever created, we'd need to use
1783 : * something more selective than deleteDependencyRecordsFor() to
1784 : * get rid of just the link we want.
1785 : */
1786 : ObjectAddress baseobject,
1787 : toastobject;
1788 : long count;
1789 :
1790 : /*
1791 : * We disallow this case for system catalogs, to avoid the
1792 : * possibility that the catalog we're rebuilding is one of the
1793 : * ones the dependency changes would change. It's too late to be
1794 : * making any data changes to the target catalog.
1795 : */
1796 402 : if (IsSystemClass(r1, relform1))
1797 0 : elog(ERROR, "cannot swap toast files by links for system catalogs");
1798 :
1799 : /* Delete old dependencies */
1800 402 : if (relform1->reltoastrelid)
1801 : {
1802 381 : count = deleteDependencyRecordsFor(RelationRelationId,
1803 : relform1->reltoastrelid,
1804 : false);
1805 381 : if (count != 1)
1806 0 : elog(ERROR, "expected one dependency record for TOAST table, found %ld",
1807 : count);
1808 : }
1809 402 : if (relform2->reltoastrelid)
1810 : {
1811 402 : count = deleteDependencyRecordsFor(RelationRelationId,
1812 : relform2->reltoastrelid,
1813 : false);
1814 402 : if (count != 1)
1815 0 : elog(ERROR, "expected one dependency record for TOAST table, found %ld",
1816 : count);
1817 : }
1818 :
1819 : /* Register new dependencies */
1820 402 : baseobject.classId = RelationRelationId;
1821 402 : baseobject.objectSubId = 0;
1822 402 : toastobject.classId = RelationRelationId;
1823 402 : toastobject.objectSubId = 0;
1824 :
1825 402 : if (relform1->reltoastrelid)
1826 : {
1827 381 : baseobject.objectId = r1;
1828 381 : toastobject.objectId = relform1->reltoastrelid;
1829 381 : recordDependencyOn(&toastobject, &baseobject,
1830 : DEPENDENCY_INTERNAL);
1831 : }
1832 :
1833 402 : if (relform2->reltoastrelid)
1834 : {
1835 402 : baseobject.objectId = r2;
1836 402 : toastobject.objectId = relform2->reltoastrelid;
1837 402 : recordDependencyOn(&toastobject, &baseobject,
1838 : DEPENDENCY_INTERNAL);
1839 : }
1840 : }
1841 : }
1842 :
1843 : /*
1844 : * If we're swapping two toast tables by content, do the same for their
1845 : * valid index. The swap can actually be safely done only if the relations
1846 : * have indexes.
1847 : */
1848 1719 : if (swap_toast_by_content &&
1849 396 : relform1->relkind == RELKIND_TOASTVALUE &&
1850 132 : relform2->relkind == RELKIND_TOASTVALUE)
1851 : {
1852 : Oid toastIndex1,
1853 : toastIndex2;
1854 :
1855 : /* Get valid index for each relation */
1856 132 : toastIndex1 = toast_get_valid_index(r1,
1857 : AccessExclusiveLock);
1858 132 : toastIndex2 = toast_get_valid_index(r2,
1859 : AccessExclusiveLock);
1860 :
1861 132 : swap_relation_files(toastIndex1,
1862 : toastIndex2,
1863 : target_is_pg_class,
1864 : swap_toast_by_content,
1865 : is_internal,
1866 : InvalidTransactionId,
1867 : InvalidMultiXactId,
1868 : mapped_tables);
1869 : }
1870 :
1871 : /* Clean up. */
1872 1719 : heap_freetuple(reltup1);
1873 1719 : heap_freetuple(reltup2);
1874 :
1875 1719 : table_close(relRelation, RowExclusiveLock);
1876 1719 : }
1877 :
1878 : /*
1879 : * Remove the transient table that was built by make_new_heap, and finish
1880 : * cleaning up (including rebuilding all indexes on the old heap).
1881 : */
1882 : void
1883 1448 : finish_heap_swap(Oid OIDOldHeap, Oid OIDNewHeap,
1884 : bool is_system_catalog,
1885 : bool swap_toast_by_content,
1886 : bool check_constraints,
1887 : bool is_internal,
1888 : bool reindex,
1889 : TransactionId frozenXid,
1890 : MultiXactId cutoffMulti,
1891 : char newrelpersistence)
1892 : {
1893 : ObjectAddress object;
1894 : Oid mapped_tables[4];
1895 : int i;
1896 :
1897 : /* Report that we are now swapping relation files */
1898 1448 : pgstat_progress_update_param(PROGRESS_REPACK_PHASE,
1899 : PROGRESS_REPACK_PHASE_SWAP_REL_FILES);
1900 :
1901 : /* Zero out possible results from swapped_relation_files */
1902 1448 : memset(mapped_tables, 0, sizeof(mapped_tables));
1903 :
1904 : /*
1905 : * Swap the contents of the heap relations (including any toast tables).
1906 : * Also set old heap's relfrozenxid to frozenXid.
1907 : */
1908 1448 : swap_relation_files(OIDOldHeap, OIDNewHeap,
1909 : (OIDOldHeap == RelationRelationId),
1910 : swap_toast_by_content, is_internal,
1911 : frozenXid, cutoffMulti, mapped_tables);
1912 :
1913 : /*
1914 : * If it's a system catalog, queue a sinval message to flush all catcaches
1915 : * on the catalog when we reach CommandCounterIncrement.
1916 : */
1917 1448 : if (is_system_catalog)
1918 121 : CacheInvalidateCatalog(OIDOldHeap);
1919 :
1920 1448 : if (reindex)
1921 : {
1922 : int reindex_flags;
1923 1442 : ReindexParams reindex_params = {0};
1924 :
1925 : /*
1926 : * Rebuild each index on the relation (but not the toast table, which
1927 : * is all-new at this point). It is important to do this before the
1928 : * DROP step because if we are processing a system catalog that will
1929 : * be used during DROP, we want to have its indexes available. There
1930 : * is no advantage to the other order anyway because this is all
1931 : * transactional, so no chance to reclaim disk space before commit. We
1932 : * do not need a final CommandCounterIncrement() because
1933 : * reindex_relation does it.
1934 : *
1935 : * Note: because index_build is called via reindex_relation, it will
1936 : * never set indcheckxmin true for the indexes. This is OK even
1937 : * though in some sense we are building new indexes rather than
1938 : * rebuilding existing ones, because the new heap won't contain any
1939 : * HOT chains at all, let alone broken ones, so it can't be necessary
1940 : * to set indcheckxmin.
1941 : */
1942 1442 : reindex_flags = REINDEX_REL_SUPPRESS_INDEX_USE;
1943 1442 : if (check_constraints)
1944 1040 : reindex_flags |= REINDEX_REL_CHECK_CONSTRAINTS;
1945 :
1946 : /*
1947 : * Ensure that the indexes have the same persistence as the parent
1948 : * relation.
1949 : */
1950 1442 : if (newrelpersistence == RELPERSISTENCE_UNLOGGED)
1951 25 : reindex_flags |= REINDEX_REL_FORCE_INDEXES_UNLOGGED;
1952 1417 : else if (newrelpersistence == RELPERSISTENCE_PERMANENT)
1953 1364 : reindex_flags |= REINDEX_REL_FORCE_INDEXES_PERMANENT;
1954 :
1955 : /* Report that we are now reindexing relations */
1956 1442 : pgstat_progress_update_param(PROGRESS_REPACK_PHASE,
1957 : PROGRESS_REPACK_PHASE_REBUILD_INDEX);
1958 :
1959 1442 : reindex_relation(NULL, OIDOldHeap, reindex_flags, &reindex_params);
1960 : }
1961 :
1962 : /* Report that we are now doing clean up */
1963 1436 : pgstat_progress_update_param(PROGRESS_REPACK_PHASE,
1964 : PROGRESS_REPACK_PHASE_FINAL_CLEANUP);
1965 :
1966 : /*
1967 : * If the relation being rebuilt is pg_class, swap_relation_files()
1968 : * couldn't update pg_class's own pg_class entry (check comments in
1969 : * swap_relation_files()), thus relfrozenxid was not updated. That's
1970 : * annoying because a potential reason for doing a VACUUM FULL is a
1971 : * imminent or actual anti-wraparound shutdown. So, now that we can
1972 : * access the new relation using its indices, update relfrozenxid.
1973 : * pg_class doesn't have a toast relation, so we don't need to update the
1974 : * corresponding toast relation. Not that there's little point moving all
1975 : * relfrozenxid updates here since swap_relation_files() needs to write to
1976 : * pg_class for non-mapped relations anyway.
1977 : */
1978 1436 : if (OIDOldHeap == RelationRelationId)
1979 : {
1980 : Relation relRelation;
1981 : HeapTuple reltup;
1982 : Form_pg_class relform;
1983 :
1984 23 : relRelation = table_open(RelationRelationId, RowExclusiveLock);
1985 :
1986 23 : reltup = SearchSysCacheCopy1(RELOID, ObjectIdGetDatum(OIDOldHeap));
1987 23 : if (!HeapTupleIsValid(reltup))
1988 0 : elog(ERROR, "cache lookup failed for relation %u", OIDOldHeap);
1989 23 : relform = (Form_pg_class) GETSTRUCT(reltup);
1990 :
1991 23 : relform->relfrozenxid = frozenXid;
1992 23 : relform->relminmxid = cutoffMulti;
1993 :
1994 23 : CatalogTupleUpdate(relRelation, &reltup->t_self, reltup);
1995 :
1996 23 : table_close(relRelation, RowExclusiveLock);
1997 : }
1998 :
1999 : /* Destroy new heap with old filenumber */
2000 1436 : object.classId = RelationRelationId;
2001 1436 : object.objectId = OIDNewHeap;
2002 1436 : object.objectSubId = 0;
2003 :
2004 1436 : if (!reindex)
2005 : {
2006 : /*
2007 : * Make sure the changes in pg_class are visible. This is especially
2008 : * important if !swap_toast_by_content, so that the correct TOAST
2009 : * relation is dropped. (reindex_relation() above did not help in this
2010 : * case))
2011 : */
2012 6 : CommandCounterIncrement();
2013 : }
2014 :
2015 : /*
2016 : * The new relation is local to our transaction and we know nothing
2017 : * depends on it, so DROP_RESTRICT should be OK.
2018 : */
2019 1436 : performDeletion(&object, DROP_RESTRICT, PERFORM_DELETION_INTERNAL);
2020 :
2021 : /* performDeletion does CommandCounterIncrement at end */
2022 :
2023 : /*
2024 : * Now we must remove any relation mapping entries that we set up for the
2025 : * transient table, as well as its toast table and toast index if any. If
2026 : * we fail to do this before commit, the relmapper will complain about new
2027 : * permanent map entries being added post-bootstrap.
2028 : */
2029 1525 : for (i = 0; OidIsValid(mapped_tables[i]); i++)
2030 89 : RelationMapRemoveMapping(mapped_tables[i]);
2031 :
2032 : /*
2033 : * At this point, everything is kosher except that, if we did toast swap
2034 : * by links, the toast table's name corresponds to the transient table.
2035 : * The name is irrelevant to the backend because it's referenced by OID,
2036 : * but users looking at the catalogs could be confused. Rename it to
2037 : * prevent this problem.
2038 : *
2039 : * Note no lock required on the relation, because we already hold an
2040 : * exclusive lock on it.
2041 : */
2042 1436 : if (!swap_toast_by_content)
2043 : {
2044 : Relation newrel;
2045 :
2046 1304 : newrel = table_open(OIDOldHeap, NoLock);
2047 1304 : if (OidIsValid(newrel->rd_rel->reltoastrelid))
2048 : {
2049 : Oid toastidx;
2050 : char NewToastName[NAMEDATALEN];
2051 :
2052 : /* Get the associated valid index to be renamed */
2053 381 : toastidx = toast_get_valid_index(newrel->rd_rel->reltoastrelid,
2054 : AccessExclusiveLock);
2055 :
2056 : /* rename the toast table ... */
2057 381 : snprintf(NewToastName, NAMEDATALEN, "pg_toast_%u",
2058 : OIDOldHeap);
2059 381 : RenameRelationInternal(newrel->rd_rel->reltoastrelid,
2060 : NewToastName, true, false);
2061 :
2062 : /* ... and its valid index too. */
2063 381 : snprintf(NewToastName, NAMEDATALEN, "pg_toast_%u_index",
2064 : OIDOldHeap);
2065 :
2066 381 : RenameRelationInternal(toastidx,
2067 : NewToastName, true, true);
2068 :
2069 : /*
2070 : * Reset the relrewrite for the toast. The command-counter
2071 : * increment is required here as we are about to update the tuple
2072 : * that is updated as part of RenameRelationInternal.
2073 : */
2074 381 : CommandCounterIncrement();
2075 381 : ResetRelRewrite(newrel->rd_rel->reltoastrelid);
2076 : }
2077 1304 : relation_close(newrel, NoLock);
2078 : }
2079 :
2080 : /* if it's not a catalog table, clear any missing attribute settings */
2081 1436 : if (!is_system_catalog)
2082 : {
2083 : Relation newrel;
2084 :
2085 1315 : newrel = table_open(OIDOldHeap, NoLock);
2086 1315 : RelationClearMissing(newrel);
2087 1315 : relation_close(newrel, NoLock);
2088 : }
2089 1436 : }
2090 :
2091 : /*
2092 : * Determine which relations to process, when REPACK/CLUSTER is called
2093 : * without specifying a table name. The exact process depends on whether
2094 : * USING INDEX was given or not, and in any case we only return tables and
2095 : * materialized views that the current user has privileges to repack/cluster.
2096 : *
2097 : * If USING INDEX was given, we scan pg_index to find those that have
2098 : * indisclustered set; if it was not given, scan pg_class and return all
2099 : * tables.
2100 : *
2101 : * Return it as a list of RelToCluster in the given memory context.
2102 : */
2103 : static List *
2104 16 : get_tables_to_repack(RepackCommand cmd, bool usingindex, MemoryContext permcxt)
2105 : {
2106 : Relation catalog;
2107 : TableScanDesc scan;
2108 : HeapTuple tuple;
2109 16 : List *rtcs = NIL;
2110 :
2111 16 : if (usingindex)
2112 : {
2113 : ScanKeyData entry;
2114 :
2115 : /*
2116 : * For USING INDEX, scan pg_index to find those with indisclustered.
2117 : */
2118 12 : catalog = table_open(IndexRelationId, AccessShareLock);
2119 12 : ScanKeyInit(&entry,
2120 : Anum_pg_index_indisclustered,
2121 : BTEqualStrategyNumber, F_BOOLEQ,
2122 : BoolGetDatum(true));
2123 12 : scan = table_beginscan_catalog(catalog, 1, &entry);
2124 24 : while ((tuple = heap_getnext(scan, ForwardScanDirection)) != NULL)
2125 : {
2126 : RelToCluster *rtc;
2127 : Form_pg_index index;
2128 : HeapTuple classtup;
2129 : Form_pg_class classForm;
2130 : MemoryContext oldcxt;
2131 :
2132 12 : index = (Form_pg_index) GETSTRUCT(tuple);
2133 :
2134 : /*
2135 : * Try to obtain a light lock on the index's table, to ensure it
2136 : * doesn't go away while we collect the list. If we cannot, just
2137 : * disregard it. Be sure to release this if we ultimately decide
2138 : * not to process the table!
2139 : */
2140 12 : if (!ConditionalLockRelationOid(index->indrelid, AccessShareLock))
2141 0 : continue;
2142 :
2143 : /* Verify that the table still exists; skip if not */
2144 12 : classtup = SearchSysCache1(RELOID, ObjectIdGetDatum(index->indrelid));
2145 12 : if (!HeapTupleIsValid(classtup))
2146 : {
2147 0 : UnlockRelationOid(index->indrelid, AccessShareLock);
2148 0 : continue;
2149 : }
2150 12 : classForm = (Form_pg_class) GETSTRUCT(classtup);
2151 :
2152 : /* Skip temp relations belonging to other sessions */
2153 12 : if (classForm->relpersistence == RELPERSISTENCE_TEMP &&
2154 0 : !isTempOrTempToastNamespace(classForm->relnamespace))
2155 : {
2156 0 : ReleaseSysCache(classtup);
2157 0 : UnlockRelationOid(index->indrelid, AccessShareLock);
2158 0 : continue;
2159 : }
2160 :
2161 12 : ReleaseSysCache(classtup);
2162 :
2163 : /* noisily skip rels which the user can't process */
2164 12 : if (!repack_is_permitted_for_relation(cmd, index->indrelid,
2165 : GetUserId()))
2166 : {
2167 8 : UnlockRelationOid(index->indrelid, AccessShareLock);
2168 8 : continue;
2169 : }
2170 :
2171 : /* Use a permanent memory context for the result list */
2172 4 : oldcxt = MemoryContextSwitchTo(permcxt);
2173 4 : rtc = palloc_object(RelToCluster);
2174 4 : rtc->tableOid = index->indrelid;
2175 4 : rtc->indexOid = index->indexrelid;
2176 4 : rtcs = lappend(rtcs, rtc);
2177 4 : MemoryContextSwitchTo(oldcxt);
2178 : }
2179 : }
2180 : else
2181 : {
2182 4 : catalog = table_open(RelationRelationId, AccessShareLock);
2183 4 : scan = table_beginscan_catalog(catalog, 0, NULL);
2184 :
2185 9164 : while ((tuple = heap_getnext(scan, ForwardScanDirection)) != NULL)
2186 : {
2187 : RelToCluster *rtc;
2188 : Form_pg_class class;
2189 : MemoryContext oldcxt;
2190 :
2191 9160 : class = (Form_pg_class) GETSTRUCT(tuple);
2192 :
2193 : /*
2194 : * Try to obtain a light lock on the table, to ensure it doesn't
2195 : * go away while we collect the list. If we cannot, just
2196 : * disregard the table. Be sure to release this if we ultimately
2197 : * decide not to process the table!
2198 : */
2199 9160 : if (!ConditionalLockRelationOid(class->oid, AccessShareLock))
2200 0 : continue;
2201 :
2202 : /* Verify that the table still exists */
2203 9160 : if (!SearchSysCacheExists1(RELOID, ObjectIdGetDatum(class->oid)))
2204 : {
2205 0 : UnlockRelationOid(class->oid, AccessShareLock);
2206 0 : continue;
2207 : }
2208 :
2209 : /* Can only process plain tables and matviews */
2210 9160 : if (class->relkind != RELKIND_RELATION &&
2211 6068 : class->relkind != RELKIND_MATVIEW)
2212 : {
2213 6036 : UnlockRelationOid(class->oid, AccessShareLock);
2214 6036 : continue;
2215 : }
2216 :
2217 : /* Skip temp relations belonging to other sessions */
2218 3124 : if (class->relpersistence == RELPERSISTENCE_TEMP &&
2219 16 : !isTempOrTempToastNamespace(class->relnamespace))
2220 : {
2221 0 : UnlockRelationOid(class->oid, AccessShareLock);
2222 0 : continue;
2223 : }
2224 :
2225 : /* noisily skip rels which the user can't process */
2226 3124 : if (!repack_is_permitted_for_relation(cmd, class->oid,
2227 : GetUserId()))
2228 : {
2229 3116 : UnlockRelationOid(class->oid, AccessShareLock);
2230 3116 : continue;
2231 : }
2232 :
2233 : /* Use a permanent memory context for the result list */
2234 8 : oldcxt = MemoryContextSwitchTo(permcxt);
2235 8 : rtc = palloc_object(RelToCluster);
2236 8 : rtc->tableOid = class->oid;
2237 8 : rtc->indexOid = InvalidOid;
2238 8 : rtcs = lappend(rtcs, rtc);
2239 8 : MemoryContextSwitchTo(oldcxt);
2240 : }
2241 : }
2242 :
2243 16 : table_endscan(scan);
2244 16 : relation_close(catalog, AccessShareLock);
2245 :
2246 16 : return rtcs;
2247 : }
2248 :
2249 : /*
2250 : * Given a partitioned table or its index, return a list of RelToCluster for
2251 : * all the leaf child tables/indexes.
2252 : *
2253 : * 'rel_is_index' tells whether 'relid' is that of an index (true) or of the
2254 : * owning relation.
2255 : */
2256 : static List *
2257 20 : get_tables_to_repack_partitioned(RepackCommand cmd, Oid relid,
2258 : bool rel_is_index, MemoryContext permcxt)
2259 : {
2260 : List *inhoids;
2261 20 : List *rtcs = NIL;
2262 :
2263 : /*
2264 : * Do not lock the children until they're processed. Note that we do hold
2265 : * a lock on the parent partitioned table.
2266 : */
2267 20 : inhoids = find_all_inheritors(relid, NoLock, NULL);
2268 148 : foreach_oid(child_oid, inhoids)
2269 : {
2270 : Oid table_oid,
2271 : index_oid;
2272 : RelToCluster *rtc;
2273 : MemoryContext oldcxt;
2274 :
2275 108 : if (rel_is_index)
2276 : {
2277 : /* consider only leaf indexes */
2278 80 : if (get_rel_relkind(child_oid) != RELKIND_INDEX)
2279 40 : continue;
2280 :
2281 40 : table_oid = IndexGetRelation(child_oid, false);
2282 40 : index_oid = child_oid;
2283 : }
2284 : else
2285 : {
2286 : /* consider only leaf relations */
2287 28 : if (get_rel_relkind(child_oid) != RELKIND_RELATION)
2288 16 : continue;
2289 :
2290 12 : table_oid = child_oid;
2291 12 : index_oid = InvalidOid;
2292 : }
2293 :
2294 : /*
2295 : * It's possible that the user does not have privileges to CLUSTER the
2296 : * leaf partition despite having them on the partitioned table. Skip
2297 : * if so.
2298 : */
2299 52 : if (!repack_is_permitted_for_relation(cmd, table_oid, GetUserId()))
2300 12 : continue;
2301 :
2302 : /* Use a permanent memory context for the result list */
2303 40 : oldcxt = MemoryContextSwitchTo(permcxt);
2304 40 : rtc = palloc_object(RelToCluster);
2305 40 : rtc->tableOid = table_oid;
2306 40 : rtc->indexOid = index_oid;
2307 40 : rtcs = lappend(rtcs, rtc);
2308 40 : MemoryContextSwitchTo(oldcxt);
2309 : }
2310 :
2311 20 : return rtcs;
2312 : }
2313 :
2314 :
2315 : /*
2316 : * Return whether userid has privileges to REPACK relid. If not, this
2317 : * function emits a WARNING.
2318 : */
2319 : static bool
2320 3240 : repack_is_permitted_for_relation(RepackCommand cmd, Oid relid, Oid userid)
2321 : {
2322 : Assert(cmd == REPACK_COMMAND_CLUSTER || cmd == REPACK_COMMAND_REPACK);
2323 :
2324 3240 : if (pg_class_aclcheck(relid, userid, ACL_MAINTAIN) == ACLCHECK_OK)
2325 104 : return true;
2326 :
2327 3136 : ereport(WARNING,
2328 : errmsg("permission denied to execute %s on \"%s\", skipping it",
2329 : RepackCommandAsString(cmd),
2330 : get_rel_name(relid)));
2331 :
2332 3136 : return false;
2333 : }
2334 :
2335 :
2336 : /*
2337 : * Given a RepackStmt with an indicated relation name, resolve the relation
2338 : * name, obtain lock on it, then determine what to do based on the relation
2339 : * type: if it's table and not partitioned, repack it as indicated (using an
2340 : * existing clustered index, or following the given one), and return NULL.
2341 : *
2342 : * On the other hand, if the table is partitioned, do nothing further and
2343 : * instead return the opened and locked relcache entry, so that caller can
2344 : * process the partitions using the multiple-table handling code. In this
2345 : * case, if an index name is given, it's up to the caller to resolve it.
2346 : */
2347 : static Relation
2348 215 : process_single_relation(RepackStmt *stmt, LOCKMODE lockmode, bool isTopLevel,
2349 : ClusterParams *params)
2350 : {
2351 : Relation rel;
2352 : Oid tableOid;
2353 :
2354 : Assert(stmt->relation != NULL);
2355 : Assert(stmt->command == REPACK_COMMAND_CLUSTER ||
2356 : stmt->command == REPACK_COMMAND_REPACK);
2357 :
2358 : /*
2359 : * Make sure ANALYZE is specified if a column list is present.
2360 : */
2361 215 : if ((params->options & CLUOPT_ANALYZE) == 0 && stmt->relation->va_cols != NIL)
2362 4 : ereport(ERROR,
2363 : errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
2364 : errmsg("ANALYZE option must be specified when a column list is provided"));
2365 :
2366 : /* Find, lock, and check permissions on the table. */
2367 211 : tableOid = RangeVarGetRelidExtended(stmt->relation->relation,
2368 : lockmode,
2369 : 0,
2370 : RangeVarCallbackMaintainsTable,
2371 : NULL);
2372 203 : rel = table_open(tableOid, NoLock);
2373 :
2374 : /*
2375 : * Reject clustering a remote temp table ... their local buffer manager is
2376 : * not going to cope.
2377 : */
2378 203 : if (RELATION_IS_OTHER_TEMP(rel))
2379 1 : ereport(ERROR,
2380 : errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
2381 : /*- translator: first %s is name of a SQL command, eg. REPACK */
2382 : errmsg("cannot execute %s on temporary tables of other sessions",
2383 : RepackCommandAsString(stmt->command)));
2384 :
2385 : /*
2386 : * For partitioned tables, let caller handle this. Otherwise, process it
2387 : * here and we're done.
2388 : */
2389 202 : if (rel->rd_rel->relkind == RELKIND_PARTITIONED_TABLE)
2390 36 : return rel;
2391 : else
2392 : {
2393 166 : Oid indexOid = InvalidOid;
2394 :
2395 166 : indexOid = determine_clustered_index(rel, stmt->usingindex,
2396 166 : stmt->indexname);
2397 162 : if (OidIsValid(indexOid))
2398 115 : check_index_is_clusterable(rel, indexOid, lockmode);
2399 :
2400 162 : cluster_rel(stmt->command, rel, indexOid, params, isTopLevel);
2401 :
2402 : /*
2403 : * Do an analyze, if requested. We close the transaction and start a
2404 : * new one, so that we don't hold the stronger lock for longer than
2405 : * needed.
2406 : */
2407 130 : if (params->options & CLUOPT_ANALYZE)
2408 : {
2409 8 : VacuumParams vac_params = {0};
2410 :
2411 8 : PopActiveSnapshot();
2412 8 : CommitTransactionCommand();
2413 :
2414 8 : StartTransactionCommand();
2415 8 : PushActiveSnapshot(GetTransactionSnapshot());
2416 :
2417 8 : vac_params.options |= VACOPT_ANALYZE;
2418 8 : if (params->options & CLUOPT_VERBOSE)
2419 0 : vac_params.options |= VACOPT_VERBOSE;
2420 8 : analyze_rel(tableOid, NULL, &vac_params,
2421 8 : stmt->relation->va_cols, true, NULL);
2422 8 : PopActiveSnapshot();
2423 8 : CommandCounterIncrement();
2424 : }
2425 :
2426 130 : return NULL;
2427 : }
2428 : }
2429 :
2430 : /*
2431 : * Given a relation and the usingindex/indexname options in a
2432 : * REPACK USING INDEX or CLUSTER command, return the OID of the
2433 : * index to use for clustering the table.
2434 : *
2435 : * Caller must hold lock on the relation so that the set of indexes
2436 : * doesn't change, and must call check_index_is_clusterable.
2437 : */
2438 : static Oid
2439 186 : determine_clustered_index(Relation rel, bool usingindex, const char *indexname)
2440 : {
2441 : Oid indexOid;
2442 :
2443 186 : if (indexname == NULL && usingindex)
2444 : {
2445 : /*
2446 : * If USING INDEX with no name is given, find a clustered index, or
2447 : * error out if none.
2448 : */
2449 20 : indexOid = InvalidOid;
2450 44 : foreach_oid(idxoid, RelationGetIndexList(rel))
2451 : {
2452 20 : if (get_index_isclustered(idxoid))
2453 : {
2454 16 : indexOid = idxoid;
2455 16 : break;
2456 : }
2457 : }
2458 :
2459 20 : if (!OidIsValid(indexOid))
2460 4 : ereport(ERROR,
2461 : errcode(ERRCODE_UNDEFINED_OBJECT),
2462 : errmsg("there is no previously clustered index for table \"%s\"",
2463 : RelationGetRelationName(rel)));
2464 : }
2465 166 : else if (indexname != NULL)
2466 : {
2467 : /* An index was specified; obtain its OID. */
2468 119 : indexOid = get_relname_relid(indexname, rel->rd_rel->relnamespace);
2469 119 : if (!OidIsValid(indexOid))
2470 0 : ereport(ERROR,
2471 : errcode(ERRCODE_UNDEFINED_OBJECT),
2472 : errmsg("index \"%s\" for table \"%s\" does not exist",
2473 : indexname, RelationGetRelationName(rel)));
2474 : }
2475 : else
2476 47 : indexOid = InvalidOid;
2477 :
2478 182 : return indexOid;
2479 : }
2480 :
2481 : static const char *
2482 3597 : RepackCommandAsString(RepackCommand cmd)
2483 : {
2484 3597 : switch (cmd)
2485 : {
2486 3190 : case REPACK_COMMAND_REPACK:
2487 3190 : return "REPACK";
2488 226 : case REPACK_COMMAND_VACUUMFULL:
2489 226 : return "VACUUM";
2490 181 : case REPACK_COMMAND_CLUSTER:
2491 181 : return "CLUSTER";
2492 : }
2493 0 : return "???"; /* keep compiler quiet */
2494 : }
2495 :
2496 : /*
2497 : * Apply all the changes stored in 'file'.
2498 : */
2499 : static void
2500 12 : apply_concurrent_changes(BufFile *file, ChangeContext *chgcxt)
2501 : {
2502 12 : ConcurrentChangeKind kind = '\0';
2503 12 : Relation rel = chgcxt->cc_rel;
2504 : TupleTableSlot *spilled_tuple;
2505 : TupleTableSlot *old_update_tuple;
2506 : TupleTableSlot *ondisk_tuple;
2507 12 : bool have_old_tuple = false;
2508 : MemoryContext oldcxt;
2509 :
2510 12 : spilled_tuple = MakeSingleTupleTableSlot(RelationGetDescr(rel),
2511 : &TTSOpsVirtual);
2512 12 : ondisk_tuple = MakeSingleTupleTableSlot(RelationGetDescr(rel),
2513 : table_slot_callbacks(rel));
2514 12 : old_update_tuple = MakeSingleTupleTableSlot(RelationGetDescr(rel),
2515 : &TTSOpsVirtual);
2516 :
2517 12 : oldcxt = MemoryContextSwitchTo(GetPerTupleMemoryContext(chgcxt->cc_estate));
2518 :
2519 : while (true)
2520 40 : {
2521 : size_t nread;
2522 52 : ConcurrentChangeKind prevkind = kind;
2523 :
2524 52 : CHECK_FOR_INTERRUPTS();
2525 :
2526 52 : nread = BufFileReadMaybeEOF(file, &kind, 1, true);
2527 52 : if (nread == 0) /* done with the file? */
2528 12 : break;
2529 :
2530 : /*
2531 : * If this is the old tuple for an update, read it into the tuple slot
2532 : * and go to the next one. The update itself will be executed on the
2533 : * next iteration, when we receive the NEW tuple.
2534 : */
2535 40 : if (kind == CHANGE_UPDATE_OLD)
2536 : {
2537 8 : restore_tuple(file, rel, old_update_tuple);
2538 8 : have_old_tuple = true;
2539 8 : continue;
2540 : }
2541 :
2542 : /*
2543 : * Just before an UPDATE or DELETE, we must update the command
2544 : * counter, because the change could refer to a tuple that we have
2545 : * just inserted; and before an INSERT, we have to do this also if the
2546 : * previous command was either update or delete.
2547 : *
2548 : * With this approach we don't spend so many CCIs for long strings of
2549 : * only INSERTs, which can't affect one another.
2550 : */
2551 32 : if (kind == CHANGE_UPDATE_NEW || kind == CHANGE_DELETE ||
2552 7 : (kind == CHANGE_INSERT && (prevkind == CHANGE_UPDATE_NEW ||
2553 : prevkind == CHANGE_DELETE)))
2554 : {
2555 29 : CommandCounterIncrement();
2556 29 : UpdateActiveSnapshotCommandId();
2557 : }
2558 :
2559 : /*
2560 : * Now restore the tuple into the slot and execute the change.
2561 : */
2562 32 : restore_tuple(file, rel, spilled_tuple);
2563 :
2564 32 : if (kind == CHANGE_INSERT)
2565 : {
2566 7 : apply_concurrent_insert(rel, spilled_tuple, chgcxt);
2567 : }
2568 25 : else if (kind == CHANGE_DELETE)
2569 : {
2570 : bool found;
2571 :
2572 : /* Find the tuple to be deleted */
2573 3 : found = find_target_tuple(rel, chgcxt, spilled_tuple, ondisk_tuple);
2574 3 : if (!found)
2575 0 : elog(ERROR, "failed to find target tuple");
2576 3 : apply_concurrent_delete(rel, ondisk_tuple);
2577 : }
2578 22 : else if (kind == CHANGE_UPDATE_NEW)
2579 : {
2580 : TupleTableSlot *key;
2581 : bool found;
2582 :
2583 22 : if (have_old_tuple)
2584 8 : key = old_update_tuple;
2585 : else
2586 14 : key = spilled_tuple;
2587 :
2588 : /* Find the tuple to be updated or deleted. */
2589 22 : found = find_target_tuple(rel, chgcxt, key, ondisk_tuple);
2590 22 : if (!found)
2591 0 : elog(ERROR, "failed to find target tuple");
2592 :
2593 : /*
2594 : * If 'tup' contains TOAST pointers, they point to the old
2595 : * relation's toast. Copy the corresponding TOAST pointers for the
2596 : * new relation from the existing tuple. (The fact that we
2597 : * received a TOAST pointer here implies that the attribute hasn't
2598 : * changed.)
2599 : */
2600 22 : adjust_toast_pointers(rel, spilled_tuple, ondisk_tuple);
2601 :
2602 22 : apply_concurrent_update(rel, spilled_tuple, ondisk_tuple, chgcxt);
2603 :
2604 22 : ExecClearTuple(old_update_tuple);
2605 22 : have_old_tuple = false;
2606 : }
2607 : else
2608 0 : elog(ERROR, "unrecognized kind of change: %d", kind);
2609 :
2610 32 : ResetPerTupleExprContext(chgcxt->cc_estate);
2611 : }
2612 :
2613 : /* Cleanup. */
2614 12 : ExecDropSingleTupleTableSlot(spilled_tuple);
2615 12 : ExecDropSingleTupleTableSlot(ondisk_tuple);
2616 12 : ExecDropSingleTupleTableSlot(old_update_tuple);
2617 :
2618 12 : MemoryContextSwitchTo(oldcxt);
2619 12 : }
2620 :
2621 : /*
2622 : * Apply an insert from the spill of concurrent changes to the new copy of the
2623 : * table.
2624 : */
2625 : static void
2626 7 : apply_concurrent_insert(Relation rel, TupleTableSlot *slot,
2627 : ChangeContext *chgcxt)
2628 : {
2629 : /* Put the tuple in the table, but make sure it won't be decoded */
2630 7 : table_tuple_insert(rel, slot, GetCurrentCommandId(true),
2631 : TABLE_INSERT_NO_LOGICAL, NULL);
2632 :
2633 : /* Update indexes with this new tuple. */
2634 7 : ExecInsertIndexTuples(chgcxt->cc_rri,
2635 : chgcxt->cc_estate,
2636 : 0,
2637 : slot,
2638 : NIL, NULL);
2639 7 : pgstat_progress_incr_param(PROGRESS_REPACK_HEAP_TUPLES_INSERTED, 1);
2640 7 : }
2641 :
2642 : /*
2643 : * Apply an update from the spill of concurrent changes to the new copy of the
2644 : * table.
2645 : */
2646 : static void
2647 22 : apply_concurrent_update(Relation rel, TupleTableSlot *spilled_tuple,
2648 : TupleTableSlot *ondisk_tuple,
2649 : ChangeContext *chgcxt)
2650 : {
2651 : LockTupleMode lockmode;
2652 : TM_FailureData tmfd;
2653 : TU_UpdateIndexes update_indexes;
2654 : TM_Result res;
2655 :
2656 : /*
2657 : * Carry out the update, skipping logical decoding for it.
2658 : */
2659 22 : res = table_tuple_update(rel, &(ondisk_tuple->tts_tid), spilled_tuple,
2660 : GetCurrentCommandId(true),
2661 : TABLE_UPDATE_NO_LOGICAL,
2662 : InvalidSnapshot,
2663 : InvalidSnapshot,
2664 : false,
2665 : &tmfd, &lockmode, &update_indexes);
2666 22 : if (res != TM_Ok)
2667 0 : ereport(ERROR,
2668 : errmsg("failed to apply concurrent UPDATE"));
2669 :
2670 22 : if (update_indexes != TU_None)
2671 : {
2672 8 : uint32 flags = EIIT_IS_UPDATE;
2673 :
2674 8 : if (update_indexes == TU_Summarizing)
2675 0 : flags |= EIIT_ONLY_SUMMARIZING;
2676 8 : ExecInsertIndexTuples(chgcxt->cc_rri,
2677 : chgcxt->cc_estate,
2678 : flags,
2679 : spilled_tuple,
2680 : NIL, NULL);
2681 : }
2682 :
2683 22 : pgstat_progress_incr_param(PROGRESS_REPACK_HEAP_TUPLES_UPDATED, 1);
2684 22 : }
2685 :
2686 : static void
2687 3 : apply_concurrent_delete(Relation rel, TupleTableSlot *slot)
2688 : {
2689 : TM_Result res;
2690 : TM_FailureData tmfd;
2691 :
2692 : /*
2693 : * Delete tuple from the new heap, skipping logical decoding for it.
2694 : */
2695 3 : res = table_tuple_delete(rel, &(slot->tts_tid),
2696 : GetCurrentCommandId(true),
2697 : TABLE_DELETE_NO_LOGICAL,
2698 : InvalidSnapshot, InvalidSnapshot,
2699 : false,
2700 : &tmfd);
2701 :
2702 3 : if (res != TM_Ok)
2703 0 : ereport(ERROR,
2704 : errmsg("failed to apply concurrent DELETE"));
2705 :
2706 3 : pgstat_progress_incr_param(PROGRESS_REPACK_HEAP_TUPLES_DELETED, 1);
2707 3 : }
2708 :
2709 : /*
2710 : * Read tuple from file and put it in the input slot. All memory is allocated
2711 : * in the current memory context; caller is responsible for freeing it as
2712 : * appropriate.
2713 : *
2714 : * External attributes are stored in separate memory chunks, in order to avoid
2715 : * exceeding MaxAllocSize - that could happen if the individual attributes are
2716 : * smaller than MaxAllocSize but the whole tuple is bigger.
2717 : */
2718 : static void
2719 40 : restore_tuple(BufFile *file, Relation relation, TupleTableSlot *slot)
2720 : {
2721 : uint32 t_len;
2722 : HeapTuple tup;
2723 : int natt_ext;
2724 :
2725 : /* Read the tuple. */
2726 40 : BufFileReadExact(file, &t_len, sizeof(t_len));
2727 40 : tup = (HeapTuple) palloc(HEAPTUPLESIZE + t_len);
2728 40 : tup->t_data = (HeapTupleHeader) ((char *) tup + HEAPTUPLESIZE);
2729 40 : BufFileReadExact(file, tup->t_data, t_len);
2730 40 : tup->t_len = t_len;
2731 40 : ItemPointerSetInvalid(&tup->t_self);
2732 40 : tup->t_tableOid = RelationGetRelid(relation);
2733 :
2734 : /*
2735 : * Put the tuple we read in a slot. This deforms it, so that we can hack
2736 : * the external attributes in place.
2737 : */
2738 40 : ExecForceStoreHeapTuple(tup, slot, false);
2739 :
2740 : /*
2741 : * Next, read any attributes we stored separately into the tts_values
2742 : * array elements expecting them, if any. This matches
2743 : * repack_store_change.
2744 : */
2745 40 : BufFileReadExact(file, &natt_ext, sizeof(natt_ext));
2746 40 : if (natt_ext > 0)
2747 : {
2748 11 : TupleDesc desc = slot->tts_tupleDescriptor;
2749 :
2750 66 : for (int i = 0; i < desc->natts; i++)
2751 : {
2752 55 : CompactAttribute *attr = TupleDescCompactAttr(desc, i);
2753 : varlena *varlen;
2754 : uint64 chunk_header;
2755 : void *value;
2756 : Size varlensz;
2757 :
2758 55 : if (attr->attisdropped || attr->attlen != -1)
2759 40 : continue;
2760 22 : if (slot_attisnull(slot, i + 1))
2761 0 : continue;
2762 22 : varlen = (varlena *) DatumGetPointer(slot->tts_values[i]);
2763 22 : if (!VARATT_IS_EXTERNAL_INDIRECT(varlen))
2764 7 : continue;
2765 15 : slot_getsomeattrs(slot, i + 1);
2766 :
2767 15 : BufFileReadExact(file, &chunk_header, VARHDRSZ);
2768 15 : varlensz = VARSIZE_ANY(&chunk_header);
2769 :
2770 15 : value = palloc(varlensz);
2771 15 : memcpy(value, &chunk_header, VARHDRSZ);
2772 15 : BufFileReadExact(file, (char *) value + VARHDRSZ, varlensz - VARHDRSZ);
2773 :
2774 15 : slot->tts_values[i] = PointerGetDatum(value);
2775 15 : natt_ext--;
2776 15 : if (natt_ext < 0)
2777 0 : ereport(ERROR,
2778 : errcode(ERRCODE_DATA_CORRUPTED),
2779 : errmsg("insufficient number of attributes stored separately"));
2780 : }
2781 : }
2782 40 : }
2783 :
2784 : /*
2785 : * Adjust 'dest' replacing any EXTERNAL_ONDISK toast pointers with the
2786 : * corresponding ones from 'src'.
2787 : */
2788 : static void
2789 22 : adjust_toast_pointers(Relation relation, TupleTableSlot *dest, TupleTableSlot *src)
2790 : {
2791 22 : TupleDesc desc = dest->tts_tupleDescriptor;
2792 :
2793 104 : for (int i = 0; i < desc->natts; i++)
2794 : {
2795 82 : CompactAttribute *attr = TupleDescCompactAttr(desc, i);
2796 : varlena *varlena_dst;
2797 :
2798 82 : if (attr->attisdropped)
2799 24 : continue;
2800 58 : if (attr->attlen != -1)
2801 28 : continue;
2802 30 : if (slot_attisnull(dest, i + 1))
2803 0 : continue;
2804 :
2805 30 : slot_getsomeattrs(dest, i + 1);
2806 :
2807 30 : varlena_dst = (varlena *) DatumGetPointer(dest->tts_values[i]);
2808 30 : if (!VARATT_IS_EXTERNAL_ONDISK(varlena_dst))
2809 28 : continue;
2810 2 : slot_getsomeattrs(src, i + 1);
2811 :
2812 2 : dest->tts_values[i] = src->tts_values[i];
2813 : }
2814 22 : }
2815 :
2816 : /*
2817 : * Find the tuple to be updated or deleted by the given data change, whose
2818 : * tuple has already been loaded into locator.
2819 : *
2820 : * If the tuple is found, put it in retrieved and return true. If the tuple is
2821 : * not found, return false.
2822 : */
2823 : static bool
2824 25 : find_target_tuple(Relation rel, ChangeContext *chgcxt, TupleTableSlot *locator,
2825 : TupleTableSlot *retrieved)
2826 : {
2827 25 : Form_pg_index idx = chgcxt->cc_ident_index->rd_index;
2828 : IndexScanDesc scan;
2829 25 : bool retval = false;
2830 :
2831 : /*
2832 : * Scan key is passed by caller, so it does not have to be constructed
2833 : * multiple times. Key entries have all fields initialized, except for
2834 : * sk_argument.
2835 : *
2836 : * Use the incoming tuple to finalize the scan key.
2837 : */
2838 52 : for (int i = 0; i < chgcxt->cc_ident_key_nentries; i++)
2839 : {
2840 27 : ScanKey entry = &chgcxt->cc_ident_key[i];
2841 27 : AttrNumber attno = idx->indkey.values[i];
2842 :
2843 27 : entry->sk_argument = locator->tts_values[attno - 1];
2844 : Assert(!locator->tts_isnull[attno - 1]);
2845 : }
2846 :
2847 : /* XXX no instrumentation for now */
2848 25 : scan = index_beginscan(rel, chgcxt->cc_ident_index, GetActiveSnapshot(),
2849 : NULL, chgcxt->cc_ident_key_nentries, 0, 0);
2850 25 : index_rescan(scan, chgcxt->cc_ident_key, chgcxt->cc_ident_key_nentries, NULL, 0);
2851 26 : while (index_getnext_slot(scan, ForwardScanDirection, retrieved))
2852 : {
2853 : /* Be wary of temporal constraints */
2854 26 : if (scan->xs_recheck && !identity_key_equal(chgcxt, locator, retrieved))
2855 : {
2856 1 : CHECK_FOR_INTERRUPTS();
2857 1 : continue;
2858 : }
2859 :
2860 25 : retval = true;
2861 25 : break;
2862 : }
2863 25 : index_endscan(scan);
2864 :
2865 25 : return retval;
2866 : }
2867 :
2868 : /*
2869 : * Check whether the candidate tuple matches the locator tuple on all replica
2870 : * identity key columns, using the same equality operators as the identity
2871 : * index scan. The locator tuple has already been loaded into cc_ident_key.
2872 : *
2873 : * This is needed to filter lossy index matches, such as GiST multirange scans
2874 : * used for temporal constraints.
2875 : */
2876 : static bool
2877 2 : identity_key_equal(ChangeContext *chgcxt, TupleTableSlot *locator,
2878 : TupleTableSlot *candidate)
2879 : {
2880 2 : slot_getsomeattrs(locator, chgcxt->cc_last_key_attno);
2881 2 : slot_getsomeattrs(candidate, chgcxt->cc_last_key_attno);
2882 :
2883 4 : for (int i = 0; i < chgcxt->cc_ident_key_nentries; i++)
2884 : {
2885 3 : ScanKey entry = &chgcxt->cc_ident_key[i];
2886 3 : AttrNumber attno = chgcxt->cc_ident_index->rd_index->indkey.values[i];
2887 :
2888 : Assert(attno > 0);
2889 :
2890 3 : if (locator->tts_isnull[attno - 1] != candidate->tts_isnull[attno - 1])
2891 0 : return false;
2892 :
2893 3 : if (locator->tts_isnull[attno - 1])
2894 0 : continue;
2895 :
2896 3 : if (!DatumGetBool(FunctionCall2Coll(&entry->sk_func,
2897 : entry->sk_collation,
2898 3 : candidate->tts_values[attno - 1],
2899 : entry->sk_argument)))
2900 1 : return false;
2901 : }
2902 :
2903 1 : return true;
2904 : }
2905 :
2906 : /*
2907 : * Decode and apply concurrent changes, up to (and including) the record whose
2908 : * LSN is 'end_of_wal'.
2909 : *
2910 : * XXX the names "process_concurrent_changes" and "apply_concurrent_changes"
2911 : * are far too similar to each other.
2912 : */
2913 : static void
2914 12 : process_concurrent_changes(XLogRecPtr end_of_wal, ChangeContext *chgcxt, bool done)
2915 : {
2916 : DecodingWorkerShared *shared;
2917 : char fname[MAXPGPATH];
2918 : BufFile *file;
2919 :
2920 12 : pgstat_progress_update_param(PROGRESS_REPACK_PHASE,
2921 : PROGRESS_REPACK_PHASE_CATCH_UP);
2922 :
2923 : /* Ask the worker for the file. */
2924 12 : shared = (DecodingWorkerShared *) dsm_segment_address(decoding_worker->seg);
2925 12 : SpinLockAcquire(&shared->mutex);
2926 12 : shared->lsn_upto = end_of_wal;
2927 12 : shared->done = done;
2928 12 : SpinLockRelease(&shared->mutex);
2929 :
2930 : /*
2931 : * The worker needs to finish processing of the current WAL record. Even
2932 : * if it's idle, it'll need to close the output file. Thus we're likely to
2933 : * wait, so prepare for sleep.
2934 : */
2935 12 : ConditionVariablePrepareToSleep(&shared->cv);
2936 : for (;;)
2937 12 : {
2938 : int last_exported;
2939 :
2940 24 : SpinLockAcquire(&shared->mutex);
2941 24 : last_exported = shared->last_exported;
2942 24 : SpinLockRelease(&shared->mutex);
2943 :
2944 : /*
2945 : * Has the worker exported the file we are waiting for?
2946 : */
2947 24 : if (last_exported == chgcxt->cc_file_seq)
2948 12 : break;
2949 :
2950 12 : ConditionVariableSleep(&shared->cv, WAIT_EVENT_REPACK_WORKER_EXPORT);
2951 : }
2952 12 : ConditionVariableCancelSleep();
2953 :
2954 : /* Open the file. */
2955 12 : DecodingWorkerFileName(fname, shared->relid, chgcxt->cc_file_seq);
2956 12 : file = BufFileOpenFileSet(&shared->sfs.fs, fname, O_RDONLY, false);
2957 12 : apply_concurrent_changes(file, chgcxt);
2958 :
2959 12 : BufFileClose(file);
2960 :
2961 : /* Get ready for the next file. */
2962 12 : chgcxt->cc_file_seq++;
2963 12 : }
2964 :
2965 : /*
2966 : * Initialize the ChangeContext struct for the given relation, with
2967 : * the given index as identity index.
2968 : */
2969 : static void
2970 6 : initialize_change_context(ChangeContext *chgcxt,
2971 : Relation relation, Oid ident_index_id)
2972 : {
2973 6 : chgcxt->cc_rel = relation;
2974 :
2975 : /* Only initialize fields needed by ExecInsertIndexTuples(). */
2976 6 : chgcxt->cc_estate = CreateExecutorState();
2977 :
2978 6 : chgcxt->cc_rri = (ResultRelInfo *) palloc(sizeof(ResultRelInfo));
2979 6 : InitResultRelInfo(chgcxt->cc_rri, relation, 0, 0, 0);
2980 6 : ExecOpenIndices(chgcxt->cc_rri, false);
2981 :
2982 : /*
2983 : * The table's relcache entry already has the relcache entry for the
2984 : * identity index; find that.
2985 : */
2986 6 : chgcxt->cc_ident_index = NULL;
2987 6 : for (int i = 0; i < chgcxt->cc_rri->ri_NumIndices; i++)
2988 : {
2989 : Relation ind_rel;
2990 :
2991 6 : ind_rel = chgcxt->cc_rri->ri_IndexRelationDescs[i];
2992 6 : if (ind_rel->rd_id == ident_index_id)
2993 : {
2994 6 : chgcxt->cc_ident_index = ind_rel;
2995 6 : break;
2996 : }
2997 : }
2998 6 : if (chgcxt->cc_ident_index == NULL)
2999 0 : elog(ERROR, "failed to find identity index");
3000 :
3001 : /* Set up for scanning said identity index */
3002 : {
3003 : Form_pg_index indexForm;
3004 :
3005 6 : indexForm = chgcxt->cc_ident_index->rd_index;
3006 6 : chgcxt->cc_ident_key_nentries = indexForm->indnkeyatts;
3007 6 : chgcxt->cc_ident_key = (ScanKey) palloc_array(ScanKeyData, indexForm->indnkeyatts);
3008 14 : for (int i = 0; i < indexForm->indnkeyatts; i++)
3009 : {
3010 : ScanKey entry;
3011 : Oid opfamily,
3012 : opcintype,
3013 : opno,
3014 : opcode;
3015 : StrategyNumber eq_strategy;
3016 :
3017 8 : entry = &chgcxt->cc_ident_key[i];
3018 :
3019 8 : opfamily = chgcxt->cc_ident_index->rd_opfamily[i];
3020 8 : opcintype = chgcxt->cc_ident_index->rd_opcintype[i];
3021 8 : eq_strategy = IndexAmTranslateCompareType(COMPARE_EQ,
3022 8 : chgcxt->cc_ident_index->rd_rel->relam,
3023 : opfamily, false);
3024 8 : if (eq_strategy == InvalidStrategy)
3025 0 : elog(ERROR, "could not find equality strategy for index operator family %u for type %u",
3026 : opfamily, opcintype);
3027 8 : opno = get_opfamily_member(opfamily, opcintype, opcintype,
3028 : eq_strategy);
3029 8 : if (!OidIsValid(opno))
3030 0 : elog(ERROR, "missing operator %d(%u,%u) in opfamily %u",
3031 : eq_strategy, opcintype, opcintype, opfamily);
3032 8 : opcode = get_opcode(opno);
3033 8 : if (!OidIsValid(opcode))
3034 0 : elog(ERROR, "missing oprcode for operator %u", opno);
3035 :
3036 : /* Initialize everything but argument. */
3037 8 : ScanKeyInit(entry,
3038 8 : i + 1,
3039 : eq_strategy, opcode,
3040 : (Datum) 0);
3041 8 : entry->sk_collation = chgcxt->cc_ident_index->rd_indcollation[i];
3042 : }
3043 : }
3044 :
3045 : /* Determine the last column we must deform to read the identity */
3046 6 : chgcxt->cc_last_key_attno = InvalidAttrNumber;
3047 14 : for (int i = 0; i < chgcxt->cc_ident_key_nentries; i++)
3048 : {
3049 8 : AttrNumber attno = chgcxt->cc_ident_index->rd_index->indkey.values[i];
3050 :
3051 : Assert(attno > 0);
3052 8 : chgcxt->cc_last_key_attno = Max(chgcxt->cc_last_key_attno, attno);
3053 : }
3054 :
3055 6 : chgcxt->cc_file_seq = WORKER_FILE_SNAPSHOT + 1;
3056 6 : }
3057 :
3058 : /*
3059 : * Free up resources taken by a ChangeContext.
3060 : */
3061 : static void
3062 6 : release_change_context(ChangeContext *chgcxt)
3063 : {
3064 6 : ExecCloseIndices(chgcxt->cc_rri);
3065 6 : FreeExecutorState(chgcxt->cc_estate);
3066 : /* XXX are these pfrees necessary? */
3067 6 : pfree(chgcxt->cc_rri);
3068 6 : pfree(chgcxt->cc_ident_key);
3069 6 : }
3070 :
3071 : /*
3072 : * The final steps of rebuild_relation() for concurrent processing.
3073 : *
3074 : * On entry, NewHeap is locked in AccessExclusiveLock mode. OldHeap and its
3075 : * clustering index (if one is passed) are still locked in a mode that allows
3076 : * concurrent data changes. On exit, both tables and their indexes are closed,
3077 : * but locked in AccessExclusiveLock mode.
3078 : */
3079 : static void
3080 6 : rebuild_relation_finish_concurrent(Relation NewHeap, Relation OldHeap,
3081 : Oid identIdx, TransactionId frozenXid,
3082 : MultiXactId cutoffMulti)
3083 : {
3084 : List *ind_oids_new;
3085 6 : Oid old_table_oid = RelationGetRelid(OldHeap);
3086 6 : Oid new_table_oid = RelationGetRelid(NewHeap);
3087 6 : List *ind_oids_old = RelationGetIndexList(OldHeap);
3088 : ListCell *lc,
3089 : *lc2;
3090 : char relpersistence;
3091 : bool is_system_catalog;
3092 : Oid ident_idx_new;
3093 : XLogRecPtr end_of_wal;
3094 : List *indexrels;
3095 : ChangeContext chgcxt;
3096 :
3097 : Assert(CheckRelationLockedByMe(OldHeap, ShareUpdateExclusiveLock, false));
3098 : Assert(CheckRelationLockedByMe(NewHeap, AccessExclusiveLock, false));
3099 :
3100 : /*
3101 : * Unlike the exclusive case, we build new indexes for the new relation
3102 : * rather than swapping the storage and reindexing the old relation. The
3103 : * point is that the index build can take some time, so we do it before we
3104 : * get AccessExclusiveLock on the old heap and therefore we cannot swap
3105 : * the heap storage yet.
3106 : *
3107 : * index_create() will lock the new indexes using AccessExclusiveLock - no
3108 : * need to change that. At the same time, we use ShareUpdateExclusiveLock
3109 : * to lock the existing indexes - that should be enough to prevent others
3110 : * from changing them while we're repacking the relation. The lock on
3111 : * table should prevent others from changing the index column list, but
3112 : * might not be enough for commands like ALTER INDEX ... SET ... (Those
3113 : * are not necessarily dangerous, but can make user confused if the
3114 : * changes they do get lost due to REPACK.)
3115 : */
3116 6 : ind_oids_new = build_new_indexes(NewHeap, OldHeap, ind_oids_old);
3117 :
3118 : /*
3119 : * The identity index in the new relation appears in the same relative
3120 : * position as the corresponding index in the old relation. Find it.
3121 : */
3122 6 : ident_idx_new = InvalidOid;
3123 12 : foreach_oid(ind_old, ind_oids_old)
3124 : {
3125 6 : if (identIdx == ind_old)
3126 : {
3127 6 : int pos = foreach_current_index(ind_old);
3128 :
3129 6 : if (list_length(ind_oids_new) <= pos)
3130 0 : elog(ERROR, "list of new indexes too short");
3131 6 : ident_idx_new = list_nth_oid(ind_oids_new, pos);
3132 6 : break;
3133 : }
3134 : }
3135 6 : if (!OidIsValid(ident_idx_new))
3136 0 : elog(ERROR, "could not find index matching \"%s\" at the new relation",
3137 : get_rel_name(identIdx));
3138 :
3139 : /* Gather information to apply concurrent changes. */
3140 6 : initialize_change_context(&chgcxt, NewHeap, ident_idx_new);
3141 :
3142 : /*
3143 : * During testing, wait for another backend to perform concurrent data
3144 : * changes which we will process below.
3145 : */
3146 6 : INJECTION_POINT("repack-concurrently-before-lock", NULL);
3147 :
3148 : /*
3149 : * Flush all WAL records inserted so far (possibly except for the last
3150 : * incomplete page; see GetInsertRecPtr), to minimize the amount of data
3151 : * we need to flush while holding exclusive lock on the source table.
3152 : */
3153 6 : XLogFlush(GetXLogInsertEndRecPtr());
3154 6 : end_of_wal = GetFlushRecPtr(NULL);
3155 :
3156 : /*
3157 : * Apply concurrent changes first time, to minimize the time we need to
3158 : * hold AccessExclusiveLock. (Quite some amount of WAL could have been
3159 : * written during the data copying and index creation.)
3160 : */
3161 6 : process_concurrent_changes(end_of_wal, &chgcxt, false);
3162 :
3163 : /*
3164 : * Acquire AccessExclusiveLock on the table, its TOAST relation (if there
3165 : * is one), all its indexes, so that we can swap the files.
3166 : */
3167 6 : LockRelationOid(old_table_oid, AccessExclusiveLock);
3168 :
3169 : /*
3170 : * Lock all indexes now, not only the clustering one: all indexes need to
3171 : * have their files swapped. While doing that, store their relation
3172 : * references in a zero-terminated array, to handle predicate locks below.
3173 : */
3174 6 : indexrels = NIL;
3175 19 : foreach_oid(ind_oid, ind_oids_old)
3176 : {
3177 : Relation index;
3178 :
3179 7 : index = index_open(ind_oid, AccessExclusiveLock);
3180 :
3181 : /*
3182 : * Some things about the index may have changed before we locked the
3183 : * index, such as ALTER INDEX RENAME. We don't need to do anything
3184 : * here to absorb those changes in the new index.
3185 : */
3186 7 : indexrels = lappend(indexrels, index);
3187 : }
3188 :
3189 : /*
3190 : * Lock the OldHeap's TOAST relation exclusively - again, the lock is
3191 : * needed to swap the files.
3192 : */
3193 6 : if (OidIsValid(OldHeap->rd_rel->reltoastrelid))
3194 3 : LockRelationOid(OldHeap->rd_rel->reltoastrelid, AccessExclusiveLock);
3195 :
3196 : /*
3197 : * Tuples and pages of the old heap will be gone, but the heap will stay.
3198 : */
3199 6 : TransferPredicateLocksToHeapRelation(OldHeap);
3200 19 : foreach_ptr(RelationData, index, indexrels)
3201 : {
3202 7 : TransferPredicateLocksToHeapRelation(index);
3203 7 : index_close(index, NoLock);
3204 : }
3205 6 : list_free(indexrels);
3206 :
3207 : /*
3208 : * Flush WAL again, to make sure that all changes committed while we were
3209 : * waiting for the exclusive lock are available for decoding.
3210 : */
3211 6 : XLogFlush(GetXLogInsertEndRecPtr());
3212 6 : end_of_wal = GetFlushRecPtr(NULL);
3213 :
3214 : /*
3215 : * Apply the concurrent changes again. Indicate that the decoding worker
3216 : * won't be needed anymore.
3217 : */
3218 6 : process_concurrent_changes(end_of_wal, &chgcxt, true);
3219 :
3220 : /* Remember info about rel before closing OldHeap */
3221 6 : relpersistence = OldHeap->rd_rel->relpersistence;
3222 6 : is_system_catalog = IsSystemRelation(OldHeap);
3223 :
3224 6 : pgstat_progress_update_param(PROGRESS_REPACK_PHASE,
3225 : PROGRESS_REPACK_PHASE_SWAP_REL_FILES);
3226 :
3227 : /*
3228 : * Even ShareUpdateExclusiveLock should have prevented others from
3229 : * creating / dropping indexes (even using the CONCURRENTLY option), so we
3230 : * do not need to check whether the lists match.
3231 : */
3232 13 : forboth(lc, ind_oids_old, lc2, ind_oids_new)
3233 : {
3234 7 : Oid ind_old = lfirst_oid(lc);
3235 7 : Oid ind_new = lfirst_oid(lc2);
3236 7 : Oid mapped_tables[4] = {0};
3237 :
3238 7 : swap_relation_files(ind_old, ind_new,
3239 : (old_table_oid == RelationRelationId),
3240 : false, /* swap_toast_by_content */
3241 : true,
3242 : InvalidTransactionId,
3243 : InvalidMultiXactId,
3244 : mapped_tables);
3245 :
3246 : #ifdef USE_ASSERT_CHECKING
3247 :
3248 : /*
3249 : * Concurrent processing is not supported for system relations, so
3250 : * there should be no mapped tables.
3251 : */
3252 : for (int i = 0; i < 4; i++)
3253 : Assert(!OidIsValid(mapped_tables[i]));
3254 : #endif
3255 : }
3256 :
3257 : /* The new indexes must be visible for deletion. */
3258 6 : CommandCounterIncrement();
3259 :
3260 : /* Close the old heap but keep lock until transaction commit. */
3261 6 : table_close(OldHeap, NoLock);
3262 : /* Close the new heap. (We didn't have to open its indexes). */
3263 6 : table_close(NewHeap, NoLock);
3264 :
3265 : /* Cleanup what we don't need anymore. (And close the identity index.) */
3266 6 : release_change_context(&chgcxt);
3267 :
3268 : /*
3269 : * Swap the relations and their TOAST relations and TOAST indexes. This
3270 : * also drops the new relation and its indexes.
3271 : *
3272 : * (System catalogs are currently not supported.)
3273 : */
3274 : Assert(!is_system_catalog);
3275 6 : finish_heap_swap(old_table_oid, new_table_oid,
3276 : is_system_catalog,
3277 : false, /* swap_toast_by_content */
3278 : false,
3279 : true,
3280 : false, /* reindex */
3281 : frozenXid, cutoffMulti,
3282 : relpersistence);
3283 6 : }
3284 :
3285 : /*
3286 : * Build indexes on NewHeap according to those on OldHeap.
3287 : *
3288 : * OldIndexes is the list of index OIDs on OldHeap. The contained indexes end
3289 : * up locked using ShareUpdateExclusiveLock.
3290 : *
3291 : * A list of OIDs of the corresponding indexes created on NewHeap is
3292 : * returned. The order of items does match, so we can use these arrays to swap
3293 : * index storage.
3294 : */
3295 : static List *
3296 6 : build_new_indexes(Relation NewHeap, Relation OldHeap, List *OldIndexes)
3297 : {
3298 6 : List *result = NIL;
3299 :
3300 6 : pgstat_progress_update_param(PROGRESS_REPACK_PHASE,
3301 : PROGRESS_REPACK_PHASE_REBUILD_INDEX);
3302 :
3303 19 : foreach_oid(oldindex, OldIndexes)
3304 : {
3305 : Oid newindex;
3306 : char *newName;
3307 : Relation ind;
3308 :
3309 7 : ind = index_open(oldindex, ShareUpdateExclusiveLock);
3310 :
3311 7 : newName = ChooseRelationName(get_rel_name(oldindex),
3312 : NULL,
3313 : "repacknew",
3314 7 : get_rel_namespace(ind->rd_index->indrelid),
3315 : false);
3316 7 : newindex = index_create_copy(NewHeap, INDEX_CREATE_SUPPRESS_PROGRESS,
3317 7 : oldindex, ind->rd_rel->reltablespace,
3318 : newName);
3319 7 : copy_index_constraints(ind, newindex, RelationGetRelid(NewHeap));
3320 7 : result = lappend_oid(result, newindex);
3321 :
3322 7 : index_close(ind, NoLock);
3323 : }
3324 :
3325 6 : return result;
3326 : }
3327 :
3328 : /*
3329 : * Create a transient copy of a constraint -- supported by a transient
3330 : * copy of the index that supports the original constraint.
3331 : *
3332 : * When repacking a table that contains exclusion constraints, the executor
3333 : * relies on these constraints being properly catalogued. These copies are
3334 : * to support that.
3335 : *
3336 : * We don't need the constraints for anything else (the original constraints
3337 : * will be there once repack completes), so we add pg_depend entries so that
3338 : * the are dropped when the transient table is dropped.
3339 : */
3340 : static void
3341 7 : copy_index_constraints(Relation old_index, Oid new_index_id, Oid new_heap_id)
3342 : {
3343 : ScanKeyData skey;
3344 : Relation rel;
3345 : TupleDesc desc;
3346 : SysScanDesc scan;
3347 : HeapTuple tup;
3348 : ObjectAddress objrel;
3349 :
3350 7 : rel = table_open(ConstraintRelationId, RowExclusiveLock);
3351 7 : ObjectAddressSet(objrel, RelationRelationId, new_heap_id);
3352 :
3353 : /*
3354 : * Retrieve the constraints supported by the old index and create an
3355 : * identical one that points to the new index.
3356 : */
3357 7 : ScanKeyInit(&skey,
3358 : Anum_pg_constraint_conrelid,
3359 : BTEqualStrategyNumber, F_OIDEQ,
3360 7 : ObjectIdGetDatum(old_index->rd_index->indrelid));
3361 7 : scan = systable_beginscan(rel, ConstraintRelidTypidNameIndexId, true,
3362 : NULL, 1, &skey);
3363 7 : desc = RelationGetDescr(rel);
3364 23 : while (HeapTupleIsValid(tup = systable_getnext(scan)))
3365 : {
3366 16 : Form_pg_constraint conform = (Form_pg_constraint) GETSTRUCT(tup);
3367 : Oid oid;
3368 16 : Datum values[Natts_pg_constraint] = {0};
3369 16 : bool nulls[Natts_pg_constraint] = {0};
3370 16 : bool replaces[Natts_pg_constraint] = {0};
3371 : HeapTuple new_tup;
3372 : ObjectAddress objcon;
3373 :
3374 16 : if (conform->conindid != RelationGetRelid(old_index))
3375 10 : continue;
3376 :
3377 6 : oid = GetNewOidWithIndex(rel, ConstraintOidIndexId,
3378 : Anum_pg_constraint_oid);
3379 6 : values[Anum_pg_constraint_oid - 1] = ObjectIdGetDatum(oid);
3380 6 : replaces[Anum_pg_constraint_oid - 1] = true;
3381 6 : values[Anum_pg_constraint_conrelid - 1] = ObjectIdGetDatum(new_heap_id);
3382 6 : replaces[Anum_pg_constraint_conrelid - 1] = true;
3383 6 : values[Anum_pg_constraint_conindid - 1] = ObjectIdGetDatum(new_index_id);
3384 6 : replaces[Anum_pg_constraint_conindid - 1] = true;
3385 :
3386 6 : new_tup = heap_modify_tuple(tup, desc, values, nulls, replaces);
3387 :
3388 : /* Insert it into the catalog. */
3389 6 : CatalogTupleInsert(rel, new_tup);
3390 :
3391 : /* Create a dependency so it's removed when we drop the new heap. */
3392 6 : ObjectAddressSet(objcon, ConstraintRelationId, oid);
3393 6 : recordDependencyOn(&objcon, &objrel, DEPENDENCY_AUTO);
3394 : }
3395 7 : systable_endscan(scan);
3396 :
3397 7 : table_close(rel, RowExclusiveLock);
3398 :
3399 7 : CommandCounterIncrement();
3400 7 : }
3401 :
3402 : /*
3403 : * Try to start a background worker to perform logical decoding of data
3404 : * changes applied to relation while REPACK CONCURRENTLY is copying its
3405 : * contents to a new table.
3406 : */
3407 : static void
3408 6 : start_repack_decoding_worker(Oid relid)
3409 : {
3410 : Size size;
3411 : dsm_segment *seg;
3412 : DecodingWorkerShared *shared;
3413 : shm_mq *mq;
3414 : shm_mq_handle *mqh;
3415 : BackgroundWorker bgw;
3416 :
3417 : /* Setup shared memory. */
3418 6 : size = BUFFERALIGN(offsetof(DecodingWorkerShared, error_queue)) +
3419 : BUFFERALIGN(REPACK_ERROR_QUEUE_SIZE);
3420 6 : seg = dsm_create(size, 0);
3421 6 : shared = (DecodingWorkerShared *) dsm_segment_address(seg);
3422 6 : shared->initialized = false;
3423 6 : shared->lsn_upto = InvalidXLogRecPtr;
3424 6 : shared->done = false;
3425 6 : SharedFileSetInit(&shared->sfs, seg);
3426 6 : shared->last_exported = -1;
3427 6 : SpinLockInit(&shared->mutex);
3428 6 : shared->dbid = MyDatabaseId;
3429 :
3430 : /*
3431 : * This is the UserId set in cluster_rel(). Security context shouldn't be
3432 : * needed for decoding worker.
3433 : */
3434 6 : shared->roleid = GetUserId();
3435 6 : shared->relid = relid;
3436 6 : ConditionVariableInit(&shared->cv);
3437 6 : shared->backend_proc = MyProc;
3438 6 : shared->backend_pid = MyProcPid;
3439 6 : shared->backend_proc_number = MyProcNumber;
3440 :
3441 6 : mq = shm_mq_create((char *) BUFFERALIGN(shared->error_queue),
3442 : REPACK_ERROR_QUEUE_SIZE);
3443 6 : shm_mq_set_receiver(mq, MyProc);
3444 6 : mqh = shm_mq_attach(mq, seg, NULL);
3445 :
3446 6 : memset(&bgw, 0, sizeof(bgw));
3447 6 : snprintf(bgw.bgw_name, BGW_MAXLEN,
3448 : "REPACK decoding worker for relation \"%s\"",
3449 : get_rel_name(relid));
3450 6 : snprintf(bgw.bgw_type, BGW_MAXLEN, "REPACK decoding worker");
3451 6 : bgw.bgw_flags = BGWORKER_SHMEM_ACCESS |
3452 : BGWORKER_BACKEND_DATABASE_CONNECTION;
3453 6 : bgw.bgw_start_time = BgWorkerStart_RecoveryFinished;
3454 6 : bgw.bgw_restart_time = BGW_NEVER_RESTART;
3455 6 : snprintf(bgw.bgw_library_name, MAXPGPATH, "postgres");
3456 6 : snprintf(bgw.bgw_function_name, BGW_MAXLEN, "RepackWorkerMain");
3457 6 : bgw.bgw_main_arg = UInt32GetDatum(dsm_segment_handle(seg));
3458 6 : bgw.bgw_notify_pid = MyProcPid;
3459 :
3460 6 : decoding_worker = palloc0_object(DecodingWorker);
3461 6 : if (!RegisterDynamicBackgroundWorker(&bgw, &decoding_worker->handle))
3462 0 : ereport(ERROR,
3463 : errcode(ERRCODE_CONFIGURATION_LIMIT_EXCEEDED),
3464 : errmsg("out of background worker slots"),
3465 : errhint("You might need to increase \"%s\".", "max_worker_processes"));
3466 :
3467 6 : decoding_worker->seg = seg;
3468 6 : decoding_worker->error_mqh = mqh;
3469 :
3470 : /*
3471 : * The decoding setup must be done before the caller can have XID assigned
3472 : * for any reason, otherwise the worker might end up in a deadlock,
3473 : * waiting for the caller's transaction to end. Therefore wait here until
3474 : * the worker indicates that it has the logical decoding initialized.
3475 : */
3476 6 : ConditionVariablePrepareToSleep(&shared->cv);
3477 : for (;;)
3478 8 : {
3479 : bool initialized;
3480 :
3481 14 : SpinLockAcquire(&shared->mutex);
3482 14 : initialized = shared->initialized;
3483 14 : SpinLockRelease(&shared->mutex);
3484 :
3485 14 : if (initialized)
3486 6 : break;
3487 :
3488 8 : ConditionVariableSleep(&shared->cv, WAIT_EVENT_REPACK_WORKER_EXPORT);
3489 : }
3490 6 : ConditionVariableCancelSleep();
3491 6 : }
3492 :
3493 : /*
3494 : * Stop the decoding worker and cleanup the related resources.
3495 : *
3496 : * The worker stops on its own when it knows there is no more work to do, but
3497 : * we need to stop it explicitly at least on ERROR in the launching backend.
3498 : */
3499 : static void
3500 6 : stop_repack_decoding_worker(void)
3501 : {
3502 : BgwHandleStatus status;
3503 :
3504 : /* Haven't reached the worker startup? */
3505 6 : if (decoding_worker == NULL)
3506 0 : return;
3507 :
3508 : /* Could not register the worker? */
3509 6 : if (decoding_worker->handle == NULL)
3510 0 : return;
3511 :
3512 6 : TerminateBackgroundWorker(decoding_worker->handle);
3513 : /* The worker should really exit before the REPACK command does. */
3514 6 : HOLD_INTERRUPTS();
3515 6 : status = WaitForBackgroundWorkerShutdown(decoding_worker->handle);
3516 6 : RESUME_INTERRUPTS();
3517 :
3518 6 : if (status == BGWH_POSTMASTER_DIED)
3519 0 : ereport(FATAL,
3520 : errcode(ERRCODE_ADMIN_SHUTDOWN),
3521 : errmsg("postmaster exited during REPACK command"));
3522 :
3523 6 : shm_mq_detach(decoding_worker->error_mqh);
3524 :
3525 : /*
3526 : * If we could not cancel the current sleep due to ERROR, do that before
3527 : * we detach from the shared memory the condition variable is located in.
3528 : * If we did not, the bgworker ERROR handling code would try and fail
3529 : * badly.
3530 : */
3531 6 : ConditionVariableCancelSleep();
3532 :
3533 6 : dsm_detach(decoding_worker->seg);
3534 6 : pfree(decoding_worker);
3535 6 : decoding_worker = NULL;
3536 : }
3537 :
3538 : /* stop_repack_decoding_worker, wrapped as a before_shmem_exit callback */
3539 : static void
3540 0 : stop_repack_decoding_worker_cb(int code, Datum arg)
3541 : {
3542 0 : stop_repack_decoding_worker();
3543 0 : }
3544 :
3545 : /*
3546 : * Get the initial snapshot from the decoding worker.
3547 : */
3548 : static Snapshot
3549 6 : get_initial_snapshot(DecodingWorker *worker)
3550 : {
3551 : DecodingWorkerShared *shared;
3552 : char fname[MAXPGPATH];
3553 : BufFile *file;
3554 : Size snap_size;
3555 : char *snap_space;
3556 : Snapshot snapshot;
3557 :
3558 6 : shared = (DecodingWorkerShared *) dsm_segment_address(worker->seg);
3559 :
3560 : /*
3561 : * The worker needs to initialize the logical decoding, which usually
3562 : * takes some time. Therefore it makes sense to prepare for the sleep
3563 : * first.
3564 : */
3565 6 : ConditionVariablePrepareToSleep(&shared->cv);
3566 : for (;;)
3567 3 : {
3568 : int last_exported;
3569 :
3570 9 : SpinLockAcquire(&shared->mutex);
3571 9 : last_exported = shared->last_exported;
3572 9 : SpinLockRelease(&shared->mutex);
3573 :
3574 : /*
3575 : * Has the worker exported the file we are waiting for?
3576 : */
3577 9 : if (last_exported == WORKER_FILE_SNAPSHOT)
3578 6 : break;
3579 :
3580 3 : ConditionVariableSleep(&shared->cv, WAIT_EVENT_REPACK_WORKER_EXPORT);
3581 : }
3582 6 : ConditionVariableCancelSleep();
3583 :
3584 : /* Read the snapshot from a file. */
3585 6 : DecodingWorkerFileName(fname, shared->relid, WORKER_FILE_SNAPSHOT);
3586 6 : file = BufFileOpenFileSet(&shared->sfs.fs, fname, O_RDONLY, false);
3587 6 : BufFileReadExact(file, &snap_size, sizeof(snap_size));
3588 6 : snap_space = (char *) palloc(snap_size);
3589 6 : BufFileReadExact(file, snap_space, snap_size);
3590 6 : BufFileClose(file);
3591 :
3592 : /* Restore it. */
3593 6 : snapshot = RestoreSnapshot(snap_space);
3594 6 : pfree(snap_space);
3595 :
3596 6 : return snapshot;
3597 : }
3598 :
3599 : /*
3600 : * Generate worker's file name into 'fname', which must be of size MAXPGPATH.
3601 : * If relations of the same 'relid' happen to be processed at the same time,
3602 : * they must be from different databases and therefore different backends must
3603 : * be involved.
3604 : */
3605 : void
3606 36 : DecodingWorkerFileName(char *fname, Oid relid, uint32 seq)
3607 : {
3608 : /* The PID is already present in the fileset name, so we needn't add it */
3609 36 : snprintf(fname, MAXPGPATH, "%u-%u", relid, seq);
3610 36 : }
3611 :
3612 : /*
3613 : * Handle receipt of an interrupt indicating a repack worker message.
3614 : *
3615 : * Note: this is called within a signal handler! All we can do is set
3616 : * a flag that will cause the next CHECK_FOR_INTERRUPTS() to invoke
3617 : * ProcessRepackMessages().
3618 : */
3619 : void
3620 6 : HandleRepackMessageInterrupt(void)
3621 : {
3622 6 : InterruptPending = true;
3623 6 : RepackMessagePending = true;
3624 6 : SetLatch(MyLatch);
3625 6 : }
3626 :
3627 : /*
3628 : * Process any queued protocol messages received from the repack worker.
3629 : */
3630 : void
3631 6 : ProcessRepackMessages(void)
3632 : {
3633 : MemoryContext oldcontext;
3634 : static MemoryContext hpm_context = NULL;
3635 :
3636 : /*
3637 : * Nothing to do if we haven't launched the worker yet or have already
3638 : * terminated it.
3639 : */
3640 6 : if (decoding_worker == NULL)
3641 1 : return;
3642 :
3643 : /*
3644 : * This is invoked from ProcessInterrupts(), and since some of the
3645 : * functions it calls contain CHECK_FOR_INTERRUPTS(), there is a potential
3646 : * for recursive calls if more signals are received while this runs. It's
3647 : * unclear that recursive entry would be safe, and it doesn't seem useful
3648 : * even if it is safe, so let's block interrupts until done.
3649 : */
3650 5 : HOLD_INTERRUPTS();
3651 :
3652 : /*
3653 : * Moreover, CurrentMemoryContext might be pointing almost anywhere. We
3654 : * don't want to risk leaking data into long-lived contexts, so let's do
3655 : * our work here in a private context that we can reset on each use.
3656 : */
3657 5 : if (hpm_context == NULL) /* first time through? */
3658 4 : hpm_context = AllocSetContextCreate(TopMemoryContext,
3659 : "ProcessRepackMessages",
3660 : ALLOCSET_DEFAULT_SIZES);
3661 : else
3662 1 : MemoryContextReset(hpm_context);
3663 :
3664 5 : oldcontext = MemoryContextSwitchTo(hpm_context);
3665 :
3666 : /* OK to process messages. Reset the flag saying there are more to do. */
3667 5 : RepackMessagePending = false;
3668 :
3669 : /*
3670 : * Read as many messages as we can from the worker, but stop when no more
3671 : * messages can be read from the worker without blocking.
3672 : */
3673 : while (true)
3674 0 : {
3675 : shm_mq_result res;
3676 : Size nbytes;
3677 : void *data;
3678 :
3679 5 : res = shm_mq_receive(decoding_worker->error_mqh, &nbytes,
3680 : &data, true);
3681 5 : if (res == SHM_MQ_WOULD_BLOCK)
3682 0 : break;
3683 5 : else if (res == SHM_MQ_SUCCESS)
3684 : {
3685 : StringInfoData msg;
3686 :
3687 0 : initStringInfo(&msg);
3688 0 : appendBinaryStringInfo(&msg, data, nbytes);
3689 0 : ProcessRepackMessage(&msg);
3690 0 : pfree(msg.data);
3691 : }
3692 : else
3693 : {
3694 : /*
3695 : * The decoding worker is special in that it exits as soon as it
3696 : * has its work done. Thus the DETACHED result code is fine.
3697 : */
3698 : Assert(res == SHM_MQ_DETACHED);
3699 :
3700 5 : break;
3701 : }
3702 : }
3703 :
3704 5 : MemoryContextSwitchTo(oldcontext);
3705 :
3706 : /* Might as well clear the context on our way out */
3707 5 : MemoryContextReset(hpm_context);
3708 :
3709 5 : RESUME_INTERRUPTS();
3710 : }
3711 :
3712 : /*
3713 : * Process a single protocol message received from a single parallel worker.
3714 : */
3715 : static void
3716 0 : ProcessRepackMessage(StringInfo msg)
3717 : {
3718 : char msgtype;
3719 :
3720 0 : msgtype = pq_getmsgbyte(msg);
3721 :
3722 0 : switch (msgtype)
3723 : {
3724 0 : case PqMsg_ErrorResponse:
3725 : case PqMsg_NoticeResponse:
3726 : {
3727 : ErrorData edata;
3728 :
3729 : /* Parse ErrorResponse or NoticeResponse. */
3730 0 : pq_parse_errornotice(msg, &edata);
3731 :
3732 : /* Death of a worker isn't enough justification for suicide. */
3733 0 : edata.elevel = Min(edata.elevel, ERROR);
3734 :
3735 : /*
3736 : * Add a context line to show that this is a message
3737 : * propagated from the worker. Otherwise, it can sometimes be
3738 : * confusing to understand what actually happened.
3739 : */
3740 0 : if (edata.context)
3741 0 : edata.context = psprintf("%s\n%s", edata.context,
3742 : _("REPACK decoding worker"));
3743 : else
3744 0 : edata.context = pstrdup(_("REPACK decoding worker"));
3745 :
3746 : /* Rethrow error or print notice. */
3747 0 : ThrowErrorData(&edata);
3748 :
3749 0 : break;
3750 : }
3751 :
3752 0 : default:
3753 : {
3754 0 : elog(ERROR, "unrecognized message type received from decoding worker: %c (message length %d bytes)",
3755 : msgtype, msg->len);
3756 : }
3757 : }
3758 0 : }
|