Line data Source code
1 : /*-------------------------------------------------------------------------
2 : *
3 : * repack.c
4 : * REPACK a table; formerly known as CLUSTER. VACUUM FULL also uses
5 : * parts of this code.
6 : *
7 : * There are two somewhat different ways to rewrite a table. In non-
8 : * concurrent mode, it's easy: take AccessExclusiveLock, create a new
9 : * transient relation, copy the tuples over to the relfilenode of the new
10 : * relation, swap the relfilenodes, then drop the old relation.
11 : *
12 : * In concurrent mode, we lock the table with only ShareUpdateExclusiveLock,
13 : * then do an initial copy as above. However, while the tuples are being
14 : * copied, concurrent transactions could modify the table. To cope with those
15 : * changes, we rely on logical decoding to obtain them from WAL. A bgworker
16 : * consumes WAL while the initial copy is ongoing (to prevent excessive WAL
17 : * from being reserved), and accumulates the changes in a file. Once the
18 : * initial copy is complete, we read the changes from the file and re-apply
19 : * them on the new heap. Then we upgrade our ShareUpdateExclusiveLock to
20 : * AccessExclusiveLock and swap the relfilenodes. This way, the time we hold
21 : * a strong lock on the table is much reduced, and the bloat is eliminated.
22 : *
23 : *
24 : * Portions Copyright (c) 1996-2026, PostgreSQL Global Development Group
25 : * Portions Copyright (c) 1994-5, Regents of the University of California
26 : *
27 : *
28 : * IDENTIFICATION
29 : * src/backend/commands/repack.c
30 : *
31 : *-------------------------------------------------------------------------
32 : */
33 : #include "postgres.h"
34 :
35 : #include "access/amapi.h"
36 : #include "access/heapam.h"
37 : #include "access/multixact.h"
38 : #include "access/relscan.h"
39 : #include "access/tableam.h"
40 : #include "access/toast_internals.h"
41 : #include "access/transam.h"
42 : #include "access/xact.h"
43 : #include "access/xlog.h"
44 : #include "catalog/catalog.h"
45 : #include "catalog/dependency.h"
46 : #include "catalog/heap.h"
47 : #include "catalog/index.h"
48 : #include "catalog/namespace.h"
49 : #include "catalog/objectaccess.h"
50 : #include "catalog/pg_am.h"
51 : #include "catalog/pg_constraint.h"
52 : #include "catalog/pg_inherits.h"
53 : #include "catalog/toasting.h"
54 : #include "commands/defrem.h"
55 : #include "commands/progress.h"
56 : #include "commands/repack.h"
57 : #include "commands/repack_internal.h"
58 : #include "commands/tablecmds.h"
59 : #include "commands/vacuum.h"
60 : #include "executor/executor.h"
61 : #include "libpq/pqformat.h"
62 : #include "libpq/pqmq.h"
63 : #include "miscadmin.h"
64 : #include "optimizer/optimizer.h"
65 : #include "pgstat.h"
66 : #include "replication/logicalrelation.h"
67 : #include "storage/bufmgr.h"
68 : #include "storage/ipc.h"
69 : #include "storage/lmgr.h"
70 : #include "storage/predicate.h"
71 : #include "storage/proc.h"
72 : #include "utils/acl.h"
73 : #include "utils/fmgroids.h"
74 : #include "utils/guc.h"
75 : #include "utils/injection_point.h"
76 : #include "utils/inval.h"
77 : #include "utils/lsyscache.h"
78 : #include "utils/memutils.h"
79 : #include "utils/pg_rusage.h"
80 : #include "utils/relmapper.h"
81 : #include "utils/snapmgr.h"
82 : #include "utils/syscache.h"
83 : #include "utils/wait_event_types.h"
84 :
85 : /*
86 : * This struct is used to pass around the information on tables to be
87 : * clustered. We need this so we can make a list of them when invoked without
88 : * a specific table/index pair.
89 : */
90 : typedef struct
91 : {
92 : Oid tableOid;
93 : Oid indexOid;
94 : } RelToCluster;
95 :
/*
 * Sequence number of the first file exported by the decoding worker.  That
 * file must contain a snapshot; the files that follow it contain the data
 * changes.
 */
#define WORKER_FILE_SNAPSHOT	0
101 :
102 : /*
103 : * Information needed to apply concurrent data changes.
104 : */
105 : typedef struct ChangeContext
106 : {
107 : /* The relation the changes are applied to. */
108 : Relation cc_rel;
109 :
110 : /* Needed to update indexes of cc_rel. */
111 : ResultRelInfo *cc_rri;
112 : EState *cc_estate;
113 :
114 : /*
115 : * Existing tuples to UPDATE and DELETE are located via this index. We
116 : * keep the scankey in partially initialized state to avoid repeated work.
117 : * sk_argument is completed on the fly.
118 : */
119 : Relation cc_ident_index;
120 : ScanKey cc_ident_key;
121 : int cc_ident_key_nentries;
122 :
123 : /* The latest column we need to deform to have the tuple identity */
124 : AttrNumber cc_last_key_attno;
125 :
126 : /* Sequential number of the file containing the changes. */
127 : int cc_file_seq;
128 : } ChangeContext;
129 :
130 : /*
131 : * Backend-local information to control the decoding worker.
132 : */
133 : typedef struct DecodingWorker
134 : {
135 : /* The worker. */
136 : BackgroundWorkerHandle *handle;
137 :
138 : /* DecodingWorkerShared is in this segment. */
139 : dsm_segment *seg;
140 :
141 : /* Handle of the error queue. */
142 : shm_mq_handle *error_mqh;
143 : } DecodingWorker;
144 :
145 : /* Pointer to currently running decoding worker. */
146 : static DecodingWorker *decoding_worker = NULL;
147 :
148 : /*
149 : * Is there a message sent by a repack worker that the backend needs to
150 : * receive?
151 : */
152 : volatile sig_atomic_t RepackMessagePending = false;
153 :
154 : static LOCKMODE RepackLockLevel(bool concurrent);
155 : static bool cluster_rel_recheck(RepackCommand cmd, Relation OldHeap,
156 : Oid indexOid, Oid userid, LOCKMODE lmode,
157 : int options);
158 : static void check_concurrent_repack_requirements(Relation rel,
159 : Oid *ident_idx_p);
160 : static void rebuild_relation(Relation OldHeap, Relation index, bool verbose,
161 : Oid ident_idx);
162 : static void copy_table_data(Relation NewHeap, Relation OldHeap, Relation OldIndex,
163 : Snapshot snapshot,
164 : bool verbose,
165 : bool *pSwapToastByContent,
166 : TransactionId *pFreezeXid,
167 : MultiXactId *pCutoffMulti);
168 : static List *get_tables_to_repack(RepackCommand cmd, bool usingindex,
169 : MemoryContext permcxt);
170 : static List *get_tables_to_repack_partitioned(RepackCommand cmd,
171 : Oid relid, bool rel_is_index,
172 : MemoryContext permcxt);
173 : static bool repack_is_permitted_for_relation(RepackCommand cmd,
174 : Oid relid, Oid userid);
175 :
176 : static void apply_concurrent_changes(BufFile *file, ChangeContext *chgcxt);
177 : static void apply_concurrent_insert(Relation rel, TupleTableSlot *slot,
178 : ChangeContext *chgcxt);
179 : static void apply_concurrent_update(Relation rel, TupleTableSlot *spilled_tuple,
180 : TupleTableSlot *ondisk_tuple,
181 : ChangeContext *chgcxt);
182 : static void apply_concurrent_delete(Relation rel, TupleTableSlot *slot);
183 : static void restore_tuple(BufFile *file, Relation relation,
184 : TupleTableSlot *slot);
185 : static void adjust_toast_pointers(Relation relation, TupleTableSlot *dest,
186 : TupleTableSlot *src);
187 : static bool find_target_tuple(Relation rel, ChangeContext *chgcxt,
188 : TupleTableSlot *locator,
189 : TupleTableSlot *retrieved);
190 : static bool identity_key_equal(ChangeContext *chgcxt,
191 : TupleTableSlot *locator,
192 : TupleTableSlot *candidate);
193 : static void process_concurrent_changes(XLogRecPtr end_of_wal,
194 : ChangeContext *chgcxt,
195 : bool done);
196 : static void initialize_change_context(ChangeContext *chgcxt,
197 : Relation relation,
198 : Oid ident_index_id);
199 : static void release_change_context(ChangeContext *chgcxt);
200 : static void rebuild_relation_finish_concurrent(Relation NewHeap, Relation OldHeap,
201 : Oid identIdx,
202 : TransactionId frozenXid,
203 : MultiXactId cutoffMulti);
204 : static List *build_new_indexes(Relation NewHeap, Relation OldHeap, List *OldIndexes);
205 : static void copy_index_constraints(Relation old_index, Oid new_index_id,
206 : Oid new_heap_id);
207 : static Relation process_single_relation(RepackStmt *stmt,
208 : LOCKMODE lockmode,
209 : bool isTopLevel,
210 : ClusterParams *params);
211 : static Oid determine_clustered_index(Relation rel, bool usingindex,
212 : const char *indexname);
213 :
214 : static void start_repack_decoding_worker(Oid relid);
215 : static void stop_repack_decoding_worker(void);
216 : static void stop_repack_decoding_worker_cb(int code, Datum arg);
217 : static Snapshot get_initial_snapshot(DecodingWorker *worker);
218 :
219 : static void ProcessRepackMessage(StringInfo msg);
220 : static const char *RepackCommandAsString(RepackCommand cmd);
221 :
222 :
223 : /*
224 : * The repack code allows for processing multiple tables at once. Because
225 : * of this, we cannot just run everything on a single transaction, or we
226 : * would be forced to acquire exclusive locks on all the tables being
227 : * clustered, simultaneously --- very likely leading to deadlock.
228 : *
229 : * To solve this we follow a similar strategy to VACUUM code, processing each
230 : * relation in a separate transaction. For this to work, we need to:
231 : *
232 : * - provide a separate memory context so that we can pass information in
233 : * a way that survives across transactions
234 : * - start a new transaction every time a new relation is clustered
235 : * - check for validity of the information on to-be-clustered relations,
236 : * as someone might have deleted a relation behind our back, or
237 : * clustered one on a different index
238 : * - end the transaction
239 : *
240 : * The single-relation case does not have any such overhead.
241 : *
242 : * We also allow a relation to be repacked following an index, but without
243 : * naming a specific one. In that case, the indisclustered bit will be
244 : * looked up, and an ERROR will be thrown if no so-marked index is found.
245 : */
246 : void
247 233 : ExecRepack(ParseState *pstate, RepackStmt *stmt, bool isTopLevel)
248 : {
249 233 : ClusterParams params = {0};
250 233 : Relation rel = NULL;
251 : MemoryContext repack_context;
252 : LOCKMODE lockmode;
253 : List *rtcs;
254 233 : bool verbose = false;
255 233 : bool analyze = false;
256 233 : bool concurrently = false;
257 :
258 : /* Parse option list */
259 496 : foreach_node(DefElem, opt, stmt->params)
260 : {
261 30 : if (strcmp(opt->defname, "verbose") == 0)
262 7 : verbose = defGetBoolean(opt);
263 23 : else if (strcmp(opt->defname, "analyze") == 0 ||
264 15 : strcmp(opt->defname, "analyse") == 0)
265 8 : analyze = defGetBoolean(opt);
266 15 : else if (strcmp(opt->defname, "concurrently") == 0)
267 : {
268 15 : if (stmt->command != REPACK_COMMAND_REPACK)
269 0 : ereport(ERROR,
270 : errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
271 : errmsg("CONCURRENTLY option not supported for %s",
272 : RepackCommandAsString(stmt->command)));
273 15 : concurrently = defGetBoolean(opt);
274 : }
275 : else
276 0 : ereport(ERROR,
277 : errcode(ERRCODE_SYNTAX_ERROR),
278 : errmsg("unrecognized %s option \"%s\"",
279 : RepackCommandAsString(stmt->command),
280 : opt->defname),
281 : parser_errposition(pstate, opt->location));
282 : }
283 :
284 466 : params.options |=
285 466 : (verbose ? CLUOPT_VERBOSE : 0) |
286 233 : (analyze ? CLUOPT_ANALYZE : 0) |
287 233 : (concurrently ? CLUOPT_CONCURRENT : 0);
288 :
289 : /* Determine the lock mode to use. */
290 233 : lockmode = RepackLockLevel((params.options & CLUOPT_CONCURRENT) != 0);
291 :
292 233 : if ((params.options & CLUOPT_CONCURRENT) != 0)
293 : {
294 : /*
295 : * Make sure we're not in a transaction block.
296 : *
297 : * The reason is that repack_setup_logical_decoding() could wait
298 : * indefinitely for our XID to complete. (The deadlock detector would
299 : * not recognize it because we'd be waiting for ourselves, i.e. no
300 : * real lock conflict.) It would be possible to run in a transaction
301 : * block if we had no XID, but this restriction is simpler for users
302 : * to understand and we don't lose any functionality.
303 : */
304 15 : PreventInTransactionBlock(isTopLevel, "REPACK (CONCURRENTLY)");
305 : }
306 :
307 : /*
308 : * If a single relation is specified, process it and we're done ... unless
309 : * the relation is a partitioned table, in which case we fall through.
310 : */
311 233 : if (stmt->relation != NULL)
312 : {
313 217 : rel = process_single_relation(stmt, lockmode, isTopLevel, ¶ms);
314 173 : if (rel == NULL)
315 140 : return; /* all done */
316 : }
317 :
318 : /*
319 : * Don't allow ANALYZE in the multiple-relation case for now. Maybe we
320 : * can add support for this later.
321 : */
322 49 : if (params.options & CLUOPT_ANALYZE)
323 0 : ereport(ERROR,
324 : errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
325 : errmsg("cannot execute %s on multiple tables",
326 : "REPACK (ANALYZE)"));
327 :
328 : /*
329 : * By here, we know we are in a multi-table situation.
330 : *
331 : * Concurrent processing is currently considered rather special (e.g. in
332 : * terms of resources consumed) so it is not performed in bulk.
333 : */
334 49 : if (params.options & CLUOPT_CONCURRENT)
335 : {
336 1 : if (rel != NULL)
337 : {
338 : Assert(rel->rd_rel->relkind == RELKIND_PARTITIONED_TABLE);
339 1 : ereport(ERROR,
340 : errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
341 : errmsg("%s is not supported for partitioned tables",
342 : "REPACK (CONCURRENTLY)"),
343 : errhint("Consider running the command on individual partitions."));
344 : }
345 : else
346 0 : ereport(ERROR,
347 : errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
348 : errmsg("%s requires an explicit table name",
349 : "REPACK (CONCURRENTLY)"));
350 : }
351 :
352 : /*
353 : * In order to avoid holding locks for too long, we want to process each
354 : * table in its own transaction. This forces us to disallow running
355 : * inside a user transaction block.
356 : */
357 48 : PreventInTransactionBlock(isTopLevel, RepackCommandAsString(stmt->command));
358 :
359 : /* Also, we need a memory context to hold our list of relations */
360 48 : repack_context = AllocSetContextCreate(PortalContext,
361 : "Repack",
362 : ALLOCSET_DEFAULT_SIZES);
363 :
364 : /*
365 : * Since we open a new transaction for each relation, we have to check
366 : * that the relation still is what we think it is.
367 : *
368 : * In single-transaction CLUSTER, we don't need the overhead.
369 : */
370 48 : params.options |= CLUOPT_RECHECK;
371 :
372 : /*
373 : * If we don't have a relation yet, determine a relation list. If we do,
374 : * then it must be a partitioned table, and we want to process its
375 : * partitions.
376 : */
377 48 : if (rel == NULL)
378 : {
379 : Assert(stmt->indexname == NULL);
380 16 : rtcs = get_tables_to_repack(stmt->command, stmt->usingindex,
381 : repack_context);
382 16 : params.options |= CLUOPT_RECHECK_ISCLUSTERED;
383 : }
384 : else
385 : {
386 : Oid relid;
387 : bool rel_is_index;
388 :
389 : Assert(rel->rd_rel->relkind == RELKIND_PARTITIONED_TABLE);
390 :
391 : /*
392 : * If USING INDEX was specified, resolve the index name now and pass
393 : * it down.
394 : */
395 32 : if (stmt->usingindex)
396 : {
397 : /*
398 : * If no index name was specified when repacking a partitioned
399 : * table, punt for now. Maybe we can improve this later.
400 : */
401 28 : if (!stmt->indexname)
402 : {
403 8 : if (stmt->command == REPACK_COMMAND_CLUSTER)
404 4 : ereport(ERROR,
405 : errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
406 : errmsg("there is no previously clustered index for table \"%s\"",
407 : RelationGetRelationName(rel)));
408 : else
409 4 : ereport(ERROR,
410 : errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
411 : /*- translator: first %s is name of a SQL command, eg. REPACK */
412 : errmsg("cannot execute %s on partitioned table \"%s\" USING INDEX with no index name",
413 : RepackCommandAsString(stmt->command),
414 : RelationGetRelationName(rel)));
415 : }
416 :
417 20 : relid = determine_clustered_index(rel, stmt->usingindex,
418 20 : stmt->indexname);
419 20 : if (!OidIsValid(relid))
420 0 : elog(ERROR, "unable to determine index to cluster on");
421 20 : check_index_is_clusterable(rel, relid, AccessExclusiveLock);
422 :
423 16 : rel_is_index = true;
424 : }
425 : else
426 : {
427 4 : relid = RelationGetRelid(rel);
428 4 : rel_is_index = false;
429 : }
430 :
431 20 : rtcs = get_tables_to_repack_partitioned(stmt->command,
432 : relid, rel_is_index,
433 : repack_context);
434 :
435 : /* close parent relation, releasing lock on it */
436 20 : table_close(rel, AccessExclusiveLock);
437 20 : rel = NULL;
438 : }
439 :
440 : /* Commit to get out of starting transaction */
441 36 : PopActiveSnapshot();
442 36 : CommitTransactionCommand();
443 :
444 : /* Cluster the tables, each in a separate transaction */
445 : Assert(rel == NULL);
446 124 : foreach_ptr(RelToCluster, rtc, rtcs)
447 : {
448 : /* Start a new transaction for each relation. */
449 52 : StartTransactionCommand();
450 :
451 : /*
452 : * Open the target table, coping with the case where it has been
453 : * dropped.
454 : */
455 52 : rel = try_table_open(rtc->tableOid, lockmode);
456 52 : if (rel == NULL)
457 : {
458 0 : CommitTransactionCommand();
459 0 : continue;
460 : }
461 :
462 : /* functions in indexes may want a snapshot set */
463 52 : PushActiveSnapshot(GetTransactionSnapshot());
464 :
465 : /* Process this table */
466 52 : cluster_rel(stmt->command, rel, rtc->indexOid, ¶ms, isTopLevel);
467 : /* cluster_rel closes the relation, but keeps lock */
468 :
469 52 : PopActiveSnapshot();
470 52 : CommitTransactionCommand();
471 : }
472 :
473 : /* Start a new transaction for the cleanup work. */
474 36 : StartTransactionCommand();
475 :
476 : /* Clean up working storage */
477 36 : MemoryContextDelete(repack_context);
478 : }
479 :
480 : /*
481 : * In the non-concurrent case, we obtain AccessExclusiveLock throughout the
482 : * operation to avoid any lock-upgrade hazards. In the concurrent case, we
483 : * grab ShareUpdateExclusiveLock (just like VACUUM) for most of the
484 : * processing and only acquire AccessExclusiveLock at the end, to swap the
485 : * relation -- supposedly for a short time.
486 : */
487 : static LOCKMODE
488 1072 : RepackLockLevel(bool concurrent)
489 : {
490 1072 : if (concurrent)
491 36 : return ShareUpdateExclusiveLock;
492 : else
493 1036 : return AccessExclusiveLock;
494 : }
495 :
496 : /*
497 : * cluster_rel
498 : *
499 : * This clusters the table by creating a new, clustered table and
500 : * swapping the relfilenumbers of the new table and the old table, so
501 : * the OID of the original table is preserved. Thus we do not lose
502 : * GRANT, inheritance nor references to this table.
503 : *
504 : * Indexes are rebuilt too, via REINDEX. Since we are effectively bulk-loading
505 : * the new table, it's better to create the indexes afterwards than to fill
506 : * them incrementally while we load the table.
507 : *
508 : * If indexOid is InvalidOid, the table will be rewritten in physical order
509 : * instead of index order.
510 : *
511 : * Note that, in the concurrent case, the function releases the lock at some
512 : * point, in order to get AccessExclusiveLock for the final steps (i.e. to
513 : * swap the relation files). To make things simpler, the caller should expect
514 : * OldHeap to be closed on return, regardless CLUOPT_CONCURRENT. (The
515 : * AccessExclusiveLock is kept till the end of the transaction.)
516 : *
517 : * 'cmd' indicates which command is being executed, to be used for error
518 : * messages.
519 : */
520 : void
521 429 : cluster_rel(RepackCommand cmd, Relation OldHeap, Oid indexOid,
522 : ClusterParams *params, bool isTopLevel)
523 : {
524 429 : Oid tableOid = RelationGetRelid(OldHeap);
525 : Relation index;
526 : LOCKMODE lmode;
527 : Oid save_userid;
528 : int save_sec_context;
529 : int save_nestlevel;
530 429 : bool verbose = ((params->options & CLUOPT_VERBOSE) != 0);
531 429 : bool recheck = ((params->options & CLUOPT_RECHECK) != 0);
532 429 : bool concurrent = ((params->options & CLUOPT_CONCURRENT) != 0);
533 429 : Oid ident_idx = InvalidOid;
534 :
535 : /* Determine the lock mode to use. */
536 429 : lmode = RepackLockLevel(concurrent);
537 :
538 : /*
539 : * Check some preconditions in the concurrent case. This also obtains the
540 : * replica index OID.
541 : */
542 429 : if (concurrent)
543 14 : check_concurrent_repack_requirements(OldHeap, &ident_idx);
544 :
545 : /* Check for user-requested abort. */
546 422 : CHECK_FOR_INTERRUPTS();
547 :
548 422 : pgstat_progress_start_command(PROGRESS_COMMAND_REPACK, tableOid);
549 422 : pgstat_progress_update_param(PROGRESS_REPACK_COMMAND, cmd);
550 :
551 : /*
552 : * Switch to the table owner's userid, so that any index functions are run
553 : * as that user. Also lock down security-restricted operations and
554 : * arrange to make GUC variable changes local to this command.
555 : */
556 422 : GetUserIdAndSecContext(&save_userid, &save_sec_context);
557 422 : SetUserIdAndSecContext(OldHeap->rd_rel->relowner,
558 : save_sec_context | SECURITY_RESTRICTED_OPERATION);
559 422 : save_nestlevel = NewGUCNestLevel();
560 422 : RestrictSearchPath();
561 :
562 : /*
563 : * Recheck that the relation is still what it was when we started.
564 : *
565 : * Note that it's critical to skip this in single-relation CLUSTER;
566 : * otherwise, we would reject an attempt to cluster using a
567 : * not-previously-clustered index.
568 : */
569 422 : if (recheck &&
570 52 : !cluster_rel_recheck(cmd, OldHeap, indexOid, save_userid,
571 52 : lmode, params->options))
572 0 : goto out;
573 :
574 : /*
575 : * We allow repacking shared catalogs only when not using an index. It
576 : * would work to use an index in most respects, but the index would only
577 : * get marked as indisclustered in the current database, leading to
578 : * unexpected behavior if CLUSTER were later invoked in another database.
579 : */
580 422 : if (OidIsValid(indexOid) && OldHeap->rd_rel->relisshared)
581 0 : ereport(ERROR,
582 : errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
583 : /*- translator: first %s is name of a SQL command, eg. REPACK */
584 : errmsg("cannot execute %s on a shared catalog",
585 : RepackCommandAsString(cmd)));
586 :
587 : /*
588 : * The CONCURRENTLY case should have been rejected earlier because it does
589 : * not support system catalogs.
590 : */
591 : Assert(!(OldHeap->rd_rel->relisshared && concurrent));
592 :
593 : /*
594 : * Don't process temp tables of other backends ... their local buffer
595 : * manager is not going to cope.
596 : */
597 422 : if (RELATION_IS_OTHER_TEMP(OldHeap))
598 0 : ereport(ERROR,
599 : errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
600 : /*- translator: first %s is name of a SQL command, eg. REPACK */
601 : errmsg("cannot execute %s on temporary tables of other sessions",
602 : RepackCommandAsString(cmd)));
603 :
604 : /*
605 : * Also check for active uses of the relation in the current transaction,
606 : * including open scans and pending AFTER trigger events.
607 : */
608 422 : CheckTableNotInUse(OldHeap, RepackCommandAsString(cmd));
609 :
610 : /* Check heap and index are valid to cluster on */
611 422 : if (OidIsValid(indexOid))
612 : {
613 : /* verify the index is good and lock it */
614 160 : check_index_is_clusterable(OldHeap, indexOid, lmode);
615 : /* also open it */
616 160 : index = index_open(indexOid, NoLock);
617 : }
618 : else
619 262 : index = NULL;
620 :
621 : /*
622 : * When allow_system_table_mods is turned off, we disallow repacking a
623 : * catalog on a particular index unless that's already the clustered index
624 : * for that catalog.
625 : *
626 : * XXX We don't check for this in CLUSTER, because it's historically been
627 : * allowed.
628 : */
629 422 : if (cmd != REPACK_COMMAND_CLUSTER &&
630 289 : !allowSystemTableMods && OidIsValid(indexOid) &&
631 27 : IsCatalogRelation(OldHeap) && !index->rd_index->indisclustered)
632 4 : ereport(ERROR,
633 : errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
634 : errmsg("permission denied: \"%s\" is a system catalog",
635 : RelationGetRelationName(OldHeap)),
636 : errdetail("System catalogs can only be clustered by the index they're already clustered on, if any, unless \"%s\" is enabled.",
637 : "allow_system_table_mods"));
638 :
639 : /*
640 : * Quietly ignore the request if this is a materialized view which has not
641 : * been populated from its query. No harm is done because there is no data
642 : * to deal with, and we don't want to throw an error if this is part of a
643 : * multi-relation request -- for example, CLUSTER was run on the entire
644 : * database.
645 : */
646 418 : if (OldHeap->rd_rel->relkind == RELKIND_MATVIEW &&
647 8 : !RelationIsPopulated(OldHeap))
648 : {
649 8 : if (index)
650 8 : index_close(index, lmode);
651 8 : relation_close(OldHeap, lmode);
652 8 : goto out;
653 : }
654 :
655 : Assert(OldHeap->rd_rel->relkind == RELKIND_RELATION ||
656 : OldHeap->rd_rel->relkind == RELKIND_MATVIEW ||
657 : OldHeap->rd_rel->relkind == RELKIND_TOASTVALUE);
658 :
659 : /*
660 : * All predicate locks on the tuples or pages are about to be made
661 : * invalid, because we move tuples around. Promote them to relation
662 : * locks. Predicate locks on indexes will be promoted when they are
663 : * reindexed.
664 : *
665 : * During concurrent processing, the heap as well as its indexes stay in
666 : * operation, so we postpone this step until they are locked using
667 : * AccessExclusiveLock near the end of the processing.
668 : */
669 410 : if (!concurrent)
670 403 : TransferPredicateLocksToHeapRelation(OldHeap);
671 :
672 : /*
673 : * rebuild_relation does all the dirty work, and closes OldHeap and index,
674 : * if valid.
675 : *
676 : * In concurrent mode, make sure the worker terminates; normally it does
677 : * so by itself, but a PG_ENSURE_ERROR_CLEANUP callback ensures that this
678 : * happens even in case this backend dies early on a FATAL exit. Normal
679 : * mode doesn't need that overhead.
680 : */
681 410 : if (concurrent)
682 : {
683 7 : PG_ENSURE_ERROR_CLEANUP(stop_repack_decoding_worker_cb, 0);
684 : {
685 7 : rebuild_relation(OldHeap, index, verbose, ident_idx);
686 : }
687 7 : PG_END_ENSURE_ERROR_CLEANUP(stop_repack_decoding_worker_cb, 0);
688 7 : stop_repack_decoding_worker();
689 : }
690 : else
691 403 : rebuild_relation(OldHeap, index, verbose, ident_idx);
692 :
693 414 : out:
694 : /* Roll back any GUC changes executed by index functions */
695 414 : AtEOXact_GUC(false, save_nestlevel);
696 :
697 : /* Restore userid and security context */
698 414 : SetUserIdAndSecContext(save_userid, save_sec_context);
699 :
700 414 : pgstat_progress_end_command();
701 414 : }
702 :
703 : /*
704 : * Check if the table (and its index) still meets the requirements of
705 : * cluster_rel().
706 : */
707 : static bool
708 52 : cluster_rel_recheck(RepackCommand cmd, Relation OldHeap, Oid indexOid,
709 : Oid userid, LOCKMODE lmode, int options)
710 : {
711 52 : Oid tableOid = RelationGetRelid(OldHeap);
712 :
713 : /* Check that the user still has privileges for the relation */
714 52 : if (!repack_is_permitted_for_relation(cmd, tableOid, userid))
715 : {
716 0 : relation_close(OldHeap, lmode);
717 0 : return false;
718 : }
719 :
720 : /*
721 : * Silently skip a temp table for a remote session. Only doing this check
722 : * in the "recheck" case is appropriate (which currently means somebody is
723 : * executing a database-wide CLUSTER or on a partitioned table), because
724 : * there is another check in cluster() which will stop any attempt to
725 : * cluster remote temp tables by name. There is another check in
726 : * cluster_rel which is redundant, but we leave it for extra safety.
727 : */
728 52 : if (RELATION_IS_OTHER_TEMP(OldHeap))
729 : {
730 0 : relation_close(OldHeap, lmode);
731 0 : return false;
732 : }
733 :
734 52 : if (OidIsValid(indexOid))
735 : {
736 : /*
737 : * Check that the index still exists
738 : */
739 32 : if (!SearchSysCacheExists1(RELOID, ObjectIdGetDatum(indexOid)))
740 : {
741 0 : relation_close(OldHeap, lmode);
742 0 : return false;
743 : }
744 :
745 : /*
746 : * Check that the index is still the one with indisclustered set, if
747 : * needed.
748 : */
749 32 : if ((options & CLUOPT_RECHECK_ISCLUSTERED) != 0 &&
750 4 : !get_index_isclustered(indexOid))
751 : {
752 0 : relation_close(OldHeap, lmode);
753 0 : return false;
754 : }
755 : }
756 :
757 52 : return true;
758 : }
759 :
760 : /*
761 : * Verify that the specified heap and index are valid to cluster on
762 : *
763 : * Side effect: obtains lock on the index. The caller may
764 : * in some cases already have a lock of the same strength on the table, but
765 : * not in all cases so we can't rely on the table-level lock for
766 : * protection here.
767 : */
768 : void
769 359 : check_index_is_clusterable(Relation OldHeap, Oid indexOid, LOCKMODE lockmode)
770 : {
771 : Relation OldIndex;
772 :
773 359 : OldIndex = index_open(indexOid, lockmode);
774 :
775 : /*
776 : * Check that index is in fact an index on the given relation
777 : */
778 359 : if (OldIndex->rd_index == NULL ||
779 359 : OldIndex->rd_index->indrelid != RelationGetRelid(OldHeap))
780 4 : ereport(ERROR,
781 : (errcode(ERRCODE_WRONG_OBJECT_TYPE),
782 : errmsg("\"%s\" is not an index for table \"%s\"",
783 : RelationGetRelationName(OldIndex),
784 : RelationGetRelationName(OldHeap))));
785 :
786 : /* Index AM must allow clustering */
787 355 : if (!OldIndex->rd_indam->amclusterable)
788 4 : ereport(ERROR,
789 : (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
790 : errmsg("cannot cluster on index \"%s\" because access method does not support clustering",
791 : RelationGetRelationName(OldIndex))));
792 :
793 : /*
794 : * Disallow clustering on incomplete indexes (those that might not index
795 : * every row of the relation). We could relax this by making a separate
796 : * seqscan pass over the table to copy the missing rows, but that seems
797 : * expensive and tedious.
798 : */
799 351 : if (!heap_attisnull(OldIndex->rd_indextuple, Anum_pg_index_indpred, NULL))
800 4 : ereport(ERROR,
801 : (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
802 : errmsg("cannot cluster on partial index \"%s\"",
803 : RelationGetRelationName(OldIndex))));
804 :
805 : /*
806 : * Disallow if index is left over from a failed CREATE INDEX CONCURRENTLY;
807 : * it might well not contain entries for every heap row, or might not even
808 : * be internally consistent. (But note that we don't check indcheckxmin;
809 : * the worst consequence of following broken HOT chains would be that we
810 : * might put recently-dead tuples out-of-order in the new table, and there
811 : * is little harm in that.)
812 : */
813 347 : if (!OldIndex->rd_index->indisvalid)
814 4 : ereport(ERROR,
815 : (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
816 : errmsg("cannot cluster on invalid index \"%s\"",
817 : RelationGetRelationName(OldIndex))));
818 :
819 : /* Drop relcache refcnt on OldIndex, but keep lock */
820 343 : index_close(OldIndex, NoLock);
821 343 : }
822 :
823 : /*
824 : * mark_index_clustered: mark the specified index as the one clustered on
825 : *
826 : * With indexOid == InvalidOid, will mark all indexes of rel not-clustered.
827 : */
828 : void
829 195 : mark_index_clustered(Relation rel, Oid indexOid, bool is_internal)
830 : {
831 : HeapTuple indexTuple;
832 : Form_pg_index indexForm;
833 : Relation pg_index;
834 : ListCell *index;
835 :
836 : Assert(rel->rd_rel->relkind != RELKIND_PARTITIONED_TABLE);
837 :
838 : /*
839 : * If the index is already marked clustered, no need to do anything.
840 : */
841 195 : if (OidIsValid(indexOid))
842 : {
843 187 : if (get_index_isclustered(indexOid))
844 40 : return;
845 : }
846 :
847 : /*
848 : * Check each index of the relation and set/clear the bit as needed.
849 : */
850 155 : pg_index = table_open(IndexRelationId, RowExclusiveLock);
851 :
852 464 : foreach(index, RelationGetIndexList(rel))
853 : {
854 309 : Oid thisIndexOid = lfirst_oid(index);
855 :
856 309 : indexTuple = SearchSysCacheCopy1(INDEXRELID,
857 : ObjectIdGetDatum(thisIndexOid));
858 309 : if (!HeapTupleIsValid(indexTuple))
859 0 : elog(ERROR, "cache lookup failed for index %u", thisIndexOid);
860 309 : indexForm = (Form_pg_index) GETSTRUCT(indexTuple);
861 :
862 : /*
863 : * Unset the bit if set. We know it's wrong because we checked this
864 : * earlier.
865 : */
866 309 : if (indexForm->indisclustered)
867 : {
868 20 : indexForm->indisclustered = false;
869 20 : CatalogTupleUpdate(pg_index, &indexTuple->t_self, indexTuple);
870 : }
871 289 : else if (thisIndexOid == indexOid)
872 : {
873 : /* this was checked earlier, but let's be real sure */
874 147 : if (!indexForm->indisvalid)
875 0 : elog(ERROR, "cannot cluster on invalid index %u", indexOid);
876 147 : indexForm->indisclustered = true;
877 147 : CatalogTupleUpdate(pg_index, &indexTuple->t_self, indexTuple);
878 : }
879 :
880 309 : InvokeObjectPostAlterHookArg(IndexRelationId, thisIndexOid, 0,
881 : InvalidOid, is_internal);
882 :
883 309 : heap_freetuple(indexTuple);
884 : }
885 :
886 155 : table_close(pg_index, RowExclusiveLock);
887 : }
888 :
889 : /*
890 : * Check if the CONCURRENTLY option is legal for the relation.
891 : *
892 : * *Ident_idx_p receives OID of the identity index.
893 : */
894 : static void
895 14 : check_concurrent_repack_requirements(Relation rel, Oid *ident_idx_p)
896 : {
897 : char relpersistence,
898 : replident;
899 : Oid ident_idx;
900 :
901 14 : if (wal_level < WAL_LEVEL_REPLICA)
902 0 : ereport(ERROR,
903 : errcode(ERRCODE_INVALID_PARAMETER_VALUE),
904 : errmsg("cannot execute %s in this configuration",
905 : "REPACK (CONCURRENTLY)"),
906 : errdetail("%s requires \"wal_level\" to be set to \"replica\" or higher.",
907 : "REPACK (CONCURRENTLY)"));
908 :
909 : /* Data changes in system relations are not logically decoded. */
910 14 : if (IsCatalogRelation(rel))
911 1 : ereport(ERROR,
912 : errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
913 : errmsg("cannot execute %s on relation \"%s\"",
914 : "REPACK (CONCURRENTLY)", RelationGetRelationName(rel)),
915 : errhint("%s is not supported for catalog relations.",
916 : "REPACK (CONCURRENTLY)"));
917 :
918 : /*
919 : * reorderbuffer.c does not seem to handle processing of TOAST relation
920 : * alone.
921 : */
922 13 : if (IsToastRelation(rel))
923 1 : ereport(ERROR,
924 : errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
925 : errmsg("cannot execute %s on relation \"%s\"",
926 : "REPACK (CONCURRENTLY)", RelationGetRelationName(rel)),
927 : errhint("%s is not supported for TOAST relations.",
928 : "REPACK (CONCURRENTLY)"));
929 :
930 12 : relpersistence = rel->rd_rel->relpersistence;
931 12 : if (relpersistence != RELPERSISTENCE_PERMANENT)
932 2 : ereport(ERROR,
933 : errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
934 : errmsg("cannot execute %s on relation \"%s\"",
935 : "REPACK (CONCURRENTLY)", RelationGetRelationName(rel)),
936 : errhint("%s is only allowed for permanent relations.",
937 : "REPACK (CONCURRENTLY)"));
938 :
939 : /*
940 : * With NOTHING, WAL does not contain the old tuple; FULL is not yet
941 : * supported.
942 : */
943 10 : replident = rel->rd_rel->relreplident;
944 10 : if (replident == REPLICA_IDENTITY_NOTHING ||
945 : replident == REPLICA_IDENTITY_FULL)
946 1 : ereport(ERROR,
947 : errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
948 : errmsg("cannot execute %s on relation \"%s\"",
949 : "REPACK (CONCURRENTLY)", RelationGetRelationName(rel)),
950 : errdetail("%s does not support tables with %s.",
951 : "REPACK (CONCURRENTLY)",
952 : replident == REPLICA_IDENTITY_NOTHING ?
953 : "REPLICA IDENTITY NOTHING" : "REPLICA IDENTITY FULL"));
954 :
955 : /*
956 : * Obtain the replica identity index -- either one that has been set
957 : * explicitly, or a non-deferrable primary key. If none of these cases
958 : * apply, the table cannot be repacked concurrently. It might be possible
959 : * to have repack work with a FULL replica identity; however that requires
960 : * more work and is not implemented yet.
961 : */
962 9 : ident_idx = GetRelationIdentityOrPK(rel);
963 9 : if (!OidIsValid(ident_idx))
964 : {
965 : /* This special case warrants its own error message */
966 2 : if (OidIsValid(rel->rd_pkindex) && rel->rd_ispkdeferrable)
967 1 : ereport(ERROR,
968 : errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
969 : errmsg("cannot execute %s on relation \"%s\"",
970 : "REPACK (CONCURRENTLY)",
971 : RelationGetRelationName(rel)),
972 : errdetail("%s does not support deferrable primary keys.",
973 : "REPACK (CONCURRENTLY)"),
974 : errhint("Use ALTER TABLE ... REPLICA IDENTITY USING INDEX to designate another index as replica identity."));
975 :
976 1 : ereport(ERROR,
977 : errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
978 : errmsg("cannot execute %s on relation \"%s\"",
979 : "REPACK (CONCURRENTLY)", RelationGetRelationName(rel)),
980 : errhint("Relation \"%s\" has no identity index.",
981 : RelationGetRelationName(rel)));
982 : }
983 :
984 7 : *ident_idx_p = ident_idx;
985 7 : }
986 :
987 :
988 : /*
989 : * rebuild_relation: rebuild an existing relation in index or physical order
990 : *
991 : * OldHeap: table to rebuild. See cluster_rel() for comments on the required
992 : * lock strength.
993 : *
994 : * index: index to cluster by, or NULL to rewrite in physical order.
995 : *
996 : * ident_idx: identity index, to handle replaying of concurrent data changes
997 : * to the new heap. InvalidOid if there's no CONCURRENTLY option.
998 : *
999 : * On entry, heap and index (if one is given) must be open, and the
1000 : * appropriate lock held on them -- AccessExclusiveLock for exclusive
1001 : * processing and ShareUpdateExclusiveLock for concurrent processing.
1002 : *
1003 : * On exit, they are closed, but still locked with AccessExclusiveLock.
1004 : * (The function handles the lock upgrade if 'concurrent' is true.)
1005 : */
1006 : static void
1007 410 : rebuild_relation(Relation OldHeap, Relation index, bool verbose,
1008 : Oid ident_idx)
1009 : {
1010 410 : Oid tableOid = RelationGetRelid(OldHeap);
1011 410 : Oid accessMethod = OldHeap->rd_rel->relam;
1012 410 : Oid tableSpace = OldHeap->rd_rel->reltablespace;
1013 : Oid OIDNewHeap;
1014 : Relation NewHeap;
1015 : char relpersistence;
1016 : bool swap_toast_by_content;
1017 : TransactionId frozenXid;
1018 : MultiXactId cutoffMulti;
1019 410 : bool concurrent = OidIsValid(ident_idx);
1020 410 : Snapshot snapshot = NULL;
1021 : #if USE_ASSERT_CHECKING
1022 : LOCKMODE lmode;
1023 :
1024 : lmode = RepackLockLevel(concurrent);
1025 :
1026 : Assert(CheckRelationLockedByMe(OldHeap, lmode, false));
1027 : Assert(index == NULL || CheckRelationLockedByMe(index, lmode, false));
1028 : #endif
1029 :
1030 410 : if (concurrent)
1031 : {
1032 : /*
1033 : * The worker needs to be member of the locking group we're the leader
1034 : * of. We ought to become the leader before the worker starts. The
1035 : * worker will join the group as soon as it starts.
1036 : *
1037 : * This is to make sure that the deadlock described below is
1038 : * detectable by deadlock.c: if the worker waits for a transaction to
1039 : * complete and we are waiting for the worker output, then effectively
1040 : * we (i.e. this backend) are waiting for that transaction.
1041 : */
1042 7 : BecomeLockGroupLeader();
1043 :
1044 : /*
1045 : * Start the worker that decodes data changes applied while we're
1046 : * copying the table contents.
1047 : *
1048 : * Note that the worker has to wait for all transactions with XID
1049 : * already assigned to finish. If some of those transactions is
1050 : * waiting for a lock conflicting with ShareUpdateExclusiveLock on our
1051 : * table (e.g. it runs CREATE INDEX), we can end up in a deadlock.
1052 : * Not sure this risk is worth unlocking/locking the table (and its
1053 : * clustering index) and checking again if it's still eligible for
1054 : * REPACK CONCURRENTLY.
1055 : */
1056 7 : start_repack_decoding_worker(tableOid);
1057 :
1058 : /*
1059 : * Wait until the worker has the initial snapshot and retrieve it.
1060 : */
1061 7 : snapshot = get_initial_snapshot(decoding_worker);
1062 :
1063 7 : PushActiveSnapshot(snapshot);
1064 : }
1065 :
1066 : /* for CLUSTER or REPACK USING INDEX, mark the index as the one to use */
1067 410 : if (index != NULL)
1068 148 : mark_index_clustered(OldHeap, RelationGetRelid(index), true);
1069 :
1070 : /* Remember info about rel before closing OldHeap */
1071 410 : relpersistence = OldHeap->rd_rel->relpersistence;
1072 :
1073 : /*
1074 : * Create the transient table that will receive the re-ordered data.
1075 : *
1076 : * OldHeap is already locked, so no need to lock it again. make_new_heap
1077 : * obtains AccessExclusiveLock on the new heap and its toast table.
1078 : */
1079 410 : OIDNewHeap = make_new_heap(tableOid, tableSpace,
1080 : accessMethod,
1081 : relpersistence,
1082 : NoLock);
1083 : Assert(CheckRelationOidLockedByMe(OIDNewHeap, AccessExclusiveLock, false));
1084 410 : NewHeap = table_open(OIDNewHeap, NoLock);
1085 :
1086 : /* Copy the heap data into the new table in the desired order */
1087 410 : copy_table_data(NewHeap, OldHeap, index, snapshot, verbose,
1088 : &swap_toast_by_content, &frozenXid, &cutoffMulti);
1089 :
1090 : /* The historic snapshot won't be needed anymore. */
1091 410 : if (snapshot)
1092 : {
1093 7 : PopActiveSnapshot();
1094 7 : UpdateActiveSnapshotCommandId();
1095 : }
1096 :
1097 410 : if (concurrent)
1098 : {
1099 : Assert(!swap_toast_by_content);
1100 :
1101 : /*
1102 : * Close the index, but keep the lock. Both heaps will be closed by
1103 : * the following call.
1104 : */
1105 7 : if (index)
1106 3 : index_close(index, NoLock);
1107 :
1108 7 : rebuild_relation_finish_concurrent(NewHeap, OldHeap, ident_idx,
1109 : frozenXid, cutoffMulti);
1110 :
1111 7 : pgstat_progress_update_param(PROGRESS_REPACK_PHASE,
1112 : PROGRESS_REPACK_PHASE_FINAL_CLEANUP);
1113 : }
1114 : else
1115 : {
1116 403 : bool is_system_catalog = IsSystemRelation(OldHeap);
1117 :
1118 : /* Close relcache entries, but keep lock until transaction commit */
1119 403 : table_close(OldHeap, NoLock);
1120 403 : if (index)
1121 145 : index_close(index, NoLock);
1122 :
1123 : /*
1124 : * Close the new relation so it can be dropped as soon as the storage
1125 : * is swapped. The relation is not visible to others, so no need to
1126 : * unlock it explicitly.
1127 : */
1128 403 : table_close(NewHeap, NoLock);
1129 :
1130 : /*
1131 : * Swap the physical files of the target and transient tables, then
1132 : * rebuild the target's indexes and throw away the transient table.
1133 : */
1134 403 : finish_heap_swap(tableOid, OIDNewHeap, is_system_catalog,
1135 : swap_toast_by_content, false, true,
1136 : true, /* reindex */
1137 : frozenXid, cutoffMulti,
1138 : relpersistence);
1139 : }
1140 406 : }
1141 :
1142 :
1143 : /*
1144 : * Create the transient table that will be filled with new data during
1145 : * CLUSTER, ALTER TABLE, and similar operations. The transient table
1146 : * duplicates the logical structure of the OldHeap; but will have the
1147 : * specified physical storage properties NewTableSpace, NewAccessMethod, and
1148 : * relpersistence.
1149 : *
1150 : * After this, the caller should load the new heap with transferred/modified
1151 : * data, then call finish_heap_swap to complete the operation.
1152 : */
1153 : Oid
1154 1559 : make_new_heap(Oid OIDOldHeap, Oid NewTableSpace, Oid NewAccessMethod,
1155 : char relpersistence, LOCKMODE lockmode)
1156 : {
1157 : TupleDesc OldHeapDesc;
1158 : char NewHeapName[NAMEDATALEN];
1159 : Oid OIDNewHeap;
1160 : Oid toastid;
1161 : Relation OldHeap;
1162 : HeapTuple tuple;
1163 : Datum reloptions;
1164 : bool isNull;
1165 : Oid namespaceid;
1166 :
1167 1559 : OldHeap = table_open(OIDOldHeap, lockmode);
1168 1559 : OldHeapDesc = RelationGetDescr(OldHeap);
1169 :
1170 : /*
1171 : * Note that the NewHeap will not receive any of the defaults or
1172 : * constraints associated with the OldHeap; we don't need 'em, and there's
1173 : * no reason to spend cycles inserting them into the catalogs only to
1174 : * delete them.
1175 : */
1176 :
1177 : /*
1178 : * But we do want to use reloptions of the old heap for new heap.
1179 : */
1180 1559 : tuple = SearchSysCache1(RELOID, ObjectIdGetDatum(OIDOldHeap));
1181 1559 : if (!HeapTupleIsValid(tuple))
1182 0 : elog(ERROR, "cache lookup failed for relation %u", OIDOldHeap);
1183 1559 : reloptions = SysCacheGetAttr(RELOID, tuple, Anum_pg_class_reloptions,
1184 : &isNull);
1185 1559 : if (isNull)
1186 1466 : reloptions = (Datum) 0;
1187 :
1188 1559 : if (relpersistence == RELPERSISTENCE_TEMP)
1189 98 : namespaceid = LookupCreationNamespace("pg_temp");
1190 : else
1191 1461 : namespaceid = RelationGetNamespace(OldHeap);
1192 :
1193 : /*
1194 : * Create the new heap, using a temporary name in the same namespace as
1195 : * the existing table. NOTE: there is some risk of collision with user
1196 : * relnames. Working around this seems more trouble than it's worth; in
1197 : * particular, we can't create the new heap in a different namespace from
1198 : * the old, or we will have problems with the TEMP status of temp tables.
1199 : *
1200 : * Note: the new heap is not a shared relation, even if we are rebuilding
1201 : * a shared rel. However, we do make the new heap mapped if the source is
1202 : * mapped. This simplifies swap_relation_files, and is absolutely
1203 : * necessary for rebuilding pg_class, for reasons explained there.
1204 : */
1205 1559 : snprintf(NewHeapName, sizeof(NewHeapName), "pg_temp_%u", OIDOldHeap);
1206 :
1207 1559 : OIDNewHeap = heap_create_with_catalog(NewHeapName,
1208 : namespaceid,
1209 : NewTableSpace,
1210 : InvalidOid,
1211 : InvalidOid,
1212 : InvalidOid,
1213 1559 : OldHeap->rd_rel->relowner,
1214 : NewAccessMethod,
1215 : OldHeapDesc,
1216 : NIL,
1217 : RELKIND_RELATION,
1218 : relpersistence,
1219 : false,
1220 1559 : RelationIsMapped(OldHeap),
1221 : ONCOMMIT_NOOP,
1222 : reloptions,
1223 : false,
1224 : true,
1225 : true,
1226 : OIDOldHeap,
1227 1559 : NULL);
1228 : Assert(OIDNewHeap != InvalidOid);
1229 :
1230 1559 : ReleaseSysCache(tuple);
1231 :
1232 : /*
1233 : * Advance command counter so that the newly-created relation's catalog
1234 : * tuples will be visible to table_open.
1235 : */
1236 1559 : CommandCounterIncrement();
1237 :
1238 : /*
1239 : * If necessary, create a TOAST table for the new relation.
1240 : *
1241 : * If the relation doesn't have a TOAST table already, we can't need one
1242 : * for the new relation. The other way around is possible though: if some
1243 : * wide columns have been dropped, NewHeapCreateToastTable can decide that
1244 : * no TOAST table is needed for the new table.
1245 : *
1246 : * Note that NewHeapCreateToastTable ends with CommandCounterIncrement, so
1247 : * that the TOAST table will be visible for insertion.
1248 : */
1249 1559 : toastid = OldHeap->rd_rel->reltoastrelid;
1250 1559 : if (OidIsValid(toastid))
1251 : {
1252 : /* keep the existing toast table's reloptions, if any */
1253 571 : tuple = SearchSysCache1(RELOID, ObjectIdGetDatum(toastid));
1254 571 : if (!HeapTupleIsValid(tuple))
1255 0 : elog(ERROR, "cache lookup failed for relation %u", toastid);
1256 571 : reloptions = SysCacheGetAttr(RELOID, tuple, Anum_pg_class_reloptions,
1257 : &isNull);
1258 571 : if (isNull)
1259 571 : reloptions = (Datum) 0;
1260 :
1261 571 : NewHeapCreateToastTable(OIDNewHeap, reloptions, lockmode, toastid);
1262 :
1263 571 : ReleaseSysCache(tuple);
1264 : }
1265 :
1266 1559 : table_close(OldHeap, NoLock);
1267 :
1268 1559 : return OIDNewHeap;
1269 : }
1270 :
1271 : /*
1272 : * Do the physical copying of table data.
1273 : *
1274 : * 'snapshot' and 'decoding_ctx': see table_relation_copy_for_cluster(). Pass
1275 : * iff concurrent processing is required.
1276 : *
1277 : * There are three output parameters:
1278 : * *pSwapToastByContent is set true if toast tables must be swapped by content.
1279 : * *pFreezeXid receives the TransactionId used as freeze cutoff point.
1280 : * *pCutoffMulti receives the MultiXactId used as a cutoff point.
1281 : */
1282 : static void
1283 410 : copy_table_data(Relation NewHeap, Relation OldHeap, Relation OldIndex,
1284 : Snapshot snapshot, bool verbose, bool *pSwapToastByContent,
1285 : TransactionId *pFreezeXid, MultiXactId *pCutoffMulti)
1286 : {
1287 : Relation relRelation;
1288 : HeapTuple reltup;
1289 : Form_pg_class relform;
1290 : TupleDesc oldTupDesc PG_USED_FOR_ASSERTS_ONLY;
1291 : TupleDesc newTupDesc PG_USED_FOR_ASSERTS_ONLY;
1292 : VacuumParams params;
1293 : struct VacuumCutoffs cutoffs;
1294 : bool use_sort;
1295 410 : double num_tuples = 0,
1296 410 : tups_vacuumed = 0,
1297 410 : tups_recently_dead = 0;
1298 : BlockNumber num_pages;
1299 410 : int elevel = verbose ? INFO : DEBUG2;
1300 : PGRUsage ru0;
1301 : char *nspname;
1302 410 : bool concurrent = snapshot != NULL;
1303 : LOCKMODE lmode;
1304 :
1305 410 : lmode = RepackLockLevel(concurrent);
1306 :
1307 410 : pg_rusage_init(&ru0);
1308 :
1309 : /* Store a copy of the namespace name for logging purposes */
1310 410 : nspname = get_namespace_name(RelationGetNamespace(OldHeap));
1311 :
1312 : /*
1313 : * Their tuple descriptors should be exactly alike, but here we only need
1314 : * assume that they have the same number of columns.
1315 : */
1316 410 : oldTupDesc = RelationGetDescr(OldHeap);
1317 410 : newTupDesc = RelationGetDescr(NewHeap);
1318 : Assert(newTupDesc->natts == oldTupDesc->natts);
1319 :
1320 : /*
1321 : * If the OldHeap has a toast table, get lock on the toast table to keep
1322 : * it from being vacuumed. This is needed because autovacuum processes
1323 : * toast tables independently of their main tables, with no lock on the
1324 : * latter. If an autovacuum were to start on the toast table after we
1325 : * compute our OldestXmin below, it would use a later OldestXmin, and then
1326 : * possibly remove as DEAD toast tuples belonging to main tuples we think
1327 : * are only RECENTLY_DEAD. Then we'd fail while trying to copy those
1328 : * tuples.
1329 : *
1330 : * We don't need to open the toast relation here, just lock it. The lock
1331 : * will be held till end of transaction.
1332 : */
1333 410 : if (OldHeap->rd_rel->reltoastrelid)
1334 135 : LockRelationOid(OldHeap->rd_rel->reltoastrelid, lmode);
1335 :
1336 : /*
1337 : * If both tables have TOAST tables, perform toast swap by content. It is
1338 : * possible that the old table has a toast table but the new one doesn't,
1339 : * if toastable columns have been dropped. In that case we have to do
1340 : * swap by links. This is okay because swap by content is only essential
1341 : * for system catalogs, and we don't support schema changes for them.
1342 : */
1343 410 : if (OldHeap->rd_rel->reltoastrelid && NewHeap->rd_rel->reltoastrelid &&
1344 135 : !concurrent)
1345 : {
1346 132 : *pSwapToastByContent = true;
1347 :
1348 : /*
1349 : * When doing swap by content, any toast pointers written into NewHeap
1350 : * must use the old toast table's OID, because that's where the toast
1351 : * data will eventually be found. Set this up by setting rd_toastoid.
1352 : * This also tells toast_save_datum() to preserve the toast value
1353 : * OIDs, which we want so as not to invalidate toast pointers in
1354 : * system catalog caches, and to avoid making multiple copies of a
1355 : * single toast value.
1356 : *
1357 : * Note that we must hold NewHeap open until we are done writing data,
1358 : * since the relcache will not guarantee to remember this setting once
1359 : * the relation is closed. Also, this technique depends on the fact
1360 : * that no one will try to read from the NewHeap until after we've
1361 : * finished writing it and swapping the rels --- otherwise they could
1362 : * follow the toast pointers to the wrong place. (It would actually
1363 : * work for values copied over from the old toast table, but not for
1364 : * any values that we toast which were previously not toasted.)
1365 : *
1366 : * This would not work with CONCURRENTLY because we may need to delete
1367 : * TOASTed tuples from the new heap. With this hack, we'd delete them
1368 : * from the old heap.
1369 : */
1370 132 : NewHeap->rd_toastoid = OldHeap->rd_rel->reltoastrelid;
1371 : }
1372 : else
1373 278 : *pSwapToastByContent = false;
1374 :
1375 : /*
1376 : * Compute xids used to freeze and weed out dead tuples and multixacts.
1377 : * Since we're going to rewrite the whole table anyway, there's no reason
1378 : * not to be aggressive about this.
1379 : */
1380 410 : memset(¶ms, 0, sizeof(VacuumParams));
1381 410 : vacuum_get_cutoffs(OldHeap, ¶ms, &cutoffs);
1382 :
1383 : /*
1384 : * FreezeXid will become the table's new relfrozenxid, and that mustn't go
1385 : * backwards, so take the max.
1386 : */
1387 : {
1388 410 : TransactionId relfrozenxid = OldHeap->rd_rel->relfrozenxid;
1389 :
1390 820 : if (TransactionIdIsValid(relfrozenxid) &&
1391 410 : TransactionIdPrecedes(cutoffs.FreezeLimit, relfrozenxid))
1392 10 : cutoffs.FreezeLimit = relfrozenxid;
1393 : }
1394 :
1395 : /*
1396 : * MultiXactCutoff, similarly, shouldn't go backwards either.
1397 : */
1398 : {
1399 410 : MultiXactId relminmxid = OldHeap->rd_rel->relminmxid;
1400 :
1401 820 : if (MultiXactIdIsValid(relminmxid) &&
1402 410 : MultiXactIdPrecedes(cutoffs.MultiXactCutoff, relminmxid))
1403 0 : cutoffs.MultiXactCutoff = relminmxid;
1404 : }
1405 :
1406 : /*
1407 : * Decide whether to use an indexscan or seqscan-and-optional-sort to scan
1408 : * the OldHeap. We know how to use a sort to duplicate the ordering of a
1409 : * btree index, and will use seqscan-and-sort for that case if the planner
1410 : * tells us it's cheaper. Otherwise, always indexscan if an index is
1411 : * provided, else plain seqscan.
1412 : */
1413 410 : if (OldIndex != NULL && OldIndex->rd_rel->relam == BTREE_AM_OID)
1414 146 : use_sort = plan_cluster_use_sort(RelationGetRelid(OldHeap),
1415 : RelationGetRelid(OldIndex));
1416 : else
1417 264 : use_sort = false;
1418 :
1419 : /* Log what we're doing */
1420 410 : if (OldIndex != NULL && !use_sort)
1421 62 : ereport(elevel,
1422 : errmsg("repacking \"%s.%s\" using index scan on \"%s\"",
1423 : nspname,
1424 : RelationGetRelationName(OldHeap),
1425 : RelationGetRelationName(OldIndex)));
1426 348 : else if (use_sort)
1427 86 : ereport(elevel,
1428 : errmsg("repacking \"%s.%s\" using sequential scan and sort",
1429 : nspname,
1430 : RelationGetRelationName(OldHeap)));
1431 : else
1432 262 : ereport(elevel,
1433 : errmsg("repacking \"%s.%s\" in physical order",
1434 : nspname,
1435 : RelationGetRelationName(OldHeap)));
1436 :
1437 : /*
1438 : * Hand off the actual copying to AM specific function, the generic code
1439 : * cannot know how to deal with visibility across AMs. Note that this
1440 : * routine is allowed to set FreezeXid / MultiXactCutoff to different
1441 : * values (e.g. because the AM doesn't use freezing).
1442 : */
1443 410 : table_relation_copy_for_cluster(OldHeap, NewHeap, OldIndex, use_sort,
1444 : cutoffs.OldestXmin, snapshot,
1445 : &cutoffs.FreezeLimit,
1446 : &cutoffs.MultiXactCutoff,
1447 : &num_tuples, &tups_vacuumed,
1448 : &tups_recently_dead);
1449 :
1450 : /* return selected values to caller, get set as relfrozenxid/minmxid */
1451 410 : *pFreezeXid = cutoffs.FreezeLimit;
1452 410 : *pCutoffMulti = cutoffs.MultiXactCutoff;
1453 :
1454 : /*
1455 : * Reset rd_toastoid just to be tidy --- it shouldn't be looked at again.
1456 : * In the CONCURRENTLY case, we need to set it again before applying the
1457 : * concurrent changes.
1458 : */
1459 410 : NewHeap->rd_toastoid = InvalidOid;
1460 :
1461 410 : num_pages = RelationGetNumberOfBlocks(NewHeap);
1462 :
1463 : /* Log what we did */
1464 410 : ereport(elevel,
1465 : (errmsg("\"%s.%s\": found %.0f removable, %.0f nonremovable row versions in %u pages",
1466 : nspname,
1467 : RelationGetRelationName(OldHeap),
1468 : tups_vacuumed, num_tuples,
1469 : RelationGetNumberOfBlocks(OldHeap)),
1470 : errdetail("%.0f dead row versions cannot be removed yet.\n"
1471 : "%s.",
1472 : tups_recently_dead,
1473 : pg_rusage_show(&ru0))));
1474 :
1475 : /* Update pg_class to reflect the correct values of pages and tuples. */
1476 410 : relRelation = table_open(RelationRelationId, RowExclusiveLock);
1477 :
1478 410 : reltup = SearchSysCacheCopy1(RELOID,
1479 : ObjectIdGetDatum(RelationGetRelid(NewHeap)));
1480 410 : if (!HeapTupleIsValid(reltup))
1481 0 : elog(ERROR, "cache lookup failed for relation %u",
1482 : RelationGetRelid(NewHeap));
1483 410 : relform = (Form_pg_class) GETSTRUCT(reltup);
1484 :
1485 410 : relform->relpages = num_pages;
1486 410 : relform->reltuples = num_tuples;
1487 :
1488 : /* Don't update the stats for pg_class. See swap_relation_files. */
1489 410 : if (RelationGetRelid(OldHeap) != RelationRelationId)
1490 387 : CatalogTupleUpdate(relRelation, &reltup->t_self, reltup);
1491 : else
1492 23 : CacheInvalidateRelcacheByTuple(reltup);
1493 :
1494 : /* Clean up. */
1495 410 : heap_freetuple(reltup);
1496 410 : table_close(relRelation, RowExclusiveLock);
1497 :
1498 : /* Make the update visible */
1499 410 : CommandCounterIncrement();
1500 410 : }
1501 :
1502 : /*
1503 : * Swap the physical files of two given relations.
1504 : *
1505 : * We swap the physical identity (reltablespace, relfilenumber) while keeping
1506 : * the same logical identities of the two relations. relpersistence is also
1507 : * swapped, which is critical since it determines where buffers live for each
1508 : * relation.
1509 : *
1510 : * We can swap associated TOAST data in either of two ways: recursively swap
1511 : * the physical content of the toast tables (and their indexes), or swap the
1512 : * TOAST links in the given relations' pg_class entries. The former is needed
1513 : * to manage rewrites of shared catalogs (where we cannot change the pg_class
1514 : * links) while the latter is the only way to handle cases in which a toast
1515 : * table is added or removed altogether.
1516 : *
1517 : * Additionally, the first relation is marked with relfrozenxid set to
1518 : * frozenXid. It seems a bit ugly to have this here, but the caller would
1519 : * have to do it anyway, so having it here saves a heap_update. Note: in
1520 : * the swap-toast-links case, we assume we don't need to change the toast
1521 : * table's relfrozenxid: the new version of the toast table should already
1522 : * have relfrozenxid set to RecentXmin, which is good enough.
1523 : *
1524 : * Lastly, if r2 and its toast table and toast index (if any) are mapped,
1525 : * their OIDs are emitted into mapped_tables[]. This is hacky but beats
1526 : * having to look the information up again later in finish_heap_swap.
1527 : */
1528 : static void
1529 1710 : swap_relation_files(Oid r1, Oid r2, bool target_is_pg_class,
1530 : bool swap_toast_by_content,
1531 : bool is_internal,
1532 : TransactionId frozenXid,
1533 : MultiXactId cutoffMulti,
1534 : Oid *mapped_tables)
1535 : {
1536 : Relation relRelation;
1537 : HeapTuple reltup1,
1538 : reltup2;
1539 : Form_pg_class relform1,
1540 : relform2;
1541 : RelFileNumber relfilenumber1,
1542 : relfilenumber2;
1543 : RelFileNumber swaptemp;
1544 : char swptmpchr;
1545 : Oid relam1,
1546 : relam2;
1547 :
1548 : /* We need writable copies of both pg_class tuples. */
1549 1710 : relRelation = table_open(RelationRelationId, RowExclusiveLock);
1550 :
1551 1710 : reltup1 = SearchSysCacheCopy1(RELOID, ObjectIdGetDatum(r1));
1552 1710 : if (!HeapTupleIsValid(reltup1))
1553 0 : elog(ERROR, "cache lookup failed for relation %u", r1);
1554 1710 : relform1 = (Form_pg_class) GETSTRUCT(reltup1);
1555 :
1556 1710 : reltup2 = SearchSysCacheCopy1(RELOID, ObjectIdGetDatum(r2));
1557 1710 : if (!HeapTupleIsValid(reltup2))
1558 0 : elog(ERROR, "cache lookup failed for relation %u", r2);
1559 1710 : relform2 = (Form_pg_class) GETSTRUCT(reltup2);
1560 :
1561 1710 : relfilenumber1 = relform1->relfilenode;
1562 1710 : relfilenumber2 = relform2->relfilenode;
1563 1710 : relam1 = relform1->relam;
1564 1710 : relam2 = relform2->relam;
1565 :
1566 1710 : if (RelFileNumberIsValid(relfilenumber1) &&
1567 : RelFileNumberIsValid(relfilenumber2))
1568 : {
1569 : /*
1570 : * Normal non-mapped relations: swap relfilenumbers, reltablespaces,
1571 : * relpersistence
1572 : */
1573 : Assert(!target_is_pg_class);
1574 :
1575 1621 : swaptemp = relform1->relfilenode;
1576 1621 : relform1->relfilenode = relform2->relfilenode;
1577 1621 : relform2->relfilenode = swaptemp;
1578 :
1579 1621 : swaptemp = relform1->reltablespace;
1580 1621 : relform1->reltablespace = relform2->reltablespace;
1581 1621 : relform2->reltablespace = swaptemp;
1582 :
1583 1621 : swaptemp = relform1->relam;
1584 1621 : relform1->relam = relform2->relam;
1585 1621 : relform2->relam = swaptemp;
1586 :
1587 1621 : swptmpchr = relform1->relpersistence;
1588 1621 : relform1->relpersistence = relform2->relpersistence;
1589 1621 : relform2->relpersistence = swptmpchr;
1590 :
1591 : /* Also swap toast links, if we're swapping by links */
1592 1621 : if (!swap_toast_by_content)
1593 : {
1594 1285 : swaptemp = relform1->reltoastrelid;
1595 1285 : relform1->reltoastrelid = relform2->reltoastrelid;
1596 1285 : relform2->reltoastrelid = swaptemp;
1597 : }
1598 : }
1599 : else
1600 : {
1601 : /*
1602 : * Mapped-relation case. Here we have to swap the relation mappings
1603 : * instead of modifying the pg_class columns. Both must be mapped.
1604 : */
1605 89 : if (RelFileNumberIsValid(relfilenumber1) ||
1606 : RelFileNumberIsValid(relfilenumber2))
1607 0 : elog(ERROR, "cannot swap mapped relation \"%s\" with non-mapped relation",
1608 : NameStr(relform1->relname));
1609 :
1610 : /*
1611 : * We can't change the tablespace nor persistence of a mapped rel, and
1612 : * we can't handle toast link swapping for one either, because we must
1613 : * not apply any critical changes to its pg_class row. These cases
1614 : * should be prevented by upstream permissions tests, so these checks
1615 : * are non-user-facing emergency backstop.
1616 : */
1617 89 : if (relform1->reltablespace != relform2->reltablespace)
1618 0 : elog(ERROR, "cannot change tablespace of mapped relation \"%s\"",
1619 : NameStr(relform1->relname));
1620 89 : if (relform1->relpersistence != relform2->relpersistence)
1621 0 : elog(ERROR, "cannot change persistence of mapped relation \"%s\"",
1622 : NameStr(relform1->relname));
1623 89 : if (relform1->relam != relform2->relam)
1624 0 : elog(ERROR, "cannot change access method of mapped relation \"%s\"",
1625 : NameStr(relform1->relname));
1626 89 : if (!swap_toast_by_content &&
1627 29 : (relform1->reltoastrelid || relform2->reltoastrelid))
1628 0 : elog(ERROR, "cannot swap toast by links for mapped relation \"%s\"",
1629 : NameStr(relform1->relname));
1630 :
1631 : /*
1632 : * Fetch the mappings --- shouldn't fail, but be paranoid
1633 : */
1634 89 : relfilenumber1 = RelationMapOidToFilenumber(r1, relform1->relisshared);
1635 89 : if (!RelFileNumberIsValid(relfilenumber1))
1636 0 : elog(ERROR, "could not find relation mapping for relation \"%s\", OID %u",
1637 : NameStr(relform1->relname), r1);
1638 89 : relfilenumber2 = RelationMapOidToFilenumber(r2, relform2->relisshared);
1639 89 : if (!RelFileNumberIsValid(relfilenumber2))
1640 0 : elog(ERROR, "could not find relation mapping for relation \"%s\", OID %u",
1641 : NameStr(relform2->relname), r2);
1642 :
1643 : /*
1644 : * Send replacement mappings to relmapper. Note these won't actually
1645 : * take effect until CommandCounterIncrement.
1646 : */
1647 89 : RelationMapUpdateMap(r1, relfilenumber2, relform1->relisshared, false);
1648 89 : RelationMapUpdateMap(r2, relfilenumber1, relform2->relisshared, false);
1649 :
1650 : /* Pass OIDs of mapped r2 tables back to caller */
1651 89 : *mapped_tables++ = r2;
1652 : }
1653 :
1654 : /*
1655 : * Recognize that rel1's relfilenumber (swapped from rel2) is new in this
1656 : * subtransaction. The rel2 storage (swapped from rel1) may or may not be
1657 : * new.
1658 : */
1659 : {
1660 : Relation rel1,
1661 : rel2;
1662 :
1663 1710 : rel1 = relation_open(r1, NoLock);
1664 1710 : rel2 = relation_open(r2, NoLock);
1665 1710 : rel2->rd_createSubid = rel1->rd_createSubid;
1666 1710 : rel2->rd_newRelfilelocatorSubid = rel1->rd_newRelfilelocatorSubid;
1667 1710 : rel2->rd_firstRelfilelocatorSubid = rel1->rd_firstRelfilelocatorSubid;
1668 1710 : RelationAssumeNewRelfilelocator(rel1);
1669 1710 : relation_close(rel1, NoLock);
1670 1710 : relation_close(rel2, NoLock);
1671 : }
1672 :
1673 : /*
1674 : * In the case of a shared catalog, these next few steps will only affect
1675 : * our own database's pg_class row; but that's okay, because they are all
1676 : * noncritical updates. That's also an important fact for the case of a
1677 : * mapped catalog, because it's possible that we'll commit the map change
1678 : * and then fail to commit the pg_class update.
1679 : */
1680 :
1681 : /* set rel1's frozen Xid and minimum MultiXid */
1682 1710 : if (relform1->relkind != RELKIND_INDEX)
1683 : {
1684 : Assert(!TransactionIdIsValid(frozenXid) ||
1685 : TransactionIdIsNormal(frozenXid));
1686 1570 : relform1->relfrozenxid = frozenXid;
1687 1570 : relform1->relminmxid = cutoffMulti;
1688 : }
1689 :
1690 : /* swap size statistics too, since new rel has freshly-updated stats */
1691 : {
1692 : int32 swap_pages;
1693 : float4 swap_tuples;
1694 : int32 swap_allvisible;
1695 : int32 swap_allfrozen;
1696 :
1697 1710 : swap_pages = relform1->relpages;
1698 1710 : relform1->relpages = relform2->relpages;
1699 1710 : relform2->relpages = swap_pages;
1700 :
1701 1710 : swap_tuples = relform1->reltuples;
1702 1710 : relform1->reltuples = relform2->reltuples;
1703 1710 : relform2->reltuples = swap_tuples;
1704 :
1705 1710 : swap_allvisible = relform1->relallvisible;
1706 1710 : relform1->relallvisible = relform2->relallvisible;
1707 1710 : relform2->relallvisible = swap_allvisible;
1708 :
1709 1710 : swap_allfrozen = relform1->relallfrozen;
1710 1710 : relform1->relallfrozen = relform2->relallfrozen;
1711 1710 : relform2->relallfrozen = swap_allfrozen;
1712 : }
1713 :
1714 : /*
1715 : * Update the tuples in pg_class --- unless the target relation of the
1716 : * swap is pg_class itself. In that case, there is zero point in making
1717 : * changes because we'd be updating the old data that we're about to throw
1718 : * away. Because the real work being done here for a mapped relation is
1719 : * just to change the relation map settings, it's all right to not update
1720 : * the pg_class rows in this case. The most important changes will instead
1721 : * be performed later, in finish_heap_swap() itself.
1722 : */
1723 1710 : if (!target_is_pg_class)
1724 : {
1725 : CatalogIndexState indstate;
1726 :
1727 1687 : indstate = CatalogOpenIndexes(relRelation);
1728 1687 : CatalogTupleUpdateWithInfo(relRelation, &reltup1->t_self, reltup1,
1729 : indstate);
1730 1687 : CatalogTupleUpdateWithInfo(relRelation, &reltup2->t_self, reltup2,
1731 : indstate);
1732 1687 : CatalogCloseIndexes(indstate);
1733 : }
1734 : else
1735 : {
1736 : /* no update ... but we do still need relcache inval */
1737 23 : CacheInvalidateRelcacheByTuple(reltup1);
1738 23 : CacheInvalidateRelcacheByTuple(reltup2);
1739 : }
1740 :
1741 : /*
1742 : * Now that pg_class has been updated with its relevant information for
1743 : * the swap, update the dependency of the relations to point to their new
1744 : * table AM, if it has changed.
1745 : */
1746 1710 : if (relam1 != relam2)
1747 : {
1748 24 : if (changeDependencyFor(RelationRelationId,
1749 : r1,
1750 : AccessMethodRelationId,
1751 : relam1,
1752 : relam2) != 1)
1753 0 : elog(ERROR, "could not change access method dependency for relation \"%s.%s\"",
1754 : get_namespace_name(get_rel_namespace(r1)),
1755 : get_rel_name(r1));
1756 24 : if (changeDependencyFor(RelationRelationId,
1757 : r2,
1758 : AccessMethodRelationId,
1759 : relam2,
1760 : relam1) != 1)
1761 0 : elog(ERROR, "could not change access method dependency for relation \"%s.%s\"",
1762 : get_namespace_name(get_rel_namespace(r2)),
1763 : get_rel_name(r2));
1764 : }
1765 :
1766 : /*
1767 : * Post alter hook for modified relations. The change to r2 is always
1768 : * internal, but r1 depends on the invocation context.
1769 : */
1770 1710 : InvokeObjectPostAlterHookArg(RelationRelationId, r1, 0,
1771 : InvalidOid, is_internal);
1772 1710 : InvokeObjectPostAlterHookArg(RelationRelationId, r2, 0,
1773 : InvalidOid, true);
1774 :
1775 : /*
1776 : * If we have toast tables associated with the relations being swapped,
1777 : * deal with them too.
1778 : */
1779 1710 : if (relform1->reltoastrelid || relform2->reltoastrelid)
1780 : {
1781 542 : if (swap_toast_by_content)
1782 : {
1783 132 : if (relform1->reltoastrelid && relform2->reltoastrelid)
1784 : {
1785 : /* Recursively swap the contents of the toast tables */
1786 132 : swap_relation_files(relform1->reltoastrelid,
1787 : relform2->reltoastrelid,
1788 : target_is_pg_class,
1789 : swap_toast_by_content,
1790 : is_internal,
1791 : frozenXid,
1792 : cutoffMulti,
1793 : mapped_tables);
1794 : }
1795 : else
1796 : {
1797 : /* caller messed up */
1798 0 : elog(ERROR, "cannot swap toast files by content when there's only one");
1799 : }
1800 : }
1801 : else
1802 : {
1803 : /*
1804 : * We swapped the ownership links, so we need to change dependency
1805 : * data to match.
1806 : *
1807 : * NOTE: it is possible that only one table has a toast table.
1808 : *
1809 : * NOTE: at present, a TOAST table's only dependency is the one on
1810 : * its owning table. If more are ever created, we'd need to use
1811 : * something more selective than deleteDependencyRecordsFor() to
1812 : * get rid of just the link we want.
1813 : */
1814 : ObjectAddress baseobject,
1815 : toastobject;
1816 : long count;
1817 :
1818 : /*
1819 : * We disallow this case for system catalogs, to avoid the
1820 : * possibility that the catalog we're rebuilding is one of the
1821 : * ones the dependency changes would change. It's too late to be
1822 : * making any data changes to the target catalog.
1823 : */
1824 410 : if (IsSystemClass(r1, relform1))
1825 0 : elog(ERROR, "cannot swap toast files by links for system catalogs");
1826 :
1827 : /* Delete old dependencies */
1828 410 : if (relform1->reltoastrelid)
1829 : {
1830 389 : count = deleteDependencyRecordsFor(RelationRelationId,
1831 : relform1->reltoastrelid,
1832 : false);
1833 389 : if (count != 1)
1834 0 : elog(ERROR, "expected one dependency record for TOAST table, found %ld",
1835 : count);
1836 : }
1837 410 : if (relform2->reltoastrelid)
1838 : {
1839 410 : count = deleteDependencyRecordsFor(RelationRelationId,
1840 : relform2->reltoastrelid,
1841 : false);
1842 410 : if (count != 1)
1843 0 : elog(ERROR, "expected one dependency record for TOAST table, found %ld",
1844 : count);
1845 : }
1846 :
1847 : /* Register new dependencies */
1848 410 : baseobject.classId = RelationRelationId;
1849 410 : baseobject.objectSubId = 0;
1850 410 : toastobject.classId = RelationRelationId;
1851 410 : toastobject.objectSubId = 0;
1852 :
1853 410 : if (relform1->reltoastrelid)
1854 : {
1855 389 : baseobject.objectId = r1;
1856 389 : toastobject.objectId = relform1->reltoastrelid;
1857 389 : recordDependencyOn(&toastobject, &baseobject,
1858 : DEPENDENCY_INTERNAL);
1859 : }
1860 :
1861 410 : if (relform2->reltoastrelid)
1862 : {
1863 410 : baseobject.objectId = r2;
1864 410 : toastobject.objectId = relform2->reltoastrelid;
1865 410 : recordDependencyOn(&toastobject, &baseobject,
1866 : DEPENDENCY_INTERNAL);
1867 : }
1868 : }
1869 : }
1870 :
1871 : /*
1872 : * If we're swapping two toast tables by content, do the same for their
1873 : * valid index. The swap can actually be safely done only if the relations
1874 : * have indexes.
1875 : */
1876 1710 : if (swap_toast_by_content &&
1877 396 : relform1->relkind == RELKIND_TOASTVALUE &&
1878 132 : relform2->relkind == RELKIND_TOASTVALUE)
1879 : {
1880 : Oid toastIndex1,
1881 : toastIndex2;
1882 :
1883 : /* Get valid index for each relation */
1884 132 : toastIndex1 = toast_get_valid_index(r1,
1885 : AccessExclusiveLock);
1886 132 : toastIndex2 = toast_get_valid_index(r2,
1887 : AccessExclusiveLock);
1888 :
1889 132 : swap_relation_files(toastIndex1,
1890 : toastIndex2,
1891 : target_is_pg_class,
1892 : swap_toast_by_content,
1893 : is_internal,
1894 : InvalidTransactionId,
1895 : InvalidMultiXactId,
1896 : mapped_tables);
1897 : }
1898 :
1899 : /* Clean up. */
1900 1710 : heap_freetuple(reltup1);
1901 1710 : heap_freetuple(reltup2);
1902 :
1903 1710 : table_close(relRelation, RowExclusiveLock);
1904 1710 : }
1905 :
1906 : /*
1907 : * Remove the transient table that was built by make_new_heap, and finish
1908 : * cleaning up (including rebuilding all indexes on the old heap).
1909 : */
1910 : void
1911 1438 : finish_heap_swap(Oid OIDOldHeap, Oid OIDNewHeap,
1912 : bool is_system_catalog,
1913 : bool swap_toast_by_content,
1914 : bool check_constraints,
1915 : bool is_internal,
1916 : bool reindex,
1917 : TransactionId frozenXid,
1918 : MultiXactId cutoffMulti,
1919 : char newrelpersistence)
1920 : {
1921 : ObjectAddress object;
1922 : Oid mapped_tables[4];
1923 : int i;
1924 :
1925 : /* Report that we are now swapping relation files */
1926 1438 : pgstat_progress_update_param(PROGRESS_REPACK_PHASE,
1927 : PROGRESS_REPACK_PHASE_SWAP_REL_FILES);
1928 :
1929 : /* Zero out possible results from swapped_relation_files */
1930 1438 : memset(mapped_tables, 0, sizeof(mapped_tables));
1931 :
1932 : /*
1933 : * Swap the contents of the heap relations (including any toast tables).
1934 : * Also set old heap's relfrozenxid to frozenXid.
1935 : */
1936 1438 : swap_relation_files(OIDOldHeap, OIDNewHeap,
1937 : (OIDOldHeap == RelationRelationId),
1938 : swap_toast_by_content, is_internal,
1939 : frozenXid, cutoffMulti, mapped_tables);
1940 :
1941 : /*
1942 : * If it's a system catalog, queue a sinval message to flush all catcaches
1943 : * on the catalog when we reach CommandCounterIncrement.
1944 : */
1945 1438 : if (is_system_catalog)
1946 121 : CacheInvalidateCatalog(OIDOldHeap);
1947 :
1948 1438 : if (reindex)
1949 : {
1950 : int reindex_flags;
1951 1431 : ReindexParams reindex_params = {0};
1952 :
1953 : /*
1954 : * Rebuild each index on the relation (but not the toast table, which
1955 : * is all-new at this point). It is important to do this before the
1956 : * DROP step because if we are processing a system catalog that will
1957 : * be used during DROP, we want to have its indexes available. There
1958 : * is no advantage to the other order anyway because this is all
1959 : * transactional, so no chance to reclaim disk space before commit. We
1960 : * do not need a final CommandCounterIncrement() because
1961 : * reindex_relation does it.
1962 : *
1963 : * Note: because index_build is called via reindex_relation, it will
1964 : * never set indcheckxmin true for the indexes. This is OK even
1965 : * though in some sense we are building new indexes rather than
1966 : * rebuilding existing ones, because the new heap won't contain any
1967 : * HOT chains at all, let alone broken ones, so it can't be necessary
1968 : * to set indcheckxmin.
1969 : */
1970 1431 : reindex_flags = REINDEX_REL_SUPPRESS_INDEX_USE;
1971 1431 : if (check_constraints)
1972 1028 : reindex_flags |= REINDEX_REL_CHECK_CONSTRAINTS;
1973 :
1974 : /*
1975 : * Ensure that the indexes have the same persistence as the parent
1976 : * relation.
1977 : */
1978 1431 : if (newrelpersistence == RELPERSISTENCE_UNLOGGED)
1979 25 : reindex_flags |= REINDEX_REL_FORCE_INDEXES_UNLOGGED;
1980 1406 : else if (newrelpersistence == RELPERSISTENCE_PERMANENT)
1981 1353 : reindex_flags |= REINDEX_REL_FORCE_INDEXES_PERMANENT;
1982 :
1983 : /* Report that we are now reindexing relations */
1984 1431 : pgstat_progress_update_param(PROGRESS_REPACK_PHASE,
1985 : PROGRESS_REPACK_PHASE_REBUILD_INDEX);
1986 :
1987 1431 : reindex_relation(NULL, OIDOldHeap, reindex_flags, &reindex_params);
1988 : }
1989 :
1990 : /* Report that we are now doing clean up */
1991 1426 : pgstat_progress_update_param(PROGRESS_REPACK_PHASE,
1992 : PROGRESS_REPACK_PHASE_FINAL_CLEANUP);
1993 :
1994 : /*
1995 : * If the relation being rebuilt is pg_class, swap_relation_files()
1996 : * couldn't update pg_class's own pg_class entry (check comments in
1997 : * swap_relation_files()), thus relfrozenxid was not updated. That's
1998 : * annoying because a potential reason for doing a VACUUM FULL is a
1999 : * imminent or actual anti-wraparound shutdown. So, now that we can
2000 : * access the new relation using its indices, update relfrozenxid.
2001 : * pg_class doesn't have a toast relation, so we don't need to update the
2002 : * corresponding toast relation. Not that there's little point moving all
2003 : * relfrozenxid updates here since swap_relation_files() needs to write to
2004 : * pg_class for non-mapped relations anyway.
2005 : */
2006 1426 : if (OIDOldHeap == RelationRelationId)
2007 : {
2008 : Relation relRelation;
2009 : HeapTuple reltup;
2010 : Form_pg_class relform;
2011 :
2012 23 : relRelation = table_open(RelationRelationId, RowExclusiveLock);
2013 :
2014 23 : reltup = SearchSysCacheCopy1(RELOID, ObjectIdGetDatum(OIDOldHeap));
2015 23 : if (!HeapTupleIsValid(reltup))
2016 0 : elog(ERROR, "cache lookup failed for relation %u", OIDOldHeap);
2017 23 : relform = (Form_pg_class) GETSTRUCT(reltup);
2018 :
2019 23 : relform->relfrozenxid = frozenXid;
2020 23 : relform->relminmxid = cutoffMulti;
2021 :
2022 23 : CatalogTupleUpdate(relRelation, &reltup->t_self, reltup);
2023 :
2024 23 : table_close(relRelation, RowExclusiveLock);
2025 : }
2026 :
2027 : /* Destroy new heap with old filenumber */
2028 1426 : object.classId = RelationRelationId;
2029 1426 : object.objectId = OIDNewHeap;
2030 1426 : object.objectSubId = 0;
2031 :
2032 1426 : if (!reindex)
2033 : {
2034 : /*
2035 : * Make sure the changes in pg_class are visible. This is especially
2036 : * important if !swap_toast_by_content, so that the correct TOAST
2037 : * relation is dropped. (reindex_relation() above did not help in this
2038 : * case))
2039 : */
2040 7 : CommandCounterIncrement();
2041 : }
2042 :
2043 : /*
2044 : * The new relation is local to our transaction and we know nothing
2045 : * depends on it, so DROP_RESTRICT should be OK.
2046 : */
2047 1426 : performDeletion(&object, DROP_RESTRICT, PERFORM_DELETION_INTERNAL);
2048 :
2049 : /* performDeletion does CommandCounterIncrement at end */
2050 :
2051 : /*
2052 : * Now we must remove any relation mapping entries that we set up for the
2053 : * transient table, as well as its toast table and toast index if any. If
2054 : * we fail to do this before commit, the relmapper will complain about new
2055 : * permanent map entries being added post-bootstrap.
2056 : */
2057 1515 : for (i = 0; OidIsValid(mapped_tables[i]); i++)
2058 89 : RelationMapRemoveMapping(mapped_tables[i]);
2059 :
2060 : /*
2061 : * At this point, everything is kosher except that, if we did toast swap
2062 : * by links, the toast table's name corresponds to the transient table.
2063 : * The name is irrelevant to the backend because it's referenced by OID,
2064 : * but users looking at the catalogs could be confused. Rename it to
2065 : * prevent this problem.
2066 : *
2067 : * Note no lock required on the relation, because we already hold an
2068 : * exclusive lock on it.
2069 : */
2070 1426 : if (!swap_toast_by_content)
2071 : {
2072 : Relation newrel;
2073 :
2074 1294 : newrel = table_open(OIDOldHeap, NoLock);
2075 1294 : if (OidIsValid(newrel->rd_rel->reltoastrelid))
2076 : {
2077 : Oid toastidx;
2078 : char NewToastName[NAMEDATALEN];
2079 :
2080 : /* Get the associated valid index to be renamed */
2081 389 : toastidx = toast_get_valid_index(newrel->rd_rel->reltoastrelid,
2082 : AccessExclusiveLock);
2083 :
2084 : /* rename the toast table ... */
2085 389 : snprintf(NewToastName, NAMEDATALEN, "pg_toast_%u",
2086 : OIDOldHeap);
2087 389 : RenameRelationInternal(newrel->rd_rel->reltoastrelid,
2088 : NewToastName, true, false);
2089 :
2090 : /* ... and its valid index too. */
2091 389 : snprintf(NewToastName, NAMEDATALEN, "pg_toast_%u_index",
2092 : OIDOldHeap);
2093 :
2094 389 : RenameRelationInternal(toastidx,
2095 : NewToastName, true, true);
2096 :
2097 : /*
2098 : * Reset the relrewrite for the toast. The command-counter
2099 : * increment is required here as we are about to update the tuple
2100 : * that is updated as part of RenameRelationInternal.
2101 : */
2102 389 : CommandCounterIncrement();
2103 389 : ResetRelRewrite(newrel->rd_rel->reltoastrelid);
2104 : }
2105 1294 : relation_close(newrel, NoLock);
2106 : }
2107 :
2108 : /* if it's not a catalog table, clear any missing attribute settings */
2109 1426 : if (!is_system_catalog)
2110 : {
2111 : Relation newrel;
2112 :
2113 1305 : newrel = table_open(OIDOldHeap, NoLock);
2114 1305 : RelationClearMissing(newrel);
2115 1305 : relation_close(newrel, NoLock);
2116 : }
2117 1426 : }
2118 :
2119 : /*
2120 : * Determine which relations to process, when REPACK/CLUSTER is called
2121 : * without specifying a table name. The exact process depends on whether
2122 : * USING INDEX was given or not, and in any case we only return tables and
2123 : * materialized views that the current user has privileges to repack/cluster.
2124 : *
2125 : * If USING INDEX was given, we scan pg_index to find those that have
2126 : * indisclustered set; if it was not given, scan pg_class and return all
2127 : * tables.
2128 : *
2129 : * Return it as a list of RelToCluster in the given memory context.
2130 : */
2131 : static List *
2132 16 : get_tables_to_repack(RepackCommand cmd, bool usingindex, MemoryContext permcxt)
2133 : {
2134 : Relation catalog;
2135 : TableScanDesc scan;
2136 : HeapTuple tuple;
2137 16 : List *rtcs = NIL;
2138 :
2139 16 : if (usingindex)
2140 : {
2141 : ScanKeyData entry;
2142 :
2143 : /*
2144 : * For USING INDEX, scan pg_index to find those with indisclustered.
2145 : */
2146 12 : catalog = table_open(IndexRelationId, AccessShareLock);
2147 12 : ScanKeyInit(&entry,
2148 : Anum_pg_index_indisclustered,
2149 : BTEqualStrategyNumber, F_BOOLEQ,
2150 : BoolGetDatum(true));
2151 12 : scan = table_beginscan_catalog(catalog, 1, &entry);
2152 24 : while ((tuple = heap_getnext(scan, ForwardScanDirection)) != NULL)
2153 : {
2154 : RelToCluster *rtc;
2155 : Form_pg_index index;
2156 : HeapTuple classtup;
2157 : Form_pg_class classForm;
2158 : MemoryContext oldcxt;
2159 :
2160 12 : index = (Form_pg_index) GETSTRUCT(tuple);
2161 :
2162 : /*
2163 : * Try to obtain a light lock on the index's table, to ensure it
2164 : * doesn't go away while we collect the list. If we cannot, just
2165 : * disregard it. Be sure to release this if we ultimately decide
2166 : * not to process the table!
2167 : */
2168 12 : if (!ConditionalLockRelationOid(index->indrelid, AccessShareLock))
2169 0 : continue;
2170 :
2171 : /* Verify that the table still exists; skip if not */
2172 12 : classtup = SearchSysCache1(RELOID, ObjectIdGetDatum(index->indrelid));
2173 12 : if (!HeapTupleIsValid(classtup))
2174 : {
2175 0 : UnlockRelationOid(index->indrelid, AccessShareLock);
2176 0 : continue;
2177 : }
2178 12 : classForm = (Form_pg_class) GETSTRUCT(classtup);
2179 :
2180 : /* Skip temp relations belonging to other sessions */
2181 12 : if (classForm->relpersistence == RELPERSISTENCE_TEMP &&
2182 0 : !isTempOrTempToastNamespace(classForm->relnamespace))
2183 : {
2184 0 : ReleaseSysCache(classtup);
2185 0 : UnlockRelationOid(index->indrelid, AccessShareLock);
2186 0 : continue;
2187 : }
2188 :
2189 12 : ReleaseSysCache(classtup);
2190 :
2191 : /* noisily skip rels which the user can't process */
2192 12 : if (!repack_is_permitted_for_relation(cmd, index->indrelid,
2193 : GetUserId()))
2194 : {
2195 8 : UnlockRelationOid(index->indrelid, AccessShareLock);
2196 8 : continue;
2197 : }
2198 :
2199 : /* Use a permanent memory context for the result list */
2200 4 : oldcxt = MemoryContextSwitchTo(permcxt);
2201 4 : rtc = palloc_object(RelToCluster);
2202 4 : rtc->tableOid = index->indrelid;
2203 4 : rtc->indexOid = index->indexrelid;
2204 4 : rtcs = lappend(rtcs, rtc);
2205 4 : MemoryContextSwitchTo(oldcxt);
2206 : }
2207 : }
2208 : else
2209 : {
2210 4 : catalog = table_open(RelationRelationId, AccessShareLock);
2211 4 : scan = table_beginscan_catalog(catalog, 0, NULL);
2212 :
2213 9164 : while ((tuple = heap_getnext(scan, ForwardScanDirection)) != NULL)
2214 : {
2215 : RelToCluster *rtc;
2216 : Form_pg_class class;
2217 : MemoryContext oldcxt;
2218 :
2219 9160 : class = (Form_pg_class) GETSTRUCT(tuple);
2220 :
2221 : /*
2222 : * Try to obtain a light lock on the table, to ensure it doesn't
2223 : * go away while we collect the list. If we cannot, just
2224 : * disregard the table. Be sure to release this if we ultimately
2225 : * decide not to process the table!
2226 : */
2227 9160 : if (!ConditionalLockRelationOid(class->oid, AccessShareLock))
2228 0 : continue;
2229 :
2230 : /* Verify that the table still exists */
2231 9160 : if (!SearchSysCacheExists1(RELOID, ObjectIdGetDatum(class->oid)))
2232 : {
2233 0 : UnlockRelationOid(class->oid, AccessShareLock);
2234 0 : continue;
2235 : }
2236 :
2237 : /* Can only process plain tables and matviews */
2238 9160 : if (class->relkind != RELKIND_RELATION &&
2239 6068 : class->relkind != RELKIND_MATVIEW)
2240 : {
2241 6036 : UnlockRelationOid(class->oid, AccessShareLock);
2242 6036 : continue;
2243 : }
2244 :
2245 : /* Skip temp relations belonging to other sessions */
2246 3124 : if (class->relpersistence == RELPERSISTENCE_TEMP &&
2247 16 : !isTempOrTempToastNamespace(class->relnamespace))
2248 : {
2249 0 : UnlockRelationOid(class->oid, AccessShareLock);
2250 0 : continue;
2251 : }
2252 :
2253 : /* noisily skip rels which the user can't process */
2254 3124 : if (!repack_is_permitted_for_relation(cmd, class->oid,
2255 : GetUserId()))
2256 : {
2257 3116 : UnlockRelationOid(class->oid, AccessShareLock);
2258 3116 : continue;
2259 : }
2260 :
2261 : /* Use a permanent memory context for the result list */
2262 8 : oldcxt = MemoryContextSwitchTo(permcxt);
2263 8 : rtc = palloc_object(RelToCluster);
2264 8 : rtc->tableOid = class->oid;
2265 8 : rtc->indexOid = InvalidOid;
2266 8 : rtcs = lappend(rtcs, rtc);
2267 8 : MemoryContextSwitchTo(oldcxt);
2268 : }
2269 : }
2270 :
2271 16 : table_endscan(scan);
2272 16 : relation_close(catalog, AccessShareLock);
2273 :
2274 16 : return rtcs;
2275 : }
2276 :
2277 : /*
2278 : * Given a partitioned table or its index, return a list of RelToCluster for
2279 : * all the leaf child tables/indexes.
2280 : *
2281 : * 'rel_is_index' tells whether 'relid' is that of an index (true) or of the
2282 : * owning relation.
2283 : */
2284 : static List *
2285 20 : get_tables_to_repack_partitioned(RepackCommand cmd, Oid relid,
2286 : bool rel_is_index, MemoryContext permcxt)
2287 : {
2288 : List *inhoids;
2289 20 : List *rtcs = NIL;
2290 :
2291 : /*
2292 : * Do not lock the children until they're processed. Note that we do hold
2293 : * a lock on the parent partitioned table.
2294 : */
2295 20 : inhoids = find_all_inheritors(relid, NoLock, NULL);
2296 148 : foreach_oid(child_oid, inhoids)
2297 : {
2298 : Oid table_oid,
2299 : index_oid;
2300 : RelToCluster *rtc;
2301 : MemoryContext oldcxt;
2302 :
2303 108 : if (rel_is_index)
2304 : {
2305 : /* consider only leaf indexes */
2306 80 : if (get_rel_relkind(child_oid) != RELKIND_INDEX)
2307 40 : continue;
2308 :
2309 40 : table_oid = IndexGetRelation(child_oid, false);
2310 40 : index_oid = child_oid;
2311 : }
2312 : else
2313 : {
2314 : /* consider only leaf relations */
2315 28 : if (get_rel_relkind(child_oid) != RELKIND_RELATION)
2316 16 : continue;
2317 :
2318 12 : table_oid = child_oid;
2319 12 : index_oid = InvalidOid;
2320 : }
2321 :
2322 : /*
2323 : * It's possible that the user does not have privileges to CLUSTER the
2324 : * leaf partition despite having them on the partitioned table. Skip
2325 : * if so.
2326 : */
2327 52 : if (!repack_is_permitted_for_relation(cmd, table_oid, GetUserId()))
2328 12 : continue;
2329 :
2330 : /* Use a permanent memory context for the result list */
2331 40 : oldcxt = MemoryContextSwitchTo(permcxt);
2332 40 : rtc = palloc_object(RelToCluster);
2333 40 : rtc->tableOid = table_oid;
2334 40 : rtc->indexOid = index_oid;
2335 40 : rtcs = lappend(rtcs, rtc);
2336 40 : MemoryContextSwitchTo(oldcxt);
2337 : }
2338 :
2339 20 : return rtcs;
2340 : }
2341 :
2342 :
2343 : /*
2344 : * Return whether userid has privileges to REPACK relid. If not, this
2345 : * function emits a WARNING.
2346 : */
2347 : static bool
2348 3240 : repack_is_permitted_for_relation(RepackCommand cmd, Oid relid, Oid userid)
2349 : {
2350 : Assert(cmd == REPACK_COMMAND_CLUSTER || cmd == REPACK_COMMAND_REPACK);
2351 :
2352 3240 : if (pg_class_aclcheck(relid, userid, ACL_MAINTAIN) == ACLCHECK_OK)
2353 104 : return true;
2354 :
2355 3136 : ereport(WARNING,
2356 : errmsg("permission denied to execute %s on \"%s\", skipping it",
2357 : RepackCommandAsString(cmd),
2358 : get_rel_name(relid)));
2359 :
2360 3136 : return false;
2361 : }
2362 :
2363 :
2364 : /*
2365 : * Given a RepackStmt with an indicated relation name, resolve the relation
2366 : * name, obtain lock on it, then determine what to do based on the relation
2367 : * type: if it's table and not partitioned, repack it as indicated (using an
2368 : * existing clustered index, or following the given one), and return NULL.
2369 : *
2370 : * On the other hand, if the table is partitioned, do nothing further and
2371 : * instead return the opened and locked relcache entry, so that caller can
2372 : * process the partitions using the multiple-table handling code. In this
2373 : * case, if an index name is given, it's up to the caller to resolve it.
2374 : */
2375 : static Relation
2376 217 : process_single_relation(RepackStmt *stmt, LOCKMODE lockmode, bool isTopLevel,
2377 : ClusterParams *params)
2378 : {
2379 : Relation rel;
2380 : Oid tableOid;
2381 :
2382 : Assert(stmt->relation != NULL);
2383 : Assert(stmt->command == REPACK_COMMAND_CLUSTER ||
2384 : stmt->command == REPACK_COMMAND_REPACK);
2385 :
2386 : /*
2387 : * Make sure ANALYZE is specified if a column list is present.
2388 : */
2389 217 : if ((params->options & CLUOPT_ANALYZE) == 0 && stmt->relation->va_cols != NIL)
2390 4 : ereport(ERROR,
2391 : errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
2392 : errmsg("ANALYZE option must be specified when a column list is provided"));
2393 :
2394 : /* Find, lock, and check permissions on the table. */
2395 213 : tableOid = RangeVarGetRelidExtended(stmt->relation->relation,
2396 : lockmode,
2397 : 0,
2398 : RangeVarCallbackMaintainsTable,
2399 : NULL);
2400 205 : rel = table_open(tableOid, NoLock);
2401 :
2402 : /*
2403 : * Reject clustering a remote temp table ... their local buffer manager is
2404 : * not going to cope.
2405 : */
2406 205 : if (RELATION_IS_OTHER_TEMP(rel))
2407 1 : ereport(ERROR,
2408 : errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
2409 : /*- translator: first %s is name of a SQL command, eg. REPACK */
2410 : errmsg("cannot execute %s on temporary tables of other sessions",
2411 : RepackCommandAsString(stmt->command)));
2412 :
2413 : /*
2414 : * For partitioned tables, let caller handle this. Otherwise, process it
2415 : * here and we're done.
2416 : */
2417 204 : if (rel->rd_rel->relkind == RELKIND_PARTITIONED_TABLE)
2418 33 : return rel;
2419 : else
2420 : {
2421 171 : Oid indexOid = InvalidOid;
2422 :
2423 171 : indexOid = determine_clustered_index(rel, stmt->usingindex,
2424 171 : stmt->indexname);
2425 163 : if (OidIsValid(indexOid))
2426 140 : check_index_is_clusterable(rel, indexOid, lockmode);
2427 :
2428 151 : cluster_rel(stmt->command, rel, indexOid, params, isTopLevel);
2429 :
2430 : /*
2431 : * Do an analyze, if requested. We close the transaction and start a
2432 : * new one, so that we don't hold the stronger lock for longer than
2433 : * needed.
2434 : */
2435 140 : if (params->options & CLUOPT_ANALYZE)
2436 : {
2437 8 : VacuumParams vac_params = {0};
2438 :
2439 8 : PopActiveSnapshot();
2440 8 : CommitTransactionCommand();
2441 :
2442 8 : StartTransactionCommand();
2443 8 : PushActiveSnapshot(GetTransactionSnapshot());
2444 :
2445 8 : vac_params.options |= VACOPT_ANALYZE;
2446 8 : if (params->options & CLUOPT_VERBOSE)
2447 0 : vac_params.options |= VACOPT_VERBOSE;
2448 8 : analyze_rel(tableOid, NULL, &vac_params,
2449 8 : stmt->relation->va_cols, true, NULL);
2450 8 : PopActiveSnapshot();
2451 8 : CommandCounterIncrement();
2452 : }
2453 :
2454 140 : return NULL;
2455 : }
2456 : }
2457 :
2458 : /*
2459 : * Given a relation and the usingindex/indexname options in a
2460 : * REPACK USING INDEX or CLUSTER command, return the OID of the
2461 : * index to use for clustering the table.
2462 : *
2463 : * Caller must hold lock on the relation so that the set of indexes
2464 : * doesn't change, and must call check_index_is_clusterable.
2465 : */
2466 : static Oid
2467 191 : determine_clustered_index(Relation rel, bool usingindex, const char *indexname)
2468 : {
2469 : Oid indexOid;
2470 :
2471 191 : if (indexname == NULL && usingindex)
2472 : {
2473 : /*
2474 : * If USING INDEX with no name is given, find a clustered index, or
2475 : * error out if none.
2476 : */
2477 21 : indexOid = InvalidOid;
2478 46 : foreach_oid(idxoid, RelationGetIndexList(rel))
2479 : {
2480 21 : if (get_index_isclustered(idxoid))
2481 : {
2482 17 : indexOid = idxoid;
2483 17 : break;
2484 : }
2485 : }
2486 :
2487 21 : if (!OidIsValid(indexOid))
2488 4 : ereport(ERROR,
2489 : errcode(ERRCODE_UNDEFINED_OBJECT),
2490 : errmsg("there is no previously clustered index for table \"%s\"",
2491 : RelationGetRelationName(rel)));
2492 : }
2493 170 : else if (indexname != NULL)
2494 : {
2495 : /* An index was specified; obtain its OID. */
2496 147 : indexOid = get_relname_relid(indexname, rel->rd_rel->relnamespace);
2497 147 : if (!OidIsValid(indexOid))
2498 4 : ereport(ERROR,
2499 : errcode(ERRCODE_UNDEFINED_OBJECT),
2500 : errmsg("index \"%s\" for table \"%s\" does not exist",
2501 : indexname, RelationGetRelationName(rel)));
2502 : }
2503 : else
2504 23 : indexOid = InvalidOid;
2505 :
2506 183 : return indexOid;
2507 : }
2508 :
2509 : static const char *
2510 3611 : RepackCommandAsString(RepackCommand cmd)
2511 : {
2512 3611 : switch (cmd)
2513 : {
2514 3199 : case REPACK_COMMAND_REPACK:
2515 3199 : return "REPACK";
2516 226 : case REPACK_COMMAND_VACUUMFULL:
2517 226 : return "VACUUM";
2518 186 : case REPACK_COMMAND_CLUSTER:
2519 186 : return "CLUSTER";
2520 : }
2521 0 : return "???"; /* keep compiler quiet */
2522 : }
2523 :
2524 : /*
2525 : * Apply all the changes stored in 'file'.
2526 : */
2527 : static void
2528 14 : apply_concurrent_changes(BufFile *file, ChangeContext *chgcxt)
2529 : {
2530 14 : ConcurrentChangeKind kind = '\0';
2531 14 : Relation rel = chgcxt->cc_rel;
2532 : TupleTableSlot *spilled_tuple;
2533 : TupleTableSlot *old_update_tuple;
2534 : TupleTableSlot *ondisk_tuple;
2535 14 : bool have_old_tuple = false;
2536 : MemoryContext oldcxt;
2537 :
2538 14 : spilled_tuple = MakeSingleTupleTableSlot(RelationGetDescr(rel),
2539 : &TTSOpsVirtual);
2540 14 : ondisk_tuple = MakeSingleTupleTableSlot(RelationGetDescr(rel),
2541 : table_slot_callbacks(rel));
2542 14 : old_update_tuple = MakeSingleTupleTableSlot(RelationGetDescr(rel),
2543 : &TTSOpsVirtual);
2544 :
2545 14 : oldcxt = MemoryContextSwitchTo(GetPerTupleMemoryContext(chgcxt->cc_estate));
2546 :
2547 : while (true)
2548 40 : {
2549 : size_t nread;
2550 54 : ConcurrentChangeKind prevkind = kind;
2551 :
2552 54 : CHECK_FOR_INTERRUPTS();
2553 :
2554 54 : nread = BufFileReadMaybeEOF(file, &kind, 1, true);
2555 54 : if (nread == 0) /* done with the file? */
2556 14 : break;
2557 :
2558 : /*
2559 : * If this is the old tuple for an update, read it into the tuple slot
2560 : * and go to the next one. The update itself will be executed on the
2561 : * next iteration, when we receive the NEW tuple.
2562 : */
2563 40 : if (kind == CHANGE_UPDATE_OLD)
2564 : {
2565 8 : restore_tuple(file, rel, old_update_tuple);
2566 8 : have_old_tuple = true;
2567 8 : continue;
2568 : }
2569 :
2570 : /*
2571 : * Just before an UPDATE or DELETE, we must update the command
2572 : * counter, because the change could refer to a tuple that we have
2573 : * just inserted; and before an INSERT, we have to do this also if the
2574 : * previous command was either update or delete.
2575 : *
2576 : * With this approach we don't spend so many CCIs for long strings of
2577 : * only INSERTs, which can't affect one another.
2578 : */
2579 32 : if (kind == CHANGE_UPDATE_NEW || kind == CHANGE_DELETE ||
2580 7 : (kind == CHANGE_INSERT && (prevkind == CHANGE_UPDATE_NEW ||
2581 : prevkind == CHANGE_DELETE)))
2582 : {
2583 29 : CommandCounterIncrement();
2584 29 : UpdateActiveSnapshotCommandId();
2585 : }
2586 :
2587 : /*
2588 : * Now restore the tuple into the slot and execute the change.
2589 : */
2590 32 : restore_tuple(file, rel, spilled_tuple);
2591 :
2592 32 : if (kind == CHANGE_INSERT)
2593 : {
2594 7 : apply_concurrent_insert(rel, spilled_tuple, chgcxt);
2595 : }
2596 25 : else if (kind == CHANGE_DELETE)
2597 : {
2598 : bool found;
2599 :
2600 : /* Find the tuple to be deleted */
2601 3 : found = find_target_tuple(rel, chgcxt, spilled_tuple, ondisk_tuple);
2602 3 : if (!found)
2603 0 : elog(ERROR, "could not find target tuple");
2604 3 : apply_concurrent_delete(rel, ondisk_tuple);
2605 : }
2606 22 : else if (kind == CHANGE_UPDATE_NEW)
2607 : {
2608 : TupleTableSlot *key;
2609 : bool found;
2610 :
2611 22 : if (have_old_tuple)
2612 8 : key = old_update_tuple;
2613 : else
2614 14 : key = spilled_tuple;
2615 :
2616 : /* Find the tuple to be updated or deleted. */
2617 22 : found = find_target_tuple(rel, chgcxt, key, ondisk_tuple);
2618 22 : if (!found)
2619 0 : elog(ERROR, "could not find target tuple");
2620 :
2621 : /*
2622 : * If 'tup' contains TOAST pointers, they point to the old
2623 : * relation's toast. Copy the corresponding TOAST pointers for the
2624 : * new relation from the existing tuple. (The fact that we
2625 : * received a TOAST pointer here implies that the attribute hasn't
2626 : * changed.)
2627 : */
2628 22 : adjust_toast_pointers(rel, spilled_tuple, ondisk_tuple);
2629 :
2630 22 : apply_concurrent_update(rel, spilled_tuple, ondisk_tuple, chgcxt);
2631 :
2632 22 : ExecClearTuple(old_update_tuple);
2633 22 : have_old_tuple = false;
2634 : }
2635 : else
2636 0 : elog(ERROR, "unrecognized kind of change: %d", kind);
2637 :
2638 32 : ResetPerTupleExprContext(chgcxt->cc_estate);
2639 : }
2640 :
2641 : /* Cleanup. */
2642 14 : ExecDropSingleTupleTableSlot(spilled_tuple);
2643 14 : ExecDropSingleTupleTableSlot(ondisk_tuple);
2644 14 : ExecDropSingleTupleTableSlot(old_update_tuple);
2645 :
2646 14 : MemoryContextSwitchTo(oldcxt);
2647 14 : }
2648 :
2649 : /*
2650 : * Apply an insert from the spill of concurrent changes to the new copy of the
2651 : * table.
2652 : */
2653 : static void
2654 7 : apply_concurrent_insert(Relation rel, TupleTableSlot *slot,
2655 : ChangeContext *chgcxt)
2656 : {
2657 : /* Put the tuple in the table, but make sure it won't be decoded */
2658 7 : table_tuple_insert(rel, slot, GetCurrentCommandId(true),
2659 : TABLE_INSERT_NO_LOGICAL, NULL);
2660 :
2661 : /* Update indexes with this new tuple. */
2662 7 : ExecInsertIndexTuples(chgcxt->cc_rri,
2663 : chgcxt->cc_estate,
2664 : 0,
2665 : slot,
2666 : NIL, NULL);
2667 7 : pgstat_progress_incr_param(PROGRESS_REPACK_HEAP_TUPLES_INSERTED, 1);
2668 7 : }
2669 :
2670 : /*
2671 : * Apply an update from the spill of concurrent changes to the new copy of the
2672 : * table.
2673 : */
2674 : static void
2675 22 : apply_concurrent_update(Relation rel, TupleTableSlot *spilled_tuple,
2676 : TupleTableSlot *ondisk_tuple,
2677 : ChangeContext *chgcxt)
2678 : {
2679 : LockTupleMode lockmode;
2680 : TM_FailureData tmfd;
2681 : TU_UpdateIndexes update_indexes;
2682 : TM_Result res;
2683 :
2684 : /*
2685 : * Carry out the update, skipping logical decoding for it.
2686 : */
2687 22 : res = table_tuple_update(rel, &(ondisk_tuple->tts_tid), spilled_tuple,
2688 : GetCurrentCommandId(true),
2689 : TABLE_UPDATE_NO_LOGICAL,
2690 : InvalidSnapshot,
2691 : InvalidSnapshot,
2692 : false,
2693 : &tmfd, &lockmode, &update_indexes);
2694 22 : if (res != TM_Ok)
2695 0 : ereport(ERROR,
2696 : errcode(ERRCODE_T_R_SERIALIZATION_FAILURE),
2697 : errmsg("could not apply concurrent %s on relation \"%s\"",
2698 : "UPDATE", RelationGetRelationName(rel)));
2699 :
2700 22 : if (update_indexes != TU_None)
2701 : {
2702 8 : uint32 flags = EIIT_IS_UPDATE;
2703 :
2704 8 : if (update_indexes == TU_Summarizing)
2705 0 : flags |= EIIT_ONLY_SUMMARIZING;
2706 8 : ExecInsertIndexTuples(chgcxt->cc_rri,
2707 : chgcxt->cc_estate,
2708 : flags,
2709 : spilled_tuple,
2710 : NIL, NULL);
2711 : }
2712 :
2713 22 : pgstat_progress_incr_param(PROGRESS_REPACK_HEAP_TUPLES_UPDATED, 1);
2714 22 : }
2715 :
2716 : static void
2717 3 : apply_concurrent_delete(Relation rel, TupleTableSlot *slot)
2718 : {
2719 : TM_Result res;
2720 : TM_FailureData tmfd;
2721 :
2722 : /*
2723 : * Delete tuple from the new heap, skipping logical decoding for it.
2724 : */
2725 3 : res = table_tuple_delete(rel, &(slot->tts_tid),
2726 : GetCurrentCommandId(true),
2727 : TABLE_DELETE_NO_LOGICAL,
2728 : InvalidSnapshot, InvalidSnapshot,
2729 : false,
2730 : &tmfd);
2731 :
2732 3 : if (res != TM_Ok)
2733 0 : ereport(ERROR,
2734 : errcode(ERRCODE_T_R_SERIALIZATION_FAILURE),
2735 : errmsg("could not apply concurrent %s on relation \"%s\"",
2736 : "DELETE", RelationGetRelationName(rel)));
2737 :
2738 3 : pgstat_progress_incr_param(PROGRESS_REPACK_HEAP_TUPLES_DELETED, 1);
2739 3 : }
2740 :
2741 : /*
2742 : * Read tuple from file and put it in the input slot. All memory is allocated
2743 : * in the current memory context; caller is responsible for freeing it as
2744 : * appropriate.
2745 : *
2746 : * External attributes are stored in separate memory chunks, in order to avoid
2747 : * exceeding MaxAllocSize - that could happen if the individual attributes are
2748 : * smaller than MaxAllocSize but the whole tuple is bigger.
2749 : */
2750 : static void
2751 40 : restore_tuple(BufFile *file, Relation relation, TupleTableSlot *slot)
2752 : {
2753 : uint32 t_len;
2754 : HeapTuple tup;
2755 : int natt_ext;
2756 :
2757 : /* Read the tuple. */
2758 40 : BufFileReadExact(file, &t_len, sizeof(t_len));
2759 40 : tup = (HeapTuple) palloc(HEAPTUPLESIZE + t_len);
2760 40 : tup->t_data = (HeapTupleHeader) ((char *) tup + HEAPTUPLESIZE);
2761 40 : BufFileReadExact(file, tup->t_data, t_len);
2762 40 : tup->t_len = t_len;
2763 40 : ItemPointerSetInvalid(&tup->t_self);
2764 40 : tup->t_tableOid = RelationGetRelid(relation);
2765 :
2766 : /*
2767 : * Put the tuple we read in a slot. This deforms it, so that we can hack
2768 : * the external attributes in place.
2769 : */
2770 40 : ExecForceStoreHeapTuple(tup, slot, false);
2771 :
2772 : /*
2773 : * Next, read any attributes we stored separately into the tts_values
2774 : * array elements expecting them, if any. This matches
2775 : * repack_store_change.
2776 : */
2777 40 : BufFileReadExact(file, &natt_ext, sizeof(natt_ext));
2778 40 : if (natt_ext > 0)
2779 : {
2780 11 : TupleDesc desc = slot->tts_tupleDescriptor;
2781 :
2782 66 : for (int i = 0; i < desc->natts; i++)
2783 : {
2784 55 : CompactAttribute *attr = TupleDescCompactAttr(desc, i);
2785 : varlena *varlen;
2786 : uint64 chunk_header;
2787 : void *value;
2788 : Size varlensz;
2789 :
2790 55 : if (attr->attisdropped || attr->attlen != -1)
2791 40 : continue;
2792 22 : if (slot_attisnull(slot, i + 1))
2793 0 : continue;
2794 22 : varlen = (varlena *) DatumGetPointer(slot->tts_values[i]);
2795 22 : if (!VARATT_IS_EXTERNAL_INDIRECT(varlen))
2796 7 : continue;
2797 15 : slot_getsomeattrs(slot, i + 1);
2798 :
2799 15 : BufFileReadExact(file, &chunk_header, VARHDRSZ);
2800 15 : varlensz = VARSIZE_ANY(&chunk_header);
2801 :
2802 15 : value = palloc(varlensz);
2803 15 : memcpy(value, &chunk_header, VARHDRSZ);
2804 15 : BufFileReadExact(file, (char *) value + VARHDRSZ, varlensz - VARHDRSZ);
2805 :
2806 15 : slot->tts_values[i] = PointerGetDatum(value);
2807 15 : natt_ext--;
2808 15 : if (natt_ext < 0)
2809 0 : ereport(ERROR,
2810 : errcode(ERRCODE_DATA_CORRUPTED),
2811 : errmsg("insufficient number of attributes stored separately"));
2812 : }
2813 : }
2814 40 : }
2815 :
2816 : /*
2817 : * Adjust 'dest' replacing any EXTERNAL_ONDISK toast pointers with the
2818 : * corresponding ones from 'src'.
2819 : */
2820 : static void
2821 22 : adjust_toast_pointers(Relation relation, TupleTableSlot *dest, TupleTableSlot *src)
2822 : {
2823 22 : TupleDesc desc = dest->tts_tupleDescriptor;
2824 :
2825 104 : for (int i = 0; i < desc->natts; i++)
2826 : {
2827 82 : CompactAttribute *attr = TupleDescCompactAttr(desc, i);
2828 : varlena *varlena_dst;
2829 :
2830 82 : if (attr->attisdropped)
2831 24 : continue;
2832 58 : if (attr->attlen != -1)
2833 28 : continue;
2834 30 : if (slot_attisnull(dest, i + 1))
2835 0 : continue;
2836 :
2837 30 : slot_getsomeattrs(dest, i + 1);
2838 :
2839 30 : varlena_dst = (varlena *) DatumGetPointer(dest->tts_values[i]);
2840 30 : if (!VARATT_IS_EXTERNAL_ONDISK(varlena_dst))
2841 28 : continue;
2842 2 : slot_getsomeattrs(src, i + 1);
2843 :
2844 2 : dest->tts_values[i] = src->tts_values[i];
2845 : }
2846 22 : }
2847 :
2848 : /*
2849 : * Find the tuple to be updated or deleted by the given data change, whose
2850 : * tuple has already been loaded into locator.
2851 : *
2852 : * If the tuple is found, put it in retrieved and return true. If the tuple is
2853 : * not found, return false.
2854 : */
2855 : static bool
2856 25 : find_target_tuple(Relation rel, ChangeContext *chgcxt, TupleTableSlot *locator,
2857 : TupleTableSlot *retrieved)
2858 : {
2859 25 : Form_pg_index idx = chgcxt->cc_ident_index->rd_index;
2860 : IndexScanDesc scan;
2861 25 : bool retval = false;
2862 :
2863 : /*
2864 : * Scan key is passed by caller, so it does not have to be constructed
2865 : * multiple times. Key entries have all fields initialized, except for
2866 : * sk_argument.
2867 : *
2868 : * Use the incoming tuple to finalize the scan key.
2869 : */
2870 52 : for (int i = 0; i < chgcxt->cc_ident_key_nentries; i++)
2871 : {
2872 27 : ScanKey entry = &chgcxt->cc_ident_key[i];
2873 27 : AttrNumber attno = idx->indkey.values[i];
2874 :
2875 27 : entry->sk_argument = locator->tts_values[attno - 1];
2876 : Assert(!locator->tts_isnull[attno - 1]);
2877 : }
2878 :
2879 : /* XXX no instrumentation for now */
2880 25 : scan = index_beginscan(rel, chgcxt->cc_ident_index, GetActiveSnapshot(),
2881 : NULL, chgcxt->cc_ident_key_nentries, 0, 0);
2882 25 : index_rescan(scan, chgcxt->cc_ident_key, chgcxt->cc_ident_key_nentries, NULL, 0);
2883 26 : while (index_getnext_slot(scan, ForwardScanDirection, retrieved))
2884 : {
2885 : /* Be wary of temporal constraints */
2886 26 : if (scan->xs_recheck && !identity_key_equal(chgcxt, locator, retrieved))
2887 : {
2888 1 : CHECK_FOR_INTERRUPTS();
2889 1 : continue;
2890 : }
2891 :
2892 25 : retval = true;
2893 25 : break;
2894 : }
2895 25 : index_endscan(scan);
2896 :
2897 25 : return retval;
2898 : }
2899 :
2900 : /*
2901 : * Check whether the candidate tuple matches the locator tuple on all replica
2902 : * identity key columns, using the same equality operators as the identity
2903 : * index scan. The locator tuple has already been loaded into cc_ident_key.
2904 : *
2905 : * This is needed to filter lossy index matches, such as GiST multirange scans
2906 : * used for temporal constraints.
2907 : */
2908 : static bool
2909 2 : identity_key_equal(ChangeContext *chgcxt, TupleTableSlot *locator,
2910 : TupleTableSlot *candidate)
2911 : {
2912 2 : slot_getsomeattrs(locator, chgcxt->cc_last_key_attno);
2913 2 : slot_getsomeattrs(candidate, chgcxt->cc_last_key_attno);
2914 :
2915 4 : for (int i = 0; i < chgcxt->cc_ident_key_nentries; i++)
2916 : {
2917 3 : ScanKey entry = &chgcxt->cc_ident_key[i];
2918 3 : AttrNumber attno = chgcxt->cc_ident_index->rd_index->indkey.values[i];
2919 :
2920 : Assert(attno > 0);
2921 :
2922 3 : if (locator->tts_isnull[attno - 1] != candidate->tts_isnull[attno - 1])
2923 0 : return false;
2924 :
2925 3 : if (locator->tts_isnull[attno - 1])
2926 0 : continue;
2927 :
2928 3 : if (!DatumGetBool(FunctionCall2Coll(&entry->sk_func,
2929 : entry->sk_collation,
2930 3 : candidate->tts_values[attno - 1],
2931 : entry->sk_argument)))
2932 1 : return false;
2933 : }
2934 :
2935 1 : return true;
2936 : }
2937 :
2938 : /*
2939 : * Decode and apply concurrent changes, up to (and including) the record whose
2940 : * LSN is 'end_of_wal'.
2941 : *
2942 : * XXX the names "process_concurrent_changes" and "apply_concurrent_changes"
2943 : * are far too similar to each other.
2944 : */
2945 : static void
2946 14 : process_concurrent_changes(XLogRecPtr end_of_wal, ChangeContext *chgcxt, bool done)
2947 : {
2948 : DecodingWorkerShared *shared;
2949 : char fname[MAXPGPATH];
2950 : BufFile *file;
2951 :
2952 14 : pgstat_progress_update_param(PROGRESS_REPACK_PHASE,
2953 : PROGRESS_REPACK_PHASE_CATCH_UP);
2954 :
2955 : /* Ask the worker for the file. */
2956 14 : shared = (DecodingWorkerShared *) dsm_segment_address(decoding_worker->seg);
2957 14 : SpinLockAcquire(&shared->mutex);
2958 14 : shared->lsn_upto = end_of_wal;
2959 14 : shared->done = done;
2960 14 : SpinLockRelease(&shared->mutex);
2961 :
2962 : /*
2963 : * The worker needs to finish processing of the current WAL record. Even
2964 : * if it's idle, it'll need to close the output file. Thus we're likely to
2965 : * wait, so prepare for sleep.
2966 : */
2967 14 : ConditionVariablePrepareToSleep(&shared->cv);
2968 : for (;;)
2969 14 : {
2970 : int last_exported;
2971 :
2972 28 : SpinLockAcquire(&shared->mutex);
2973 28 : last_exported = shared->last_exported;
2974 28 : SpinLockRelease(&shared->mutex);
2975 :
2976 : /*
2977 : * Has the worker exported the file we are waiting for?
2978 : */
2979 28 : if (last_exported == chgcxt->cc_file_seq)
2980 14 : break;
2981 :
2982 14 : ConditionVariableSleep(&shared->cv, WAIT_EVENT_REPACK_WORKER_EXPORT);
2983 : }
2984 14 : ConditionVariableCancelSleep();
2985 :
2986 : /* Open the file. */
2987 14 : DecodingWorkerFileName(fname, shared->relid, chgcxt->cc_file_seq);
2988 14 : file = BufFileOpenFileSet(&shared->sfs.fs, fname, O_RDONLY, false);
2989 14 : apply_concurrent_changes(file, chgcxt);
2990 :
2991 14 : BufFileClose(file);
2992 :
2993 : /* Get ready for the next file. */
2994 14 : chgcxt->cc_file_seq++;
2995 14 : }
2996 :
2997 : /*
2998 : * Initialize the ChangeContext struct for the given relation, with
2999 : * the given index as identity index.
3000 : */
3001 : static void
3002 7 : initialize_change_context(ChangeContext *chgcxt,
3003 : Relation relation, Oid ident_index_id)
3004 : {
3005 7 : chgcxt->cc_rel = relation;
3006 :
3007 : /* Only initialize fields needed by ExecInsertIndexTuples(). */
3008 7 : chgcxt->cc_estate = CreateExecutorState();
3009 :
3010 7 : chgcxt->cc_rri = (ResultRelInfo *) palloc(sizeof(ResultRelInfo));
3011 7 : InitResultRelInfo(chgcxt->cc_rri, relation, 0, 0, 0);
3012 7 : ExecOpenIndices(chgcxt->cc_rri, false);
3013 :
3014 : /*
3015 : * The table's relcache entry already has the relcache entry for the
3016 : * identity index; find that.
3017 : */
3018 7 : chgcxt->cc_ident_index = NULL;
3019 7 : for (int i = 0; i < chgcxt->cc_rri->ri_NumIndices; i++)
3020 : {
3021 : Relation ind_rel;
3022 :
3023 7 : ind_rel = chgcxt->cc_rri->ri_IndexRelationDescs[i];
3024 7 : if (ind_rel->rd_id == ident_index_id)
3025 : {
3026 7 : chgcxt->cc_ident_index = ind_rel;
3027 7 : break;
3028 : }
3029 : }
3030 7 : if (chgcxt->cc_ident_index == NULL)
3031 0 : elog(ERROR, "could not find identity index");
3032 :
3033 : /* Set up for scanning said identity index */
3034 : {
3035 : Form_pg_index indexForm;
3036 :
3037 7 : indexForm = chgcxt->cc_ident_index->rd_index;
3038 7 : chgcxt->cc_ident_key_nentries = indexForm->indnkeyatts;
3039 7 : chgcxt->cc_ident_key = (ScanKey) palloc_array(ScanKeyData, indexForm->indnkeyatts);
3040 16 : for (int i = 0; i < indexForm->indnkeyatts; i++)
3041 : {
3042 : ScanKey entry;
3043 : Oid opfamily,
3044 : opcintype,
3045 : opno,
3046 : opcode;
3047 : StrategyNumber eq_strategy;
3048 :
3049 9 : entry = &chgcxt->cc_ident_key[i];
3050 :
3051 9 : opfamily = chgcxt->cc_ident_index->rd_opfamily[i];
3052 9 : opcintype = chgcxt->cc_ident_index->rd_opcintype[i];
3053 9 : eq_strategy = IndexAmTranslateCompareType(COMPARE_EQ,
3054 9 : chgcxt->cc_ident_index->rd_rel->relam,
3055 : opfamily, false);
3056 9 : if (eq_strategy == InvalidStrategy)
3057 0 : elog(ERROR, "could not find equality strategy for index operator family %u for type %u",
3058 : opfamily, opcintype);
3059 9 : opno = get_opfamily_member(opfamily, opcintype, opcintype,
3060 : eq_strategy);
3061 9 : if (!OidIsValid(opno))
3062 0 : elog(ERROR, "missing operator %d(%u,%u) in opfamily %u",
3063 : eq_strategy, opcintype, opcintype, opfamily);
3064 9 : opcode = get_opcode(opno);
3065 9 : if (!OidIsValid(opcode))
3066 0 : elog(ERROR, "missing oprcode for operator %u", opno);
3067 :
3068 : /* Initialize everything but argument. */
3069 9 : ScanKeyInit(entry,
3070 9 : i + 1,
3071 : eq_strategy, opcode,
3072 : (Datum) 0);
3073 9 : entry->sk_collation = chgcxt->cc_ident_index->rd_indcollation[i];
3074 : }
3075 : }
3076 :
3077 : /* Determine the last column we must deform to read the identity */
3078 7 : chgcxt->cc_last_key_attno = InvalidAttrNumber;
3079 16 : for (int i = 0; i < chgcxt->cc_ident_key_nentries; i++)
3080 : {
3081 9 : AttrNumber attno = chgcxt->cc_ident_index->rd_index->indkey.values[i];
3082 :
3083 : Assert(attno > 0);
3084 9 : chgcxt->cc_last_key_attno = Max(chgcxt->cc_last_key_attno, attno);
3085 : }
3086 :
3087 7 : chgcxt->cc_file_seq = WORKER_FILE_SNAPSHOT + 1;
3088 7 : }
3089 :
3090 : /*
3091 : * Free up resources taken by a ChangeContext.
3092 : */
3093 : static void
3094 7 : release_change_context(ChangeContext *chgcxt)
3095 : {
3096 7 : ExecCloseIndices(chgcxt->cc_rri);
3097 7 : FreeExecutorState(chgcxt->cc_estate);
3098 : /* XXX are these pfrees necessary? */
3099 7 : pfree(chgcxt->cc_rri);
3100 7 : pfree(chgcxt->cc_ident_key);
3101 7 : }
3102 :
3103 : /*
3104 : * The final steps of rebuild_relation() for concurrent processing.
3105 : *
3106 : * On entry, NewHeap is locked in AccessExclusiveLock mode. OldHeap and its
3107 : * clustering index (if one is passed) are still locked in a mode that allows
3108 : * concurrent data changes. On exit, both tables and their indexes are closed,
3109 : * but locked in AccessExclusiveLock mode.
3110 : */
3111 : static void
3112 7 : rebuild_relation_finish_concurrent(Relation NewHeap, Relation OldHeap,
3113 : Oid identIdx, TransactionId frozenXid,
3114 : MultiXactId cutoffMulti)
3115 : {
3116 : List *ind_oids_new;
3117 7 : Oid old_table_oid = RelationGetRelid(OldHeap);
3118 7 : Oid new_table_oid = RelationGetRelid(NewHeap);
3119 7 : List *ind_oids_old = RelationGetIndexList(OldHeap);
3120 : ListCell *lc,
3121 : *lc2;
3122 : char relpersistence;
3123 : bool is_system_catalog;
3124 : Oid ident_idx_new;
3125 : XLogRecPtr end_of_wal;
3126 : List *indexrels;
3127 : ChangeContext chgcxt;
3128 :
3129 : Assert(CheckRelationLockedByMe(OldHeap, ShareUpdateExclusiveLock, false));
3130 : Assert(CheckRelationLockedByMe(NewHeap, AccessExclusiveLock, false));
3131 :
3132 : /*
3133 : * Unlike the exclusive case, we build new indexes for the new relation
3134 : * rather than swapping the storage and reindexing the old relation. The
3135 : * point is that the index build can take some time, so we do it before we
3136 : * get AccessExclusiveLock on the old heap and therefore we cannot swap
3137 : * the heap storage yet.
3138 : *
3139 : * index_create() will lock the new indexes using AccessExclusiveLock - no
3140 : * need to change that. At the same time, we use ShareUpdateExclusiveLock
3141 : * to lock the existing indexes - that should be enough to prevent others
3142 : * from changing them while we're repacking the relation. The lock on
3143 : * table should prevent others from changing the index column list, but
3144 : * might not be enough for commands like ALTER INDEX ... SET ... (Those
3145 : * are not necessarily dangerous, but can make user confused if the
3146 : * changes they do get lost due to REPACK.)
3147 : */
3148 7 : ind_oids_new = build_new_indexes(NewHeap, OldHeap, ind_oids_old);
3149 :
3150 : /*
3151 : * The identity index in the new relation appears in the same relative
3152 : * position as the corresponding index in the old relation. Find it.
3153 : */
3154 7 : ident_idx_new = InvalidOid;
3155 14 : foreach_oid(ind_old, ind_oids_old)
3156 : {
3157 7 : if (identIdx == ind_old)
3158 : {
3159 7 : int pos = foreach_current_index(ind_old);
3160 :
3161 7 : if (list_length(ind_oids_new) <= pos)
3162 0 : elog(ERROR, "list of new indexes too short");
3163 7 : ident_idx_new = list_nth_oid(ind_oids_new, pos);
3164 7 : break;
3165 : }
3166 : }
3167 7 : if (!OidIsValid(ident_idx_new))
3168 0 : elog(ERROR, "could not find index matching \"%s\" at the new relation",
3169 : get_rel_name(identIdx));
3170 :
3171 : /* Gather information to apply concurrent changes. */
3172 7 : initialize_change_context(&chgcxt, NewHeap, ident_idx_new);
3173 :
3174 : /*
3175 : * During testing, wait for another backend to perform concurrent data
3176 : * changes which we will process below.
3177 : */
3178 7 : INJECTION_POINT("repack-concurrently-before-lock", NULL);
3179 :
3180 : /*
3181 : * Flush all WAL records inserted so far (possibly except for the last
3182 : * incomplete page; see GetInsertRecPtr), to minimize the amount of data
3183 : * we need to flush while holding exclusive lock on the source table.
3184 : */
3185 7 : XLogFlush(GetXLogInsertEndRecPtr());
3186 7 : end_of_wal = GetFlushRecPtr(NULL);
3187 :
3188 : /*
3189 : * Apply concurrent changes first time, to minimize the time we need to
3190 : * hold AccessExclusiveLock. (Quite some amount of WAL could have been
3191 : * written during the data copying and index creation.)
3192 : */
3193 7 : process_concurrent_changes(end_of_wal, &chgcxt, false);
3194 :
3195 : /*
3196 : * Acquire AccessExclusiveLock on the table, its TOAST relation (if there
3197 : * is one), all its indexes, so that we can swap the files.
3198 : */
3199 7 : LockRelationOid(old_table_oid, AccessExclusiveLock);
3200 :
3201 : /*
3202 : * Lock all indexes now, not only the clustering one: all indexes need to
3203 : * have their files swapped. While doing that, store their relation
3204 : * references in a zero-terminated array, to handle predicate locks below.
3205 : */
3206 7 : indexrels = NIL;
3207 22 : foreach_oid(ind_oid, ind_oids_old)
3208 : {
3209 : Relation index;
3210 :
3211 8 : index = index_open(ind_oid, AccessExclusiveLock);
3212 :
3213 : /*
3214 : * Some things about the index may have changed before we locked the
3215 : * index, such as ALTER INDEX RENAME. We don't need to do anything
3216 : * here to absorb those changes in the new index.
3217 : */
3218 8 : indexrels = lappend(indexrels, index);
3219 : }
3220 :
3221 : /*
3222 : * Lock the OldHeap's TOAST relation exclusively - again, the lock is
3223 : * needed to swap the files.
3224 : */
3225 7 : if (OidIsValid(OldHeap->rd_rel->reltoastrelid))
3226 3 : LockRelationOid(OldHeap->rd_rel->reltoastrelid, AccessExclusiveLock);
3227 :
3228 : /*
3229 : * Tuples and pages of the old heap will be gone, but the heap will stay.
3230 : */
3231 7 : TransferPredicateLocksToHeapRelation(OldHeap);
3232 22 : foreach_ptr(RelationData, index, indexrels)
3233 : {
3234 8 : TransferPredicateLocksToHeapRelation(index);
3235 8 : index_close(index, NoLock);
3236 : }
3237 7 : list_free(indexrels);
3238 :
3239 : /*
3240 : * Flush WAL again, to make sure that all changes committed while we were
3241 : * waiting for the exclusive lock are available for decoding.
3242 : */
3243 7 : XLogFlush(GetXLogInsertEndRecPtr());
3244 7 : end_of_wal = GetFlushRecPtr(NULL);
3245 :
3246 : /*
3247 : * Apply the concurrent changes again. Indicate that the decoding worker
3248 : * won't be needed anymore.
3249 : */
3250 7 : process_concurrent_changes(end_of_wal, &chgcxt, true);
3251 :
3252 : /* Remember info about rel before closing OldHeap */
3253 7 : relpersistence = OldHeap->rd_rel->relpersistence;
3254 7 : is_system_catalog = IsSystemRelation(OldHeap);
3255 :
3256 7 : pgstat_progress_update_param(PROGRESS_REPACK_PHASE,
3257 : PROGRESS_REPACK_PHASE_SWAP_REL_FILES);
3258 :
3259 : /*
3260 : * Even ShareUpdateExclusiveLock should have prevented others from
3261 : * creating / dropping indexes (even using the CONCURRENTLY option), so we
3262 : * do not need to check whether the lists match.
3263 : */
3264 15 : forboth(lc, ind_oids_old, lc2, ind_oids_new)
3265 : {
3266 8 : Oid ind_old = lfirst_oid(lc);
3267 8 : Oid ind_new = lfirst_oid(lc2);
3268 8 : Oid mapped_tables[4] = {0};
3269 :
3270 8 : swap_relation_files(ind_old, ind_new,
3271 : (old_table_oid == RelationRelationId),
3272 : false, /* swap_toast_by_content */
3273 : true,
3274 : InvalidTransactionId,
3275 : InvalidMultiXactId,
3276 : mapped_tables);
3277 :
3278 : #ifdef USE_ASSERT_CHECKING
3279 :
3280 : /*
3281 : * Concurrent processing is not supported for system relations, so
3282 : * there should be no mapped tables.
3283 : */
3284 : for (int i = 0; i < 4; i++)
3285 : Assert(!OidIsValid(mapped_tables[i]));
3286 : #endif
3287 : }
3288 :
3289 : /* The new indexes must be visible for deletion. */
3290 7 : CommandCounterIncrement();
3291 :
3292 : /* Close the old heap but keep lock until transaction commit. */
3293 7 : table_close(OldHeap, NoLock);
3294 : /* Close the new heap. (We didn't have to open its indexes). */
3295 7 : table_close(NewHeap, NoLock);
3296 :
3297 : /* Cleanup what we don't need anymore. (And close the identity index.) */
3298 7 : release_change_context(&chgcxt);
3299 :
3300 : /*
3301 : * Swap the relations and their TOAST relations and TOAST indexes. This
3302 : * also drops the new relation and its indexes.
3303 : *
3304 : * (System catalogs are currently not supported.)
3305 : */
3306 : Assert(!is_system_catalog);
3307 7 : finish_heap_swap(old_table_oid, new_table_oid,
3308 : is_system_catalog,
3309 : false, /* swap_toast_by_content */
3310 : false,
3311 : true,
3312 : false, /* reindex */
3313 : frozenXid, cutoffMulti,
3314 : relpersistence);
3315 7 : }
3316 :
3317 : /*
3318 : * Build indexes on NewHeap according to those on OldHeap.
3319 : *
3320 : * OldIndexes is the list of index OIDs on OldHeap. The contained indexes end
3321 : * up locked using ShareUpdateExclusiveLock.
3322 : *
3323 : * A list of OIDs of the corresponding indexes created on NewHeap is
3324 : * returned. The order of items does match, so we can use these arrays to swap
3325 : * index storage.
3326 : */
3327 : static List *
3328 7 : build_new_indexes(Relation NewHeap, Relation OldHeap, List *OldIndexes)
3329 : {
3330 7 : List *result = NIL;
3331 :
3332 7 : pgstat_progress_update_param(PROGRESS_REPACK_PHASE,
3333 : PROGRESS_REPACK_PHASE_REBUILD_INDEX);
3334 :
3335 22 : foreach_oid(oldindex, OldIndexes)
3336 : {
3337 : Oid newindex;
3338 : char *newName;
3339 : Relation ind;
3340 :
3341 8 : ind = index_open(oldindex, ShareUpdateExclusiveLock);
3342 :
3343 8 : newName = ChooseRelationName(get_rel_name(oldindex),
3344 : NULL,
3345 : "repacknew",
3346 8 : get_rel_namespace(ind->rd_index->indrelid),
3347 : false);
3348 8 : newindex = index_create_copy(NewHeap, INDEX_CREATE_SUPPRESS_PROGRESS,
3349 8 : oldindex, ind->rd_rel->reltablespace,
3350 : newName);
3351 8 : copy_index_constraints(ind, newindex, RelationGetRelid(NewHeap));
3352 8 : result = lappend_oid(result, newindex);
3353 :
3354 8 : index_close(ind, NoLock);
3355 : }
3356 :
3357 7 : return result;
3358 : }
3359 :
3360 : /*
3361 : * Create a transient copy of a constraint -- supported by a transient
3362 : * copy of the index that supports the original constraint.
3363 : *
3364 : * When repacking a table that contains exclusion constraints, the executor
3365 : * relies on these constraints being properly catalogued. These copies are
3366 : * to support that.
3367 : *
3368 : * We don't need the constraints for anything else (the original constraints
3369 : * will be there once repack completes), so we add pg_depend entries so that
3370 : * the are dropped when the transient table is dropped.
3371 : */
3372 : static void
3373 8 : copy_index_constraints(Relation old_index, Oid new_index_id, Oid new_heap_id)
3374 : {
3375 : ScanKeyData skey;
3376 : Relation rel;
3377 : TupleDesc desc;
3378 : SysScanDesc scan;
3379 : HeapTuple tup;
3380 : ObjectAddress objrel;
3381 :
3382 8 : rel = table_open(ConstraintRelationId, RowExclusiveLock);
3383 8 : ObjectAddressSet(objrel, RelationRelationId, new_heap_id);
3384 :
3385 : /*
3386 : * Retrieve the constraints supported by the old index and create an
3387 : * identical one that points to the new index.
3388 : */
3389 8 : ScanKeyInit(&skey,
3390 : Anum_pg_constraint_conrelid,
3391 : BTEqualStrategyNumber, F_OIDEQ,
3392 8 : ObjectIdGetDatum(old_index->rd_index->indrelid));
3393 8 : scan = systable_beginscan(rel, ConstraintRelidTypidNameIndexId, true,
3394 : NULL, 1, &skey);
3395 8 : desc = RelationGetDescr(rel);
3396 26 : while (HeapTupleIsValid(tup = systable_getnext(scan)))
3397 : {
3398 18 : Form_pg_constraint conform = (Form_pg_constraint) GETSTRUCT(tup);
3399 : Oid oid;
3400 18 : Datum values[Natts_pg_constraint] = {0};
3401 18 : bool nulls[Natts_pg_constraint] = {0};
3402 18 : bool replaces[Natts_pg_constraint] = {0};
3403 : HeapTuple new_tup;
3404 : ObjectAddress objcon;
3405 :
3406 18 : if (conform->conindid != RelationGetRelid(old_index))
3407 11 : continue;
3408 :
3409 7 : oid = GetNewOidWithIndex(rel, ConstraintOidIndexId,
3410 : Anum_pg_constraint_oid);
3411 7 : values[Anum_pg_constraint_oid - 1] = ObjectIdGetDatum(oid);
3412 7 : replaces[Anum_pg_constraint_oid - 1] = true;
3413 7 : values[Anum_pg_constraint_conrelid - 1] = ObjectIdGetDatum(new_heap_id);
3414 7 : replaces[Anum_pg_constraint_conrelid - 1] = true;
3415 7 : values[Anum_pg_constraint_conindid - 1] = ObjectIdGetDatum(new_index_id);
3416 7 : replaces[Anum_pg_constraint_conindid - 1] = true;
3417 :
3418 7 : new_tup = heap_modify_tuple(tup, desc, values, nulls, replaces);
3419 :
3420 : /* Insert it into the catalog. */
3421 7 : CatalogTupleInsert(rel, new_tup);
3422 :
3423 : /* Create a dependency so it's removed when we drop the new heap. */
3424 7 : ObjectAddressSet(objcon, ConstraintRelationId, oid);
3425 7 : recordDependencyOn(&objcon, &objrel, DEPENDENCY_AUTO);
3426 : }
3427 8 : systable_endscan(scan);
3428 :
3429 8 : table_close(rel, RowExclusiveLock);
3430 :
3431 8 : CommandCounterIncrement();
3432 8 : }
3433 :
3434 : /*
3435 : * Try to start a background worker to perform logical decoding of data
3436 : * changes applied to relation while REPACK CONCURRENTLY is copying its
3437 : * contents to a new table.
3438 : */
3439 : static void
3440 7 : start_repack_decoding_worker(Oid relid)
3441 : {
3442 : Size size;
3443 : DecodingWorkerShared *shared;
3444 : shm_mq *mq;
3445 : BackgroundWorker bgw;
3446 :
3447 7 : decoding_worker = palloc0_object(DecodingWorker);
3448 :
3449 : /* Setup shared memory. */
3450 7 : size = BUFFERALIGN(offsetof(DecodingWorkerShared, error_queue)) +
3451 : BUFFERALIGN(REPACK_ERROR_QUEUE_SIZE);
3452 7 : decoding_worker->seg = dsm_create(size, 0);
3453 :
3454 7 : shared = (DecodingWorkerShared *) dsm_segment_address(decoding_worker->seg);
3455 7 : shared->initialized = false;
3456 7 : shared->lsn_upto = InvalidXLogRecPtr;
3457 7 : shared->done = false;
3458 7 : SharedFileSetInit(&shared->sfs, decoding_worker->seg);
3459 7 : shared->last_exported = -1;
3460 7 : SpinLockInit(&shared->mutex);
3461 7 : shared->dbid = MyDatabaseId;
3462 :
3463 : /*
3464 : * This is the UserId set in cluster_rel(). Security context shouldn't be
3465 : * needed for decoding worker.
3466 : */
3467 7 : shared->roleid = GetUserId();
3468 7 : shared->relid = relid;
3469 7 : ConditionVariableInit(&shared->cv);
3470 7 : shared->backend_proc = MyProc;
3471 7 : shared->backend_pid = MyProcPid;
3472 7 : shared->backend_proc_number = MyProcNumber;
3473 :
3474 7 : mq = shm_mq_create((char *) BUFFERALIGN(shared->error_queue),
3475 : REPACK_ERROR_QUEUE_SIZE);
3476 7 : shm_mq_set_receiver(mq, MyProc);
3477 :
3478 7 : decoding_worker->error_mqh = shm_mq_attach(mq, decoding_worker->seg, NULL);
3479 :
3480 7 : memset(&bgw, 0, sizeof(bgw));
3481 7 : snprintf(bgw.bgw_name, BGW_MAXLEN,
3482 : "REPACK decoding worker for relation \"%s\"",
3483 : get_rel_name(relid));
3484 7 : snprintf(bgw.bgw_type, BGW_MAXLEN, "REPACK decoding worker");
3485 7 : bgw.bgw_flags = BGWORKER_SHMEM_ACCESS |
3486 : BGWORKER_BACKEND_DATABASE_CONNECTION;
3487 7 : bgw.bgw_start_time = BgWorkerStart_RecoveryFinished;
3488 7 : bgw.bgw_restart_time = BGW_NEVER_RESTART;
3489 7 : snprintf(bgw.bgw_library_name, MAXPGPATH, "postgres");
3490 7 : snprintf(bgw.bgw_function_name, BGW_MAXLEN, "RepackWorkerMain");
3491 7 : bgw.bgw_main_arg = UInt32GetDatum(dsm_segment_handle(decoding_worker->seg));
3492 7 : bgw.bgw_notify_pid = MyProcPid;
3493 :
3494 7 : if (!RegisterDynamicBackgroundWorker(&bgw, &decoding_worker->handle))
3495 0 : ereport(ERROR,
3496 : errcode(ERRCODE_CONFIGURATION_LIMIT_EXCEEDED),
3497 : errmsg("out of background worker slots"),
3498 : errhint("You might need to increase \"%s\".", "max_worker_processes"));
3499 :
3500 : /*
3501 : * The decoding setup must be done before the caller can have XID assigned
3502 : * for any reason, otherwise the worker might end up in a deadlock,
3503 : * waiting for the caller's transaction to end. Therefore wait here until
3504 : * the worker indicates that it has the logical decoding initialized.
3505 : */
3506 7 : ConditionVariablePrepareToSleep(&shared->cv);
3507 : for (;;)
3508 17 : {
3509 : bool initialized;
3510 :
3511 24 : SpinLockAcquire(&shared->mutex);
3512 24 : initialized = shared->initialized;
3513 24 : SpinLockRelease(&shared->mutex);
3514 :
3515 24 : if (initialized)
3516 7 : break;
3517 :
3518 17 : ConditionVariableSleep(&shared->cv, WAIT_EVENT_REPACK_WORKER_EXPORT);
3519 : }
3520 7 : ConditionVariableCancelSleep();
3521 7 : }
3522 :
3523 : /*
3524 : * Stop the decoding worker and cleanup the related resources.
3525 : *
3526 : * The worker stops on its own when it knows there is no more work to do, but
3527 : * we need to stop it explicitly at least on ERROR in the launching backend.
3528 : */
3529 : static void
3530 7 : stop_repack_decoding_worker(void)
3531 : {
3532 : /* Nothing to do if no worker was set up. */
3533 7 : if (decoding_worker == NULL)
3534 0 : return;
3535 :
3536 : /* Terminate the worker process, if one is running. */
3537 7 : if (decoding_worker->handle != NULL)
3538 : {
3539 : BgwHandleStatus status;
3540 :
3541 7 : TerminateBackgroundWorker(decoding_worker->handle);
3542 : /* The worker should really exit before the REPACK command does. */
3543 7 : HOLD_INTERRUPTS();
3544 7 : status = WaitForBackgroundWorkerShutdown(decoding_worker->handle);
3545 7 : RESUME_INTERRUPTS();
3546 :
3547 7 : if (status == BGWH_POSTMASTER_DIED)
3548 0 : ereport(FATAL,
3549 : errcode(ERRCODE_ADMIN_SHUTDOWN),
3550 : errmsg("postmaster exited during REPACK command"));
3551 : }
3552 :
3553 : /*
3554 : * Now detach from our shared memory segment. In error cases there might
3555 : * still be messages from the worker in the queue, which ProcessInterrupts
3556 : * would try to read; this is pointless (and causes an assertion failure),
3557 : * so set the global pointer to NULL to have ProcessRepackMessages ignore
3558 : * them.
3559 : *
3560 : * We must also cancel the current sleep, if one is still set up. This is
3561 : * critical because the CV lives in the DSM that we're about to detach, so
3562 : * if we omit it, later automatic cleanup tries to clear freed memory.
3563 : */
3564 7 : if (decoding_worker->error_mqh != NULL)
3565 7 : shm_mq_detach(decoding_worker->error_mqh);
3566 7 : ConditionVariableCancelSleep();
3567 7 : if (decoding_worker->seg != NULL)
3568 7 : dsm_detach(decoding_worker->seg);
3569 7 : pfree(decoding_worker);
3570 7 : decoding_worker = NULL;
3571 : }
3572 :
3573 : /* stop_repack_decoding_worker, wrapped as a before_shmem_exit callback */
3574 : static void
3575 0 : stop_repack_decoding_worker_cb(int code, Datum arg)
3576 : {
3577 0 : stop_repack_decoding_worker();
3578 0 : }
3579 :
3580 : /*
3581 : * Get the initial snapshot from the decoding worker.
3582 : */
3583 : static Snapshot
3584 7 : get_initial_snapshot(DecodingWorker *worker)
3585 : {
3586 : DecodingWorkerShared *shared;
3587 : char fname[MAXPGPATH];
3588 : BufFile *file;
3589 : Size snap_size;
3590 : char *snap_space;
3591 : Snapshot snapshot;
3592 :
3593 7 : shared = (DecodingWorkerShared *) dsm_segment_address(worker->seg);
3594 :
3595 : /*
3596 : * The worker needs to initialize the logical decoding, which usually
3597 : * takes some time. Therefore it makes sense to prepare for the sleep
3598 : * first.
3599 : */
3600 7 : ConditionVariablePrepareToSleep(&shared->cv);
3601 : for (;;)
3602 5 : {
3603 : int last_exported;
3604 :
3605 12 : SpinLockAcquire(&shared->mutex);
3606 12 : last_exported = shared->last_exported;
3607 12 : SpinLockRelease(&shared->mutex);
3608 :
3609 : /*
3610 : * Has the worker exported the file we are waiting for?
3611 : */
3612 12 : if (last_exported == WORKER_FILE_SNAPSHOT)
3613 7 : break;
3614 :
3615 5 : ConditionVariableSleep(&shared->cv, WAIT_EVENT_REPACK_WORKER_EXPORT);
3616 : }
3617 7 : ConditionVariableCancelSleep();
3618 :
3619 : /* Read the snapshot from a file. */
3620 7 : DecodingWorkerFileName(fname, shared->relid, WORKER_FILE_SNAPSHOT);
3621 7 : file = BufFileOpenFileSet(&shared->sfs.fs, fname, O_RDONLY, false);
3622 7 : BufFileReadExact(file, &snap_size, sizeof(snap_size));
3623 7 : snap_space = (char *) palloc(snap_size);
3624 7 : BufFileReadExact(file, snap_space, snap_size);
3625 7 : BufFileClose(file);
3626 :
3627 : /* Restore it. */
3628 7 : snapshot = RestoreSnapshot(snap_space);
3629 7 : pfree(snap_space);
3630 :
3631 7 : return snapshot;
3632 : }
3633 :
3634 : /*
3635 : * Generate worker's file name into 'fname', which must be of size MAXPGPATH.
3636 : * If relations of the same 'relid' happen to be processed at the same time,
3637 : * they must be from different databases and therefore different backends must
3638 : * be involved.
3639 : */
3640 : void
3641 42 : DecodingWorkerFileName(char *fname, Oid relid, uint32 seq)
3642 : {
3643 : /* The PID is already present in the fileset name, so we needn't add it */
3644 42 : snprintf(fname, MAXPGPATH, "%u-%u", relid, seq);
3645 42 : }
3646 :
3647 : /*
3648 : * Handle receipt of an interrupt indicating a repack worker message.
3649 : *
3650 : * Note: this is called within a signal handler! All we can do is set
3651 : * a flag that will cause the next CHECK_FOR_INTERRUPTS() to invoke
3652 : * ProcessRepackMessages().
3653 : */
3654 : void
3655 7 : HandleRepackMessageInterrupt(void)
3656 : {
3657 7 : InterruptPending = true;
3658 7 : RepackMessagePending = true;
3659 7 : SetLatch(MyLatch);
3660 7 : }
3661 :
3662 : /*
3663 : * Process any queued protocol messages received from the repack worker.
3664 : */
3665 : void
3666 7 : ProcessRepackMessages(void)
3667 : {
3668 : MemoryContext oldcontext;
3669 : static MemoryContext hpm_context = NULL;
3670 :
3671 : /*
3672 : * Nothing to do if we haven't launched the worker yet or have already
3673 : * terminated it.
3674 : */
3675 7 : if (decoding_worker == NULL)
3676 0 : return;
3677 :
3678 : /*
3679 : * This is invoked from ProcessInterrupts(), and since some of the
3680 : * functions it calls contain CHECK_FOR_INTERRUPTS(), there is a potential
3681 : * for recursive calls if more signals are received while this runs. It's
3682 : * unclear that recursive entry would be safe, and it doesn't seem useful
3683 : * even if it is safe, so let's block interrupts until done.
3684 : */
3685 7 : HOLD_INTERRUPTS();
3686 :
3687 : /*
3688 : * Moreover, CurrentMemoryContext might be pointing almost anywhere. We
3689 : * don't want to risk leaking data into long-lived contexts, so let's do
3690 : * our work here in a private context that we can reset on each use.
3691 : */
3692 7 : if (hpm_context == NULL) /* first time through? */
3693 6 : hpm_context = AllocSetContextCreate(TopMemoryContext,
3694 : "ProcessRepackMessages",
3695 : ALLOCSET_DEFAULT_SIZES);
3696 : else
3697 1 : MemoryContextReset(hpm_context);
3698 :
3699 7 : oldcontext = MemoryContextSwitchTo(hpm_context);
3700 :
3701 : /* OK to process messages. Reset the flag saying there are more to do. */
3702 7 : RepackMessagePending = false;
3703 :
3704 : /*
3705 : * Read as many messages as we can from the worker, but stop when no more
3706 : * messages can be read from the worker without blocking.
3707 : */
3708 : while (true)
3709 0 : {
3710 : shm_mq_result res;
3711 : Size nbytes;
3712 : void *data;
3713 :
3714 7 : res = shm_mq_receive(decoding_worker->error_mqh, &nbytes,
3715 : &data, true);
3716 7 : if (res == SHM_MQ_WOULD_BLOCK)
3717 0 : break;
3718 7 : else if (res == SHM_MQ_SUCCESS)
3719 : {
3720 : StringInfoData msg;
3721 :
3722 0 : initStringInfo(&msg);
3723 0 : appendBinaryStringInfo(&msg, data, nbytes);
3724 0 : ProcessRepackMessage(&msg);
3725 0 : pfree(msg.data);
3726 : }
3727 : else
3728 : {
3729 : /*
3730 : * The decoding worker is special in that it exits as soon as it
3731 : * has its work done. Thus the DETACHED result code is fine.
3732 : */
3733 : Assert(res == SHM_MQ_DETACHED);
3734 :
3735 7 : break;
3736 : }
3737 : }
3738 :
3739 7 : MemoryContextSwitchTo(oldcontext);
3740 :
3741 : /* Might as well clear the context on our way out */
3742 7 : MemoryContextReset(hpm_context);
3743 :
3744 7 : RESUME_INTERRUPTS();
3745 : }
3746 :
3747 : /*
3748 : * Process a single protocol message received from a single parallel worker.
3749 : */
3750 : static void
3751 0 : ProcessRepackMessage(StringInfo msg)
3752 : {
3753 : char msgtype;
3754 :
3755 0 : msgtype = pq_getmsgbyte(msg);
3756 :
3757 0 : switch (msgtype)
3758 : {
3759 0 : case PqMsg_ErrorResponse:
3760 : case PqMsg_NoticeResponse:
3761 : {
3762 : ErrorData edata;
3763 :
3764 : /* Parse ErrorResponse or NoticeResponse. */
3765 0 : pq_parse_errornotice(msg, &edata);
3766 :
3767 : /* Death of a worker isn't enough justification for suicide. */
3768 0 : edata.elevel = Min(edata.elevel, ERROR);
3769 :
3770 : /*
3771 : * Add a context line to show that this is a message
3772 : * propagated from the worker. Otherwise, it can sometimes be
3773 : * confusing to understand what actually happened.
3774 : */
3775 0 : if (edata.context)
3776 0 : edata.context = psprintf("%s\n%s", edata.context,
3777 : _("REPACK decoding worker"));
3778 : else
3779 0 : edata.context = pstrdup(_("REPACK decoding worker"));
3780 :
3781 : /* Rethrow error or print notice. */
3782 0 : ThrowErrorData(&edata);
3783 :
3784 0 : break;
3785 : }
3786 :
3787 0 : default:
3788 : {
3789 0 : elog(ERROR, "unrecognized message type received from decoding worker: %c (message length %d bytes)",
3790 : msgtype, msg->len);
3791 : }
3792 : }
3793 0 : }
|