Line data Source code
1 : /*-------------------------------------------------------------------------
2 : *
3 : * repack.c
4 : * REPACK a table; formerly known as CLUSTER. VACUUM FULL also uses
5 : * parts of this code.
6 : *
7 : * There are two somewhat different ways to rewrite a table. In non-
8 : * concurrent mode, it's easy: take AccessExclusiveLock, create a new
9 : * transient relation, copy the tuples over to the relfilenode of the new
10 : * relation, swap the relfilenodes, then drop the old relation.
11 : *
12 : * In concurrent mode, we lock the table with only ShareUpdateExclusiveLock,
13 : * then do an initial copy as above. However, while the tuples are being
14 : * copied, concurrent transactions could modify the table. To cope with those
15 : * changes, we rely on logical decoding to obtain them from WAL. A bgworker
16 : * consumes WAL while the initial copy is ongoing (to prevent excessive WAL
17 : * from being reserved), and accumulates the changes in a file. Once the
18 : * initial copy is complete, we read the changes from the file and re-apply
19 : * them on the new heap. Then we upgrade our ShareUpdateExclusiveLock to
20 : * AccessExclusiveLock and swap the relfilenodes. This way, the time we hold
21 : * a strong lock on the table is much reduced, and the bloat is eliminated.
22 : *
23 : *
24 : * Portions Copyright (c) 1996-2026, PostgreSQL Global Development Group
25 : * Portions Copyright (c) 1994-5, Regents of the University of California
26 : *
27 : *
28 : * IDENTIFICATION
29 : * src/backend/commands/repack.c
30 : *
31 : *-------------------------------------------------------------------------
32 : */
33 : #include "postgres.h"
34 :
35 : #include "access/amapi.h"
36 : #include "access/heapam.h"
37 : #include "access/multixact.h"
38 : #include "access/relscan.h"
39 : #include "access/tableam.h"
40 : #include "access/toast_internals.h"
41 : #include "access/transam.h"
42 : #include "access/xact.h"
43 : #include "catalog/catalog.h"
44 : #include "catalog/dependency.h"
45 : #include "catalog/heap.h"
46 : #include "catalog/index.h"
47 : #include "catalog/namespace.h"
48 : #include "catalog/objectaccess.h"
49 : #include "catalog/pg_am.h"
50 : #include "catalog/pg_constraint.h"
51 : #include "catalog/pg_inherits.h"
52 : #include "catalog/toasting.h"
53 : #include "commands/defrem.h"
54 : #include "commands/progress.h"
55 : #include "commands/repack.h"
56 : #include "commands/repack_internal.h"
57 : #include "commands/tablecmds.h"
58 : #include "commands/vacuum.h"
59 : #include "executor/executor.h"
60 : #include "libpq/pqformat.h"
61 : #include "libpq/pqmq.h"
62 : #include "miscadmin.h"
63 : #include "optimizer/optimizer.h"
64 : #include "pgstat.h"
65 : #include "storage/bufmgr.h"
66 : #include "storage/lmgr.h"
67 : #include "storage/predicate.h"
68 : #include "storage/proc.h"
69 : #include "utils/acl.h"
70 : #include "utils/fmgroids.h"
71 : #include "utils/guc.h"
72 : #include "utils/injection_point.h"
73 : #include "utils/inval.h"
74 : #include "utils/lsyscache.h"
75 : #include "utils/memutils.h"
76 : #include "utils/pg_rusage.h"
77 : #include "utils/relmapper.h"
78 : #include "utils/snapmgr.h"
79 : #include "utils/syscache.h"
80 : #include "utils/wait_event_types.h"
81 :
82 : /*
83 : * This struct is used to pass around the information on tables to be
84 : * clustered. We need this so we can make a list of them when invoked without
85 : * a specific table/index pair.
86 : */
87 : typedef struct
88 : {
89 : Oid tableOid;
90 : Oid indexOid;
91 : } RelToCluster;
92 :
93 : /*
94 : * The first file exported by the decoding worker must contain a snapshot, the
95 : * following ones contain the data changes.
96 : */
97 : #define WORKER_FILE_SNAPSHOT 0
98 :
99 : /*
100 : * Information needed to apply concurrent data changes.
101 : */
102 : typedef struct ChangeContext
103 : {
104 : /* The relation the changes are applied to. */
105 : Relation cc_rel;
106 :
107 : /* Needed to update indexes of rel_dst. */
108 : ResultRelInfo *cc_rri;
109 : EState *cc_estate;
110 :
111 : /*
112 : * Existing tuples to UPDATE and DELETE are located via this index. We
113 : * keep the scankey in partially initialized state to avoid repeated work.
114 : * sk_argument is completed on the fly.
115 : */
116 : Relation cc_ident_index;
117 : ScanKey cc_ident_key;
118 : int cc_ident_key_nentries;
119 :
120 : /* Sequential number of the file containing the changes. */
121 : int cc_file_seq;
122 : } ChangeContext;
123 :
124 : /*
125 : * Backend-local information to control the decoding worker.
126 : */
127 : typedef struct DecodingWorker
128 : {
129 : /* The worker. */
130 : BackgroundWorkerHandle *handle;
131 :
132 : /* DecodingWorkerShared is in this segment. */
133 : dsm_segment *seg;
134 :
135 : /* Handle of the error queue. */
136 : shm_mq_handle *error_mqh;
137 : } DecodingWorker;
138 :
/*
 * Pointer to currently running decoding worker, or NULL when none is active.
 * Only one decoding worker per backend is supported.
 */
static DecodingWorker *decoding_worker = NULL;

/*
 * Is there a message sent by a repack worker that the backend needs to
 * receive?  (Set from a signal handler, hence sig_atomic_t.)
 */
volatile sig_atomic_t RepackMessagePending = false;
147 :
148 : static LOCKMODE RepackLockLevel(bool concurrent);
149 : static bool cluster_rel_recheck(RepackCommand cmd, Relation OldHeap,
150 : Oid indexOid, Oid userid, LOCKMODE lmode,
151 : int options);
152 : static void check_concurrent_repack_requirements(Relation rel,
153 : Oid *ident_idx_p);
154 : static void rebuild_relation(Relation OldHeap, Relation index, bool verbose,
155 : Oid ident_idx);
156 : static void copy_table_data(Relation NewHeap, Relation OldHeap, Relation OldIndex,
157 : Snapshot snapshot,
158 : bool verbose,
159 : bool *pSwapToastByContent,
160 : TransactionId *pFreezeXid,
161 : MultiXactId *pCutoffMulti);
162 : static List *get_tables_to_repack(RepackCommand cmd, bool usingindex,
163 : MemoryContext permcxt);
164 : static List *get_tables_to_repack_partitioned(RepackCommand cmd,
165 : Oid relid, bool rel_is_index,
166 : MemoryContext permcxt);
167 : static bool repack_is_permitted_for_relation(RepackCommand cmd,
168 : Oid relid, Oid userid);
169 :
170 : static void apply_concurrent_changes(BufFile *file, ChangeContext *chgcxt);
171 : static void apply_concurrent_insert(Relation rel, TupleTableSlot *slot,
172 : ChangeContext *chgcxt);
173 : static void apply_concurrent_update(Relation rel, TupleTableSlot *spilled_tuple,
174 : TupleTableSlot *ondisk_tuple,
175 : ChangeContext *chgcxt);
176 : static void apply_concurrent_delete(Relation rel, TupleTableSlot *slot);
177 : static void restore_tuple(BufFile *file, Relation relation,
178 : TupleTableSlot *slot);
179 : static void adjust_toast_pointers(Relation relation, TupleTableSlot *dest,
180 : TupleTableSlot *src);
181 : static bool find_target_tuple(Relation rel, ChangeContext *chgcxt,
182 : TupleTableSlot *locator,
183 : TupleTableSlot *received);
184 : static void process_concurrent_changes(XLogRecPtr end_of_wal,
185 : ChangeContext *chgcxt,
186 : bool done);
187 : static void initialize_change_context(ChangeContext *chgcxt,
188 : Relation relation,
189 : Oid ident_index_id);
190 : static void release_change_context(ChangeContext *chgcxt);
191 : static void rebuild_relation_finish_concurrent(Relation NewHeap, Relation OldHeap,
192 : Oid identIdx,
193 : TransactionId frozenXid,
194 : MultiXactId cutoffMulti);
195 : static List *build_new_indexes(Relation NewHeap, Relation OldHeap, List *OldIndexes);
196 : static void copy_index_constraints(Relation old_index, Oid new_index_id,
197 : Oid new_heap_id);
198 : static Relation process_single_relation(RepackStmt *stmt,
199 : LOCKMODE lockmode,
200 : bool isTopLevel,
201 : ClusterParams *params);
202 : static Oid determine_clustered_index(Relation rel, bool usingindex,
203 : const char *indexname);
204 :
205 : static void start_repack_decoding_worker(Oid relid);
206 : static void stop_repack_decoding_worker(void);
207 : static Snapshot get_initial_snapshot(DecodingWorker *worker);
208 :
209 : static void ProcessRepackMessage(StringInfo msg);
210 : static const char *RepackCommandAsString(RepackCommand cmd);
211 :
212 :
213 : /*
214 : * The repack code allows for processing multiple tables at once. Because
215 : * of this, we cannot just run everything on a single transaction, or we
216 : * would be forced to acquire exclusive locks on all the tables being
217 : * clustered, simultaneously --- very likely leading to deadlock.
218 : *
219 : * To solve this we follow a similar strategy to VACUUM code, processing each
220 : * relation in a separate transaction. For this to work, we need to:
221 : *
222 : * - provide a separate memory context so that we can pass information in
223 : * a way that survives across transactions
224 : * - start a new transaction every time a new relation is clustered
225 : * - check for validity of the information on to-be-clustered relations,
226 : * as someone might have deleted a relation behind our back, or
227 : * clustered one on a different index
228 : * - end the transaction
229 : *
230 : * The single-relation case does not have any such overhead.
231 : *
232 : * We also allow a relation to be repacked following an index, but without
233 : * naming a specific one. In that case, the indisclustered bit will be
234 : * looked up, and an ERROR will be thrown if no so-marked index is found.
235 : */
236 : void
237 191 : ExecRepack(ParseState *pstate, RepackStmt *stmt, bool isTopLevel)
238 : {
239 191 : ClusterParams params = {0};
240 191 : Relation rel = NULL;
241 : MemoryContext repack_context;
242 : LOCKMODE lockmode;
243 : List *rtcs;
244 :
245 : /* Parse option list */
246 409 : foreach_node(DefElem, opt, stmt->params)
247 : {
248 27 : if (strcmp(opt->defname, "verbose") == 0)
249 9 : params.options |= defGetBoolean(opt) ? CLUOPT_VERBOSE : 0;
250 18 : else if (strcmp(opt->defname, "analyze") == 0 ||
251 10 : strcmp(opt->defname, "analyse") == 0)
252 8 : params.options |= defGetBoolean(opt) ? CLUOPT_ANALYZE : 0;
253 20 : else if (strcmp(opt->defname, "concurrently") == 0 &&
254 10 : defGetBoolean(opt))
255 : {
256 10 : if (stmt->command != REPACK_COMMAND_REPACK)
257 0 : ereport(ERROR,
258 : errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
259 : errmsg("CONCURRENTLY option not supported for %s",
260 : RepackCommandAsString(stmt->command)));
261 10 : params.options |= CLUOPT_CONCURRENT;
262 : }
263 : else
264 0 : ereport(ERROR,
265 : errcode(ERRCODE_SYNTAX_ERROR),
266 : errmsg("unrecognized %s option \"%s\"",
267 : RepackCommandAsString(stmt->command),
268 : opt->defname),
269 : parser_errposition(pstate, opt->location));
270 : }
271 :
272 : /* Determine the lock mode to use. */
273 191 : lockmode = RepackLockLevel((params.options & CLUOPT_CONCURRENT) != 0);
274 :
275 191 : if ((params.options & CLUOPT_CONCURRENT) != 0)
276 : {
277 : /*
278 : * Make sure we're not in a transaction block.
279 : *
280 : * The reason is that repack_setup_logical_decoding() could wait
281 : * indefinitely for our XID to complete. (The deadlock detector would
282 : * not recognize it because we'd be waiting for ourselves, i.e. no
283 : * real lock conflict.) It would be possible to run in a transaction
284 : * block if we had no XID, but this restriction is simpler for users
285 : * to understand and we don't lose any functionality.
286 : */
287 10 : PreventInTransactionBlock(isTopLevel, "REPACK (CONCURRENTLY)");
288 : }
289 :
290 : /*
291 : * If a single relation is specified, process it and we're done ... unless
292 : * the relation is a partitioned table, in which case we fall through.
293 : */
294 191 : if (stmt->relation != NULL)
295 : {
296 173 : rel = process_single_relation(stmt, lockmode, isTopLevel, ¶ms);
297 153 : if (rel == NULL)
298 117 : return; /* all done */
299 : }
300 :
301 : /*
302 : * Don't allow ANALYZE in the multiple-relation case for now. Maybe we
303 : * can add support for this later.
304 : */
305 54 : if (params.options & CLUOPT_ANALYZE)
306 0 : ereport(ERROR,
307 : errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
308 : errmsg("cannot execute %s on multiple tables",
309 : "REPACK (ANALYZE)"));
310 :
311 : /*
312 : * By here, we know we are in a multi-table situation.
313 : *
314 : * Concurrent processing is currently considered rather special (e.g. in
315 : * terms of resources consumed) so it is not performed in bulk.
316 : */
317 54 : if (params.options & CLUOPT_CONCURRENT)
318 : {
319 4 : if (rel != NULL)
320 : {
321 : Assert(rel->rd_rel->relkind == RELKIND_PARTITIONED_TABLE);
322 4 : ereport(ERROR,
323 : errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
324 : errmsg("REPACK (CONCURRENTLY) is not supported for partitioned tables"),
325 : errhint("Consider running the command on individual partitions."));
326 : }
327 : else
328 0 : ereport(ERROR,
329 : errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
330 : errmsg("REPACK (CONCURRENTLY) requires an explicit table name"));
331 : }
332 :
333 : /*
334 : * In order to avoid holding locks for too long, we want to process each
335 : * table in its own transaction. This forces us to disallow running
336 : * inside a user transaction block.
337 : */
338 50 : PreventInTransactionBlock(isTopLevel, RepackCommandAsString(stmt->command));
339 :
340 : /* Also, we need a memory context to hold our list of relations */
341 50 : repack_context = AllocSetContextCreate(PortalContext,
342 : "Repack",
343 : ALLOCSET_DEFAULT_SIZES);
344 :
345 : /*
346 : * Since we open a new transaction for each relation, we have to check
347 : * that the relation still is what we think it is.
348 : *
349 : * In single-transaction CLUSTER, we don't need the overhead.
350 : */
351 50 : params.options |= CLUOPT_RECHECK;
352 :
353 : /*
354 : * If we don't have a relation yet, determine a relation list. If we do,
355 : * then it must be a partitioned table, and we want to process its
356 : * partitions.
357 : */
358 50 : if (rel == NULL)
359 : {
360 : Assert(stmt->indexname == NULL);
361 18 : rtcs = get_tables_to_repack(stmt->command, stmt->usingindex,
362 : repack_context);
363 18 : params.options |= CLUOPT_RECHECK_ISCLUSTERED;
364 : }
365 : else
366 : {
367 : Oid relid;
368 : bool rel_is_index;
369 :
370 : Assert(rel->rd_rel->relkind == RELKIND_PARTITIONED_TABLE);
371 :
372 : /*
373 : * If USING INDEX was specified, resolve the index name now and pass
374 : * it down.
375 : */
376 32 : if (stmt->usingindex)
377 : {
378 : /*
379 : * If no index name was specified when repacking a partitioned
380 : * table, punt for now. Maybe we can improve this later.
381 : */
382 28 : if (!stmt->indexname)
383 : {
384 8 : if (stmt->command == REPACK_COMMAND_CLUSTER)
385 4 : ereport(ERROR,
386 : errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
387 : errmsg("there is no previously clustered index for table \"%s\"",
388 : RelationGetRelationName(rel)));
389 : else
390 4 : ereport(ERROR,
391 : errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
392 : /*- translator: first %s is name of a SQL command, eg. REPACK */
393 : errmsg("cannot execute %s on partitioned table \"%s\" USING INDEX with no index name",
394 : RepackCommandAsString(stmt->command),
395 : RelationGetRelationName(rel)));
396 : }
397 :
398 20 : relid = determine_clustered_index(rel, stmt->usingindex,
399 20 : stmt->indexname);
400 20 : if (!OidIsValid(relid))
401 0 : elog(ERROR, "unable to determine index to cluster on");
402 20 : check_index_is_clusterable(rel, relid, AccessExclusiveLock);
403 :
404 16 : rel_is_index = true;
405 : }
406 : else
407 : {
408 4 : relid = RelationGetRelid(rel);
409 4 : rel_is_index = false;
410 : }
411 :
412 20 : rtcs = get_tables_to_repack_partitioned(stmt->command,
413 : relid, rel_is_index,
414 : repack_context);
415 :
416 : /* close parent relation, releasing lock on it */
417 20 : table_close(rel, AccessExclusiveLock);
418 20 : rel = NULL;
419 : }
420 :
421 : /* Commit to get out of starting transaction */
422 38 : PopActiveSnapshot();
423 38 : CommitTransactionCommand();
424 :
425 : /* Cluster the tables, each in a separate transaction */
426 : Assert(rel == NULL);
427 128 : foreach_ptr(RelToCluster, rtc, rtcs)
428 : {
429 : /* Start a new transaction for each relation. */
430 52 : StartTransactionCommand();
431 :
432 : /*
433 : * Open the target table, coping with the case where it has been
434 : * dropped.
435 : */
436 52 : rel = try_table_open(rtc->tableOid, lockmode);
437 52 : if (rel == NULL)
438 : {
439 0 : CommitTransactionCommand();
440 0 : continue;
441 : }
442 :
443 : /* functions in indexes may want a snapshot set */
444 52 : PushActiveSnapshot(GetTransactionSnapshot());
445 :
446 : /* Process this table */
447 52 : cluster_rel(stmt->command, rel, rtc->indexOid, ¶ms, isTopLevel);
448 : /* cluster_rel closes the relation, but keeps lock */
449 :
450 52 : PopActiveSnapshot();
451 52 : CommitTransactionCommand();
452 : }
453 :
454 : /* Start a new transaction for the cleanup work. */
455 38 : StartTransactionCommand();
456 :
457 : /* Clean up working storage */
458 38 : MemoryContextDelete(repack_context);
459 : }
460 :
461 : /*
462 : * In the non-concurrent case, we obtain AccessExclusiveLock throughout the
463 : * operation to avoid any lock-upgrade hazards. In the concurrent case, we
464 : * grab ShareUpdateExclusiveLock (just like VACUUM) for most of the
465 : * processing and only acquire AccessExclusiveLock at the end, to swap the
466 : * relation -- supposedly for a short time.
467 : */
468 : static LOCKMODE
469 977 : RepackLockLevel(bool concurrent)
470 : {
471 977 : if (concurrent)
472 18 : return ShareUpdateExclusiveLock;
473 : else
474 959 : return AccessExclusiveLock;
475 : }
476 :
477 : /*
478 : * cluster_rel
479 : *
480 : * This clusters the table by creating a new, clustered table and
481 : * swapping the relfilenumbers of the new table and the old table, so
482 : * the OID of the original table is preserved. Thus we do not lose
483 : * GRANT, inheritance nor references to this table.
484 : *
485 : * Indexes are rebuilt too, via REINDEX. Since we are effectively bulk-loading
486 : * the new table, it's better to create the indexes afterwards than to fill
487 : * them incrementally while we load the table.
488 : *
489 : * If indexOid is InvalidOid, the table will be rewritten in physical order
490 : * instead of index order.
491 : *
492 : * Note that, in the concurrent case, the function releases the lock at some
493 : * point, in order to get AccessExclusiveLock for the final steps (i.e. to
494 : * swap the relation files). To make things simpler, the caller should expect
495 : * OldHeap to be closed on return, regardless CLUOPT_CONCURRENT. (The
496 : * AccessExclusiveLock is kept till the end of the transaction.)
497 : *
498 : * 'cmd' indicates which command is being executed, to be used for error
499 : * messages.
500 : */
501 : void
502 395 : cluster_rel(RepackCommand cmd, Relation OldHeap, Oid indexOid,
503 : ClusterParams *params, bool isTopLevel)
504 : {
505 395 : Oid tableOid = RelationGetRelid(OldHeap);
506 : Relation index;
507 : LOCKMODE lmode;
508 : Oid save_userid;
509 : int save_sec_context;
510 : int save_nestlevel;
511 395 : bool verbose = ((params->options & CLUOPT_VERBOSE) != 0);
512 395 : bool recheck = ((params->options & CLUOPT_RECHECK) != 0);
513 395 : bool concurrent = ((params->options & CLUOPT_CONCURRENT) != 0);
514 395 : Oid ident_idx = InvalidOid;
515 :
516 : /* Determine the lock mode to use. */
517 395 : lmode = RepackLockLevel(concurrent);
518 :
519 : /*
520 : * Check some preconditions in the concurrent case. This also obtains the
521 : * replica index OID.
522 : */
523 395 : if (concurrent)
524 6 : check_concurrent_repack_requirements(OldHeap, &ident_idx);
525 :
526 : /* Check for user-requested abort. */
527 391 : CHECK_FOR_INTERRUPTS();
528 :
529 391 : pgstat_progress_start_command(PROGRESS_COMMAND_REPACK, tableOid);
530 391 : pgstat_progress_update_param(PROGRESS_REPACK_COMMAND, cmd);
531 :
532 : /*
533 : * Switch to the table owner's userid, so that any index functions are run
534 : * as that user. Also lock down security-restricted operations and
535 : * arrange to make GUC variable changes local to this command.
536 : */
537 391 : GetUserIdAndSecContext(&save_userid, &save_sec_context);
538 391 : SetUserIdAndSecContext(OldHeap->rd_rel->relowner,
539 : save_sec_context | SECURITY_RESTRICTED_OPERATION);
540 391 : save_nestlevel = NewGUCNestLevel();
541 391 : RestrictSearchPath();
542 :
543 : /*
544 : * Recheck that the relation is still what it was when we started.
545 : *
546 : * Note that it's critical to skip this in single-relation CLUSTER;
547 : * otherwise, we would reject an attempt to cluster using a
548 : * not-previously-clustered index.
549 : */
550 391 : if (recheck &&
551 52 : !cluster_rel_recheck(cmd, OldHeap, indexOid, save_userid,
552 52 : lmode, params->options))
553 0 : goto out;
554 :
555 : /*
556 : * We allow repacking shared catalogs only when not using an index. It
557 : * would work to use an index in most respects, but the index would only
558 : * get marked as indisclustered in the current database, leading to
559 : * unexpected behavior if CLUSTER were later invoked in another database.
560 : */
561 391 : if (OidIsValid(indexOid) && OldHeap->rd_rel->relisshared)
562 0 : ereport(ERROR,
563 : errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
564 : /*- translator: first %s is name of a SQL command, eg. REPACK */
565 : errmsg("cannot execute %s on a shared catalog",
566 : RepackCommandAsString(cmd)));
567 :
568 : /*
569 : * The CONCURRENTLY case should have been rejected earlier because it does
570 : * not support system catalogs.
571 : */
572 : Assert(!(OldHeap->rd_rel->relisshared && concurrent));
573 :
574 : /*
575 : * Don't process temp tables of other backends ... their local buffer
576 : * manager is not going to cope.
577 : */
578 391 : if (RELATION_IS_OTHER_TEMP(OldHeap))
579 0 : ereport(ERROR,
580 : errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
581 : /*- translator: first %s is name of a SQL command, eg. REPACK */
582 : errmsg("cannot execute %s on temporary tables of other sessions",
583 : RepackCommandAsString(cmd)));
584 :
585 : /*
586 : * Also check for active uses of the relation in the current transaction,
587 : * including open scans and pending AFTER trigger events.
588 : */
589 391 : CheckTableNotInUse(OldHeap, RepackCommandAsString(cmd));
590 :
591 : /* Check heap and index are valid to cluster on */
592 391 : if (OidIsValid(indexOid))
593 : {
594 : /* verify the index is good and lock it */
595 140 : check_index_is_clusterable(OldHeap, indexOid, lmode);
596 : /* also open it */
597 140 : index = index_open(indexOid, NoLock);
598 : }
599 : else
600 251 : index = NULL;
601 :
602 : /*
603 : * When allow_system_table_mods is turned off, we disallow repacking a
604 : * catalog on a particular index unless that's already the clustered index
605 : * for that catalog.
606 : *
607 : * XXX We don't check for this in CLUSTER, because it's historically been
608 : * allowed.
609 : */
610 391 : if (cmd != REPACK_COMMAND_CLUSTER &&
611 268 : !allowSystemTableMods && OidIsValid(indexOid) &&
612 17 : IsCatalogRelation(OldHeap) && !index->rd_index->indisclustered)
613 0 : ereport(ERROR,
614 : errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
615 : errmsg("permission denied: \"%s\" is a system catalog",
616 : RelationGetRelationName(OldHeap)),
617 : errdetail("System catalogs can only be clustered by the index they're already clustered on, if any, unless \"%s\" is enabled.",
618 : "allow_system_table_mods"));
619 :
620 : /*
621 : * Quietly ignore the request if this is a materialized view which has not
622 : * been populated from its query. No harm is done because there is no data
623 : * to deal with, and we don't want to throw an error if this is part of a
624 : * multi-relation request -- for example, CLUSTER was run on the entire
625 : * database.
626 : */
627 391 : if (OldHeap->rd_rel->relkind == RELKIND_MATVIEW &&
628 0 : !RelationIsPopulated(OldHeap))
629 : {
630 0 : if (index)
631 0 : index_close(index, lmode);
632 0 : relation_close(OldHeap, lmode);
633 0 : goto out;
634 : }
635 :
636 : Assert(OldHeap->rd_rel->relkind == RELKIND_RELATION ||
637 : OldHeap->rd_rel->relkind == RELKIND_MATVIEW ||
638 : OldHeap->rd_rel->relkind == RELKIND_TOASTVALUE);
639 :
640 : /*
641 : * All predicate locks on the tuples or pages are about to be made
642 : * invalid, because we move tuples around. Promote them to relation
643 : * locks. Predicate locks on indexes will be promoted when they are
644 : * reindexed.
645 : *
646 : * During concurrent processing, the heap as well as its indexes stay in
647 : * operation, so we postpone this step until they are locked using
648 : * AccessExclusiveLock near the end of the processing.
649 : */
650 391 : if (!concurrent)
651 389 : TransferPredicateLocksToHeapRelation(OldHeap);
652 :
653 : /* rebuild_relation does all the dirty work */
654 391 : PG_TRY();
655 : {
656 391 : rebuild_relation(OldHeap, index, verbose, ident_idx);
657 : }
658 4 : PG_FINALLY();
659 : {
660 391 : if (concurrent)
661 : {
662 : /*
663 : * Since during normal operation the worker was already asked to
664 : * exit, stopping it explicitly is especially important on ERROR.
665 : * However it still seems a good practice to make sure that the
666 : * worker never survives the REPACK command.
667 : */
668 2 : stop_repack_decoding_worker();
669 : }
670 : }
671 391 : PG_END_TRY();
672 :
673 : /* rebuild_relation closes OldHeap, and index if valid */
674 :
675 387 : out:
676 : /* Roll back any GUC changes executed by index functions */
677 387 : AtEOXact_GUC(false, save_nestlevel);
678 :
679 : /* Restore userid and security context */
680 387 : SetUserIdAndSecContext(save_userid, save_sec_context);
681 :
682 387 : pgstat_progress_end_command();
683 387 : }
684 :
685 : /*
686 : * Check if the table (and its index) still meets the requirements of
687 : * cluster_rel().
688 : */
689 : static bool
690 52 : cluster_rel_recheck(RepackCommand cmd, Relation OldHeap, Oid indexOid,
691 : Oid userid, LOCKMODE lmode, int options)
692 : {
693 52 : Oid tableOid = RelationGetRelid(OldHeap);
694 :
695 : /* Check that the user still has privileges for the relation */
696 52 : if (!repack_is_permitted_for_relation(cmd, tableOid, userid))
697 : {
698 0 : relation_close(OldHeap, lmode);
699 0 : return false;
700 : }
701 :
702 : /*
703 : * Silently skip a temp table for a remote session. Only doing this check
704 : * in the "recheck" case is appropriate (which currently means somebody is
705 : * executing a database-wide CLUSTER or on a partitioned table), because
706 : * there is another check in cluster() which will stop any attempt to
707 : * cluster remote temp tables by name. There is another check in
708 : * cluster_rel which is redundant, but we leave it for extra safety.
709 : */
710 52 : if (RELATION_IS_OTHER_TEMP(OldHeap))
711 : {
712 0 : relation_close(OldHeap, lmode);
713 0 : return false;
714 : }
715 :
716 52 : if (OidIsValid(indexOid))
717 : {
718 : /*
719 : * Check that the index still exists
720 : */
721 32 : if (!SearchSysCacheExists1(RELOID, ObjectIdGetDatum(indexOid)))
722 : {
723 0 : relation_close(OldHeap, lmode);
724 0 : return false;
725 : }
726 :
727 : /*
728 : * Check that the index is still the one with indisclustered set, if
729 : * needed.
730 : */
731 32 : if ((options & CLUOPT_RECHECK_ISCLUSTERED) != 0 &&
732 4 : !get_index_isclustered(indexOid))
733 : {
734 0 : relation_close(OldHeap, lmode);
735 0 : return false;
736 : }
737 : }
738 :
739 52 : return true;
740 : }
741 :
742 : /*
743 : * Verify that the specified heap and index are valid to cluster on
744 : *
745 : * Side effect: obtains lock on the index. The caller may
746 : * in some cases already have a lock of the same strength on the table, but
747 : * not in all cases so we can't rely on the table-level lock for
748 : * protection here.
749 : */
750 : void
751 307 : check_index_is_clusterable(Relation OldHeap, Oid indexOid, LOCKMODE lockmode)
752 : {
753 : Relation OldIndex;
754 :
755 307 : OldIndex = index_open(indexOid, lockmode);
756 :
757 : /*
758 : * Check that index is in fact an index on the given relation
759 : */
760 307 : if (OldIndex->rd_index == NULL ||
761 307 : OldIndex->rd_index->indrelid != RelationGetRelid(OldHeap))
762 0 : ereport(ERROR,
763 : (errcode(ERRCODE_WRONG_OBJECT_TYPE),
764 : errmsg("\"%s\" is not an index for table \"%s\"",
765 : RelationGetRelationName(OldIndex),
766 : RelationGetRelationName(OldHeap))));
767 :
768 : /* Index AM must allow clustering */
769 307 : if (!OldIndex->rd_indam->amclusterable)
770 0 : ereport(ERROR,
771 : (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
772 : errmsg("cannot cluster on index \"%s\" because access method does not support clustering",
773 : RelationGetRelationName(OldIndex))));
774 :
775 : /*
776 : * Disallow clustering on incomplete indexes (those that might not index
777 : * every row of the relation). We could relax this by making a separate
778 : * seqscan pass over the table to copy the missing rows, but that seems
779 : * expensive and tedious.
780 : */
781 307 : if (!heap_attisnull(OldIndex->rd_indextuple, Anum_pg_index_indpred, NULL))
782 0 : ereport(ERROR,
783 : (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
784 : errmsg("cannot cluster on partial index \"%s\"",
785 : RelationGetRelationName(OldIndex))));
786 :
787 : /*
788 : * Disallow if index is left over from a failed CREATE INDEX CONCURRENTLY;
789 : * it might well not contain entries for every heap row, or might not even
790 : * be internally consistent. (But note that we don't check indcheckxmin;
791 : * the worst consequence of following broken HOT chains would be that we
792 : * might put recently-dead tuples out-of-order in the new table, and there
793 : * is little harm in that.)
794 : */
795 307 : if (!OldIndex->rd_index->indisvalid)
796 4 : ereport(ERROR,
797 : (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
798 : errmsg("cannot cluster on invalid index \"%s\"",
799 : RelationGetRelationName(OldIndex))));
800 :
801 : /* Drop relcache refcnt on OldIndex, but keep lock */
802 303 : index_close(OldIndex, NoLock);
803 303 : }
804 :
805 : /*
806 : * mark_index_clustered: mark the specified index as the one clustered on
807 : *
808 : * With indexOid == InvalidOid, will mark all indexes of rel not-clustered.
809 : */
810 : void
811 187 : mark_index_clustered(Relation rel, Oid indexOid, bool is_internal)
812 : {
813 : HeapTuple indexTuple;
814 : Form_pg_index indexForm;
815 : Relation pg_index;
816 : ListCell *index;
817 :
818 : Assert(rel->rd_rel->relkind != RELKIND_PARTITIONED_TABLE);
819 :
820 : /*
821 : * If the index is already marked clustered, no need to do anything.
822 : */
823 187 : if (OidIsValid(indexOid))
824 : {
825 179 : if (get_index_isclustered(indexOid))
826 38 : return;
827 : }
828 :
829 : /*
830 : * Check each index of the relation and set/clear the bit as needed.
831 : */
832 149 : pg_index = table_open(IndexRelationId, RowExclusiveLock);
833 :
834 452 : foreach(index, RelationGetIndexList(rel))
835 : {
836 303 : Oid thisIndexOid = lfirst_oid(index);
837 :
838 303 : indexTuple = SearchSysCacheCopy1(INDEXRELID,
839 : ObjectIdGetDatum(thisIndexOid));
840 303 : if (!HeapTupleIsValid(indexTuple))
841 0 : elog(ERROR, "cache lookup failed for index %u", thisIndexOid);
842 303 : indexForm = (Form_pg_index) GETSTRUCT(indexTuple);
843 :
844 : /*
845 : * Unset the bit if set. We know it's wrong because we checked this
846 : * earlier.
847 : */
848 303 : if (indexForm->indisclustered)
849 : {
850 20 : indexForm->indisclustered = false;
851 20 : CatalogTupleUpdate(pg_index, &indexTuple->t_self, indexTuple);
852 : }
853 283 : else if (thisIndexOid == indexOid)
854 : {
855 : /* this was checked earlier, but let's be real sure */
856 141 : if (!indexForm->indisvalid)
857 0 : elog(ERROR, "cannot cluster on invalid index %u", indexOid);
858 141 : indexForm->indisclustered = true;
859 141 : CatalogTupleUpdate(pg_index, &indexTuple->t_self, indexTuple);
860 : }
861 :
862 303 : InvokeObjectPostAlterHookArg(IndexRelationId, thisIndexOid, 0,
863 : InvalidOid, is_internal);
864 :
865 303 : heap_freetuple(indexTuple);
866 : }
867 :
868 149 : table_close(pg_index, RowExclusiveLock);
869 : }
870 :
871 : /*
872 : * Check if the CONCURRENTLY option is legal for the relation.
873 : *
874 : * *Ident_idx_p receives OID of the identity index.
875 : */
876 : static void
877 6 : check_concurrent_repack_requirements(Relation rel, Oid *ident_idx_p)
878 : {
879 : char relpersistence,
880 : replident;
881 : Oid ident_idx;
882 :
883 : /* Data changes in system relations are not logically decoded. */
884 6 : if (IsCatalogRelation(rel))
885 4 : ereport(ERROR,
886 : errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
887 : errmsg("cannot repack relation \"%s\"",
888 : RelationGetRelationName(rel)),
889 : errhint("REPACK CONCURRENTLY is not supported for catalog relations."));
890 :
891 : /*
892 : * reorderbuffer.c does not seem to handle processing of TOAST relation
893 : * alone.
894 : */
895 2 : if (IsToastRelation(rel))
896 0 : ereport(ERROR,
897 : errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
898 : errmsg("cannot repack relation \"%s\"",
899 : RelationGetRelationName(rel)),
900 : errhint("REPACK CONCURRENTLY is not supported for TOAST relations"));
901 :
902 2 : relpersistence = rel->rd_rel->relpersistence;
903 2 : if (relpersistence != RELPERSISTENCE_PERMANENT)
904 0 : ereport(ERROR,
905 : errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
906 : errmsg("cannot repack relation \"%s\"",
907 : RelationGetRelationName(rel)),
908 : errhint("REPACK CONCURRENTLY is only allowed for permanent relations."));
909 :
910 : /* With NOTHING, WAL does not contain the old tuple. */
911 2 : replident = rel->rd_rel->relreplident;
912 2 : if (replident == REPLICA_IDENTITY_NOTHING)
913 0 : ereport(ERROR,
914 : errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
915 : errmsg("cannot repack relation \"%s\"",
916 : RelationGetRelationName(rel)),
917 : errhint("Relation \"%s\" has insufficient replication identity.",
918 : RelationGetRelationName(rel)));
919 :
920 : /*
921 : * Obtain the replica identity index -- either one that has been set
922 : * explicitly, or the primary key. If none of these cases apply, the
923 : * table cannot be repacked concurrently. It might be possible to have
924 : * repack work with a FULL replica identity; however that requires more
925 : * work and is not implemented yet.
926 : */
927 2 : ident_idx = RelationGetReplicaIndex(rel);
928 2 : if (!OidIsValid(ident_idx) && OidIsValid(rel->rd_pkindex))
929 0 : ident_idx = rel->rd_pkindex;
930 2 : if (!OidIsValid(ident_idx))
931 0 : ereport(ERROR,
932 : errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
933 : errmsg("cannot process relation \"%s\"",
934 : RelationGetRelationName(rel)),
935 : errhint("Relation \"%s\" has no identity index.",
936 : RelationGetRelationName(rel)));
937 :
938 2 : *ident_idx_p = ident_idx;
939 2 : }
940 :
941 :
/*
 * rebuild_relation: rebuild an existing relation in index or physical order
 *
 * OldHeap: table to rebuild.  See cluster_rel() for comments on the required
 * lock strength.
 *
 * index: index to cluster by, or NULL to rewrite in physical order.
 *
 * ident_idx: identity index, to handle replaying of concurrent data changes
 * to the new heap.  InvalidOid if there's no CONCURRENTLY option.  (Note
 * that a valid ident_idx is what makes this function take the concurrent
 * code paths below.)
 *
 * On entry, heap and index (if one is given) must be open, and the
 * appropriate lock held on them -- AccessExclusiveLock for exclusive
 * processing and ShareUpdateExclusiveLock for concurrent processing.
 *
 * On exit, they are closed, but still locked with AccessExclusiveLock.
 * (The function handles the lock upgrade if 'concurrent' is true.)
 */
static void
rebuild_relation(Relation OldHeap, Relation index, bool verbose,
				 Oid ident_idx)
{
	Oid			tableOid = RelationGetRelid(OldHeap);
	Oid			accessMethod = OldHeap->rd_rel->relam;
	Oid			tableSpace = OldHeap->rd_rel->reltablespace;
	Oid			OIDNewHeap;
	Relation	NewHeap;
	char		relpersistence;
	bool		swap_toast_by_content;
	TransactionId frozenXid;
	MultiXactId cutoffMulti;
	/* Concurrent mode is signalled by a valid identity index OID. */
	bool		concurrent = OidIsValid(ident_idx);
	Snapshot	snapshot = NULL;
#if USE_ASSERT_CHECKING
	LOCKMODE	lmode;

	/* Verify the caller holds the lock strength documented above. */
	lmode = RepackLockLevel(concurrent);

	Assert(CheckRelationLockedByMe(OldHeap, lmode, false));
	Assert(index == NULL || CheckRelationLockedByMe(index, lmode, false));
#endif

	if (concurrent)
	{
		/*
		 * The worker needs to be member of the locking group we're the leader
		 * of.  We ought to become the leader before the worker starts.  The
		 * worker will join the group as soon as it starts.
		 *
		 * This is to make sure that the deadlock described below is
		 * detectable by deadlock.c: if the worker waits for a transaction to
		 * complete and we are waiting for the worker output, then effectively
		 * we (i.e. this backend) are waiting for that transaction.
		 */
		BecomeLockGroupLeader();

		/*
		 * Start the worker that decodes data changes applied while we're
		 * copying the table contents.
		 *
		 * Note that the worker has to wait for all transactions with XID
		 * already assigned to finish.  If some of those transactions is
		 * waiting for a lock conflicting with ShareUpdateExclusiveLock on our
		 * table (e.g. it runs CREATE INDEX), we can end up in a deadlock.
		 * Not sure this risk is worth unlocking/locking the table (and its
		 * clustering index) and checking again if it's still eligible for
		 * REPACK CONCURRENTLY.
		 */
		start_repack_decoding_worker(tableOid);

		/*
		 * Wait until the worker has the initial snapshot and retrieve it.
		 * This (historic) snapshot defines the table contents copied by
		 * copy_table_data() below; later changes are replayed from WAL.
		 */
		snapshot = get_initial_snapshot(decoding_worker);

		PushActiveSnapshot(snapshot);
	}

	/* for CLUSTER or REPACK USING INDEX, mark the index as the one to use */
	if (index != NULL)
		mark_index_clustered(OldHeap, RelationGetRelid(index), true);

	/* Remember info about rel before closing OldHeap */
	relpersistence = OldHeap->rd_rel->relpersistence;

	/*
	 * Create the transient table that will receive the re-ordered data.
	 *
	 * OldHeap is already locked, so no need to lock it again.  make_new_heap
	 * obtains AccessExclusiveLock on the new heap and its toast table.
	 */
	OIDNewHeap = make_new_heap(tableOid, tableSpace,
							   accessMethod,
							   relpersistence,
							   NoLock);
	Assert(CheckRelationOidLockedByMe(OIDNewHeap, AccessExclusiveLock, false));
	NewHeap = table_open(OIDNewHeap, NoLock);

	/* Copy the heap data into the new table in the desired order */
	copy_table_data(NewHeap, OldHeap, index, snapshot, verbose,
					&swap_toast_by_content, &frozenXid, &cutoffMulti);

	/*
	 * The historic snapshot won't be needed anymore.  After the pop, the
	 * previously active snapshot becomes current again and its command ID
	 * is refreshed so subsequent catalog changes are visible to it.
	 */
	if (snapshot)
	{
		PopActiveSnapshot();
		UpdateActiveSnapshotCommandId();
	}

	if (concurrent)
	{
		/* Toast swap by content is never chosen in concurrent mode. */
		Assert(!swap_toast_by_content);

		/*
		 * Close the index, but keep the lock.  Both heaps will be closed by
		 * the following call.
		 */
		if (index)
			index_close(index, NoLock);

		/* Replays concurrent changes, upgrades the lock, swaps the files. */
		rebuild_relation_finish_concurrent(NewHeap, OldHeap, ident_idx,
										   frozenXid, cutoffMulti);

		pgstat_progress_update_param(PROGRESS_REPACK_PHASE,
									 PROGRESS_REPACK_PHASE_FINAL_CLEANUP);
	}
	else
	{
		bool		is_system_catalog = IsSystemRelation(OldHeap);

		/* Close relcache entries, but keep lock until transaction commit */
		table_close(OldHeap, NoLock);
		if (index)
			index_close(index, NoLock);

		/*
		 * Close the new relation so it can be dropped as soon as the storage
		 * is swapped.  The relation is not visible to others, so no need to
		 * unlock it explicitly.
		 */
		table_close(NewHeap, NoLock);

		/*
		 * Swap the physical files of the target and transient tables, then
		 * rebuild the target's indexes and throw away the transient table.
		 */
		finish_heap_swap(tableOid, OIDNewHeap, is_system_catalog,
						 swap_toast_by_content, false, true,
						 true,	/* reindex */
						 frozenXid, cutoffMulti,
						 relpersistence);
	}
}
1095 :
1096 :
/*
 * Create the transient table that will be filled with new data during
 * CLUSTER, ALTER TABLE, and similar operations.  The transient table
 * duplicates the logical structure of the OldHeap; but will have the
 * specified physical storage properties NewTableSpace, NewAccessMethod, and
 * relpersistence.
 *
 * OIDOldHeap: the table being rebuilt; it is opened with 'lockmode' here
 * and closed (lock retained) before returning.
 *
 * Returns the OID of the new heap.
 *
 * After this, the caller should load the new heap with transferred/modified
 * data, then call finish_heap_swap to complete the operation.
 */
Oid
make_new_heap(Oid OIDOldHeap, Oid NewTableSpace, Oid NewAccessMethod,
			  char relpersistence, LOCKMODE lockmode)
{
	TupleDesc	OldHeapDesc;
	char		NewHeapName[NAMEDATALEN];
	Oid			OIDNewHeap;
	Oid			toastid;
	Relation	OldHeap;
	HeapTuple	tuple;
	Datum		reloptions;
	bool		isNull;
	Oid			namespaceid;

	OldHeap = table_open(OIDOldHeap, lockmode);
	OldHeapDesc = RelationGetDescr(OldHeap);

	/*
	 * Note that the NewHeap will not receive any of the defaults or
	 * constraints associated with the OldHeap; we don't need 'em, and there's
	 * no reason to spend cycles inserting them into the catalogs only to
	 * delete them.
	 */

	/*
	 * But we do want to use reloptions of the old heap for new heap.
	 */
	tuple = SearchSysCache1(RELOID, ObjectIdGetDatum(OIDOldHeap));
	if (!HeapTupleIsValid(tuple))
		elog(ERROR, "cache lookup failed for relation %u", OIDOldHeap);
	reloptions = SysCacheGetAttr(RELOID, tuple, Anum_pg_class_reloptions,
								 &isNull);
	if (isNull)
		reloptions = (Datum) 0;

	/* A temp rebuild must live in the backend's own temp namespace. */
	if (relpersistence == RELPERSISTENCE_TEMP)
		namespaceid = LookupCreationNamespace("pg_temp");
	else
		namespaceid = RelationGetNamespace(OldHeap);

	/*
	 * Create the new heap, using a temporary name in the same namespace as
	 * the existing table.  NOTE: there is some risk of collision with user
	 * relnames.  Working around this seems more trouble than it's worth; in
	 * particular, we can't create the new heap in a different namespace from
	 * the old, or we will have problems with the TEMP status of temp tables.
	 *
	 * Note: the new heap is not a shared relation, even if we are rebuilding
	 * a shared rel.  However, we do make the new heap mapped if the source is
	 * mapped.  This simplifies swap_relation_files, and is absolutely
	 * necessary for rebuilding pg_class, for reasons explained there.
	 */
	snprintf(NewHeapName, sizeof(NewHeapName), "pg_temp_%u", OIDOldHeap);

	OIDNewHeap = heap_create_with_catalog(NewHeapName,
										  namespaceid,
										  NewTableSpace,
										  InvalidOid,	/* relid: assign new */
										  InvalidOid,	/* reltypeid */
										  InvalidOid,	/* reloftypeid */
										  OldHeap->rd_rel->relowner,
										  NewAccessMethod,
										  OldHeapDesc,
										  NIL,	/* no cooked constraints */
										  RELKIND_RELATION,
										  relpersistence,
										  false,	/* not shared */
										  RelationIsMapped(OldHeap),
										  ONCOMMIT_NOOP,
										  reloptions,
										  false,
										  true,
										  true,
										  OIDOldHeap,
										  NULL);
	Assert(OIDNewHeap != InvalidOid);

	ReleaseSysCache(tuple);

	/*
	 * Advance command counter so that the newly-created relation's catalog
	 * tuples will be visible to table_open.
	 */
	CommandCounterIncrement();

	/*
	 * If necessary, create a TOAST table for the new relation.
	 *
	 * If the relation doesn't have a TOAST table already, we can't need one
	 * for the new relation.  The other way around is possible though: if some
	 * wide columns have been dropped, NewHeapCreateToastTable can decide that
	 * no TOAST table is needed for the new table.
	 *
	 * Note that NewHeapCreateToastTable ends with CommandCounterIncrement, so
	 * that the TOAST table will be visible for insertion.
	 */
	toastid = OldHeap->rd_rel->reltoastrelid;
	if (OidIsValid(toastid))
	{
		/* keep the existing toast table's reloptions, if any */
		tuple = SearchSysCache1(RELOID, ObjectIdGetDatum(toastid));
		if (!HeapTupleIsValid(tuple))
			elog(ERROR, "cache lookup failed for relation %u", toastid);
		reloptions = SysCacheGetAttr(RELOID, tuple, Anum_pg_class_reloptions,
									 &isNull);
		if (isNull)
			reloptions = (Datum) 0;

		NewHeapCreateToastTable(OIDNewHeap, reloptions, lockmode, toastid);

		ReleaseSysCache(tuple);
	}

	/* Close the old heap but keep whatever lock we acquired above. */
	table_close(OldHeap, NoLock);

	return OIDNewHeap;
}
1224 :
/*
 * Do the physical copying of table data.
 *
 * 'snapshot': see table_relation_copy_for_cluster().  Pass iff concurrent
 * processing is required; a non-NULL snapshot is what makes this function
 * take the concurrent paths below.  (An earlier version of this comment
 * also mentioned a 'decoding_ctx' argument, which no longer exists.)
 *
 * There are three output parameters:
 * *pSwapToastByContent is set true if toast tables must be swapped by content.
 * *pFreezeXid receives the TransactionId used as freeze cutoff point.
 * *pCutoffMulti receives the MultiXactId used as a cutoff point.
 */
static void
copy_table_data(Relation NewHeap, Relation OldHeap, Relation OldIndex,
				Snapshot snapshot, bool verbose, bool *pSwapToastByContent,
				TransactionId *pFreezeXid, MultiXactId *pCutoffMulti)
{
	Relation	relRelation;
	HeapTuple	reltup;
	Form_pg_class relform;
	TupleDesc	oldTupDesc PG_USED_FOR_ASSERTS_ONLY;
	TupleDesc	newTupDesc PG_USED_FOR_ASSERTS_ONLY;
	VacuumParams params;
	struct VacuumCutoffs cutoffs;
	bool		use_sort;
	double		num_tuples = 0,
				tups_vacuumed = 0,
				tups_recently_dead = 0;
	BlockNumber num_pages;
	/* VERBOSE surfaces progress messages to the client; else debug only */
	int			elevel = verbose ? INFO : DEBUG2;
	PGRUsage	ru0;
	char	   *nspname;
	bool		concurrent = snapshot != NULL;
	LOCKMODE	lmode;

	lmode = RepackLockLevel(concurrent);

	pg_rusage_init(&ru0);

	/* Store a copy of the namespace name for logging purposes */
	nspname = get_namespace_name(RelationGetNamespace(OldHeap));

	/*
	 * Their tuple descriptors should be exactly alike, but here we only need
	 * assume that they have the same number of columns.
	 */
	oldTupDesc = RelationGetDescr(OldHeap);
	newTupDesc = RelationGetDescr(NewHeap);
	Assert(newTupDesc->natts == oldTupDesc->natts);

	/*
	 * If the OldHeap has a toast table, get lock on the toast table to keep
	 * it from being vacuumed.  This is needed because autovacuum processes
	 * toast tables independently of their main tables, with no lock on the
	 * latter.  If an autovacuum were to start on the toast table after we
	 * compute our OldestXmin below, it would use a later OldestXmin, and then
	 * possibly remove as DEAD toast tuples belonging to main tuples we think
	 * are only RECENTLY_DEAD.  Then we'd fail while trying to copy those
	 * tuples.
	 *
	 * We don't need to open the toast relation here, just lock it.  The lock
	 * will be held till end of transaction.
	 */
	if (OldHeap->rd_rel->reltoastrelid)
		LockRelationOid(OldHeap->rd_rel->reltoastrelid, lmode);

	/*
	 * If both tables have TOAST tables, perform toast swap by content.  It is
	 * possible that the old table has a toast table but the new one doesn't,
	 * if toastable columns have been dropped.  In that case we have to do
	 * swap by links.  This is okay because swap by content is only essential
	 * for system catalogs, and we don't support schema changes for them.
	 */
	if (OldHeap->rd_rel->reltoastrelid && NewHeap->rd_rel->reltoastrelid &&
		!concurrent)
	{
		*pSwapToastByContent = true;

		/*
		 * When doing swap by content, any toast pointers written into NewHeap
		 * must use the old toast table's OID, because that's where the toast
		 * data will eventually be found.  Set this up by setting rd_toastoid.
		 * This also tells toast_save_datum() to preserve the toast value
		 * OIDs, which we want so as not to invalidate toast pointers in
		 * system catalog caches, and to avoid making multiple copies of a
		 * single toast value.
		 *
		 * Note that we must hold NewHeap open until we are done writing data,
		 * since the relcache will not guarantee to remember this setting once
		 * the relation is closed.  Also, this technique depends on the fact
		 * that no one will try to read from the NewHeap until after we've
		 * finished writing it and swapping the rels --- otherwise they could
		 * follow the toast pointers to the wrong place.  (It would actually
		 * work for values copied over from the old toast table, but not for
		 * any values that we toast which were previously not toasted.)
		 *
		 * This would not work with CONCURRENTLY because we may need to delete
		 * TOASTed tuples from the new heap.  With this hack, we'd delete them
		 * from the old heap.
		 */
		NewHeap->rd_toastoid = OldHeap->rd_rel->reltoastrelid;
	}
	else
		*pSwapToastByContent = false;

	/*
	 * Compute xids used to freeze and weed out dead tuples and multixacts.
	 * Since we're going to rewrite the whole table anyway, there's no reason
	 * not to be aggressive about this.  (params is zeroed, i.e. defaults.)
	 */
	memset(&params, 0, sizeof(VacuumParams));
	vacuum_get_cutoffs(OldHeap, &params, &cutoffs);

	/*
	 * FreezeXid will become the table's new relfrozenxid, and that mustn't go
	 * backwards, so take the max.
	 */
	{
		TransactionId relfrozenxid = OldHeap->rd_rel->relfrozenxid;

		if (TransactionIdIsValid(relfrozenxid) &&
			TransactionIdPrecedes(cutoffs.FreezeLimit, relfrozenxid))
			cutoffs.FreezeLimit = relfrozenxid;
	}

	/*
	 * MultiXactCutoff, similarly, shouldn't go backwards either.
	 */
	{
		MultiXactId relminmxid = OldHeap->rd_rel->relminmxid;

		if (MultiXactIdIsValid(relminmxid) &&
			MultiXactIdPrecedes(cutoffs.MultiXactCutoff, relminmxid))
			cutoffs.MultiXactCutoff = relminmxid;
	}

	/*
	 * Decide whether to use an indexscan or seqscan-and-optional-sort to scan
	 * the OldHeap.  We know how to use a sort to duplicate the ordering of a
	 * btree index, and will use seqscan-and-sort for that case if the planner
	 * tells us it's cheaper.  Otherwise, always indexscan if an index is
	 * provided, else plain seqscan.
	 */
	if (OldIndex != NULL && OldIndex->rd_rel->relam == BTREE_AM_OID)
		use_sort = plan_cluster_use_sort(RelationGetRelid(OldHeap),
										 RelationGetRelid(OldIndex));
	else
		use_sort = false;

	/* Log what we're doing */
	if (OldIndex != NULL && !use_sort)
		ereport(elevel,
				errmsg("repacking \"%s.%s\" using index scan on \"%s\"",
					   nspname,
					   RelationGetRelationName(OldHeap),
					   RelationGetRelationName(OldIndex)));
	else if (use_sort)
		ereport(elevel,
				errmsg("repacking \"%s.%s\" using sequential scan and sort",
					   nspname,
					   RelationGetRelationName(OldHeap)));
	else
		ereport(elevel,
				errmsg("repacking \"%s.%s\" in physical order",
					   nspname,
					   RelationGetRelationName(OldHeap)));

	/*
	 * Hand off the actual copying to AM specific function, the generic code
	 * cannot know how to deal with visibility across AMs.  Note that this
	 * routine is allowed to set FreezeXid / MultiXactCutoff to different
	 * values (e.g. because the AM doesn't use freezing).
	 */
	table_relation_copy_for_cluster(OldHeap, NewHeap, OldIndex, use_sort,
									cutoffs.OldestXmin, snapshot,
									&cutoffs.FreezeLimit,
									&cutoffs.MultiXactCutoff,
									&num_tuples, &tups_vacuumed,
									&tups_recently_dead);

	/* return selected values to caller, get set as relfrozenxid/minmxid */
	*pFreezeXid = cutoffs.FreezeLimit;
	*pCutoffMulti = cutoffs.MultiXactCutoff;

	/*
	 * Reset rd_toastoid just to be tidy --- it shouldn't be looked at again.
	 * In the CONCURRENTLY case, we need to set it again before applying the
	 * concurrent changes.
	 */
	NewHeap->rd_toastoid = InvalidOid;

	num_pages = RelationGetNumberOfBlocks(NewHeap);

	/* Log what we did */
	ereport(elevel,
			(errmsg("\"%s.%s\": found %.0f removable, %.0f nonremovable row versions in %u pages",
					nspname,
					RelationGetRelationName(OldHeap),
					tups_vacuumed, num_tuples,
					RelationGetNumberOfBlocks(OldHeap)),
			 errdetail("%.0f dead row versions cannot be removed yet.\n"
					   "%s.",
					   tups_recently_dead,
					   pg_rusage_show(&ru0))));

	/* Update pg_class to reflect the correct values of pages and tuples. */
	relRelation = table_open(RelationRelationId, RowExclusiveLock);

	reltup = SearchSysCacheCopy1(RELOID,
								 ObjectIdGetDatum(RelationGetRelid(NewHeap)));
	if (!HeapTupleIsValid(reltup))
		elog(ERROR, "cache lookup failed for relation %u",
			 RelationGetRelid(NewHeap));
	relform = (Form_pg_class) GETSTRUCT(reltup);

	relform->relpages = num_pages;
	relform->reltuples = num_tuples;

	/* Don't update the stats for pg_class.  See swap_relation_files. */
	if (RelationGetRelid(OldHeap) != RelationRelationId)
		CatalogTupleUpdate(relRelation, &reltup->t_self, reltup);
	else
		CacheInvalidateRelcacheByTuple(reltup);

	/* Clean up. */
	heap_freetuple(reltup);
	table_close(relRelation, RowExclusiveLock);

	/* Make the update visible */
	CommandCounterIncrement();
}
1455 :
1456 : /*
1457 : * Swap the physical files of two given relations.
1458 : *
1459 : * We swap the physical identity (reltablespace, relfilenumber) while keeping
1460 : * the same logical identities of the two relations. relpersistence is also
1461 : * swapped, which is critical since it determines where buffers live for each
1462 : * relation.
1463 : *
1464 : * We can swap associated TOAST data in either of two ways: recursively swap
1465 : * the physical content of the toast tables (and their indexes), or swap the
1466 : * TOAST links in the given relations' pg_class entries. The former is needed
1467 : * to manage rewrites of shared catalogs (where we cannot change the pg_class
1468 : * links) while the latter is the only way to handle cases in which a toast
1469 : * table is added or removed altogether.
1470 : *
1471 : * Additionally, the first relation is marked with relfrozenxid set to
1472 : * frozenXid. It seems a bit ugly to have this here, but the caller would
1473 : * have to do it anyway, so having it here saves a heap_update. Note: in
1474 : * the swap-toast-links case, we assume we don't need to change the toast
1475 : * table's relfrozenxid: the new version of the toast table should already
1476 : * have relfrozenxid set to RecentXmin, which is good enough.
1477 : *
1478 : * Lastly, if r2 and its toast table and toast index (if any) are mapped,
1479 : * their OIDs are emitted into mapped_tables[]. This is hacky but beats
1480 : * having to look the information up again later in finish_heap_swap.
1481 : */
1482 : static void
1483 1680 : swap_relation_files(Oid r1, Oid r2, bool target_is_pg_class,
1484 : bool swap_toast_by_content,
1485 : bool is_internal,
1486 : TransactionId frozenXid,
1487 : MultiXactId cutoffMulti,
1488 : Oid *mapped_tables)
1489 : {
1490 : Relation relRelation;
1491 : HeapTuple reltup1,
1492 : reltup2;
1493 : Form_pg_class relform1,
1494 : relform2;
1495 : RelFileNumber relfilenumber1,
1496 : relfilenumber2;
1497 : RelFileNumber swaptemp;
1498 : char swptmpchr;
1499 : Oid relam1,
1500 : relam2;
1501 :
1502 : /* We need writable copies of both pg_class tuples. */
1503 1680 : relRelation = table_open(RelationRelationId, RowExclusiveLock);
1504 :
1505 1680 : reltup1 = SearchSysCacheCopy1(RELOID, ObjectIdGetDatum(r1));
1506 1680 : if (!HeapTupleIsValid(reltup1))
1507 0 : elog(ERROR, "cache lookup failed for relation %u", r1);
1508 1680 : relform1 = (Form_pg_class) GETSTRUCT(reltup1);
1509 :
1510 1680 : reltup2 = SearchSysCacheCopy1(RELOID, ObjectIdGetDatum(r2));
1511 1680 : if (!HeapTupleIsValid(reltup2))
1512 0 : elog(ERROR, "cache lookup failed for relation %u", r2);
1513 1680 : relform2 = (Form_pg_class) GETSTRUCT(reltup2);
1514 :
1515 1680 : relfilenumber1 = relform1->relfilenode;
1516 1680 : relfilenumber2 = relform2->relfilenode;
1517 1680 : relam1 = relform1->relam;
1518 1680 : relam2 = relform2->relam;
1519 :
1520 1680 : if (RelFileNumberIsValid(relfilenumber1) &&
1521 : RelFileNumberIsValid(relfilenumber2))
1522 : {
1523 : /*
1524 : * Normal non-mapped relations: swap relfilenumbers, reltablespaces,
1525 : * relpersistence
1526 : */
1527 : Assert(!target_is_pg_class);
1528 :
1529 1591 : swaptemp = relform1->relfilenode;
1530 1591 : relform1->relfilenode = relform2->relfilenode;
1531 1591 : relform2->relfilenode = swaptemp;
1532 :
1533 1591 : swaptemp = relform1->reltablespace;
1534 1591 : relform1->reltablespace = relform2->reltablespace;
1535 1591 : relform2->reltablespace = swaptemp;
1536 :
1537 1591 : swaptemp = relform1->relam;
1538 1591 : relform1->relam = relform2->relam;
1539 1591 : relform2->relam = swaptemp;
1540 :
1541 1591 : swptmpchr = relform1->relpersistence;
1542 1591 : relform1->relpersistence = relform2->relpersistence;
1543 1591 : relform2->relpersistence = swptmpchr;
1544 :
1545 : /* Also swap toast links, if we're swapping by links */
1546 1591 : if (!swap_toast_by_content)
1547 : {
1548 1279 : swaptemp = relform1->reltoastrelid;
1549 1279 : relform1->reltoastrelid = relform2->reltoastrelid;
1550 1279 : relform2->reltoastrelid = swaptemp;
1551 : }
1552 : }
1553 : else
1554 : {
1555 : /*
1556 : * Mapped-relation case. Here we have to swap the relation mappings
1557 : * instead of modifying the pg_class columns. Both must be mapped.
1558 : */
1559 89 : if (RelFileNumberIsValid(relfilenumber1) ||
1560 : RelFileNumberIsValid(relfilenumber2))
1561 0 : elog(ERROR, "cannot swap mapped relation \"%s\" with non-mapped relation",
1562 : NameStr(relform1->relname));
1563 :
1564 : /*
1565 : * We can't change the tablespace nor persistence of a mapped rel, and
1566 : * we can't handle toast link swapping for one either, because we must
1567 : * not apply any critical changes to its pg_class row. These cases
1568 : * should be prevented by upstream permissions tests, so these checks
1569 : * are non-user-facing emergency backstop.
1570 : */
1571 89 : if (relform1->reltablespace != relform2->reltablespace)
1572 0 : elog(ERROR, "cannot change tablespace of mapped relation \"%s\"",
1573 : NameStr(relform1->relname));
1574 89 : if (relform1->relpersistence != relform2->relpersistence)
1575 0 : elog(ERROR, "cannot change persistence of mapped relation \"%s\"",
1576 : NameStr(relform1->relname));
1577 89 : if (relform1->relam != relform2->relam)
1578 0 : elog(ERROR, "cannot change access method of mapped relation \"%s\"",
1579 : NameStr(relform1->relname));
1580 89 : if (!swap_toast_by_content &&
1581 29 : (relform1->reltoastrelid || relform2->reltoastrelid))
1582 0 : elog(ERROR, "cannot swap toast by links for mapped relation \"%s\"",
1583 : NameStr(relform1->relname));
1584 :
1585 : /*
1586 : * Fetch the mappings --- shouldn't fail, but be paranoid
1587 : */
1588 89 : relfilenumber1 = RelationMapOidToFilenumber(r1, relform1->relisshared);
1589 89 : if (!RelFileNumberIsValid(relfilenumber1))
1590 0 : elog(ERROR, "could not find relation mapping for relation \"%s\", OID %u",
1591 : NameStr(relform1->relname), r1);
1592 89 : relfilenumber2 = RelationMapOidToFilenumber(r2, relform2->relisshared);
1593 89 : if (!RelFileNumberIsValid(relfilenumber2))
1594 0 : elog(ERROR, "could not find relation mapping for relation \"%s\", OID %u",
1595 : NameStr(relform2->relname), r2);
1596 :
1597 : /*
1598 : * Send replacement mappings to relmapper. Note these won't actually
1599 : * take effect until CommandCounterIncrement.
1600 : */
1601 89 : RelationMapUpdateMap(r1, relfilenumber2, relform1->relisshared, false);
1602 89 : RelationMapUpdateMap(r2, relfilenumber1, relform2->relisshared, false);
1603 :
1604 : /* Pass OIDs of mapped r2 tables back to caller */
1605 89 : *mapped_tables++ = r2;
1606 : }
1607 :
1608 : /*
1609 : * Recognize that rel1's relfilenumber (swapped from rel2) is new in this
1610 : * subtransaction. The rel2 storage (swapped from rel1) may or may not be
1611 : * new.
1612 : */
1613 : {
1614 : Relation rel1,
1615 : rel2;
1616 :
1617 1680 : rel1 = relation_open(r1, NoLock);
1618 1680 : rel2 = relation_open(r2, NoLock);
1619 1680 : rel2->rd_createSubid = rel1->rd_createSubid;
1620 1680 : rel2->rd_newRelfilelocatorSubid = rel1->rd_newRelfilelocatorSubid;
1621 1680 : rel2->rd_firstRelfilelocatorSubid = rel1->rd_firstRelfilelocatorSubid;
1622 1680 : RelationAssumeNewRelfilelocator(rel1);
1623 1680 : relation_close(rel1, NoLock);
1624 1680 : relation_close(rel2, NoLock);
1625 : }
1626 :
1627 : /*
1628 : * In the case of a shared catalog, these next few steps will only affect
1629 : * our own database's pg_class row; but that's okay, because they are all
1630 : * noncritical updates. That's also an important fact for the case of a
1631 : * mapped catalog, because it's possible that we'll commit the map change
1632 : * and then fail to commit the pg_class update.
1633 : */
1634 :
1635 : /* set rel1's frozen Xid and minimum MultiXid */
1636 1680 : if (relform1->relkind != RELKIND_INDEX)
1637 : {
1638 : Assert(!TransactionIdIsValid(frozenXid) ||
1639 : TransactionIdIsNormal(frozenXid));
1640 1554 : relform1->relfrozenxid = frozenXid;
1641 1554 : relform1->relminmxid = cutoffMulti;
1642 : }
1643 :
1644 : /* swap size statistics too, since new rel has freshly-updated stats */
1645 : {
1646 : int32 swap_pages;
1647 : float4 swap_tuples;
1648 : int32 swap_allvisible;
1649 : int32 swap_allfrozen;
1650 :
1651 1680 : swap_pages = relform1->relpages;
1652 1680 : relform1->relpages = relform2->relpages;
1653 1680 : relform2->relpages = swap_pages;
1654 :
1655 1680 : swap_tuples = relform1->reltuples;
1656 1680 : relform1->reltuples = relform2->reltuples;
1657 1680 : relform2->reltuples = swap_tuples;
1658 :
1659 1680 : swap_allvisible = relform1->relallvisible;
1660 1680 : relform1->relallvisible = relform2->relallvisible;
1661 1680 : relform2->relallvisible = swap_allvisible;
1662 :
1663 1680 : swap_allfrozen = relform1->relallfrozen;
1664 1680 : relform1->relallfrozen = relform2->relallfrozen;
1665 1680 : relform2->relallfrozen = swap_allfrozen;
1666 : }
1667 :
1668 : /*
1669 : * Update the tuples in pg_class --- unless the target relation of the
1670 : * swap is pg_class itself. In that case, there is zero point in making
1671 : * changes because we'd be updating the old data that we're about to throw
1672 : * away. Because the real work being done here for a mapped relation is
1673 : * just to change the relation map settings, it's all right to not update
1674 : * the pg_class rows in this case. The most important changes will instead
1675 : * be performed later, in finish_heap_swap() itself.
1676 : */
1677 1680 : if (!target_is_pg_class)
1678 : {
1679 : CatalogIndexState indstate;
1680 :
1681 1657 : indstate = CatalogOpenIndexes(relRelation);
1682 1657 : CatalogTupleUpdateWithInfo(relRelation, &reltup1->t_self, reltup1,
1683 : indstate);
1684 1657 : CatalogTupleUpdateWithInfo(relRelation, &reltup2->t_self, reltup2,
1685 : indstate);
1686 1657 : CatalogCloseIndexes(indstate);
1687 : }
1688 : else
1689 : {
1690 : /* no update ... but we do still need relcache inval */
1691 23 : CacheInvalidateRelcacheByTuple(reltup1);
1692 23 : CacheInvalidateRelcacheByTuple(reltup2);
1693 : }
1694 :
1695 : /*
1696 : * Now that pg_class has been updated with its relevant information for
1697 : * the swap, update the dependency of the relations to point to their new
1698 : * table AM, if it has changed.
1699 : */
1700 1680 : if (relam1 != relam2)
1701 : {
1702 24 : if (changeDependencyFor(RelationRelationId,
1703 : r1,
1704 : AccessMethodRelationId,
1705 : relam1,
1706 : relam2) != 1)
1707 0 : elog(ERROR, "could not change access method dependency for relation \"%s.%s\"",
1708 : get_namespace_name(get_rel_namespace(r1)),
1709 : get_rel_name(r1));
1710 24 : if (changeDependencyFor(RelationRelationId,
1711 : r2,
1712 : AccessMethodRelationId,
1713 : relam2,
1714 : relam1) != 1)
1715 0 : elog(ERROR, "could not change access method dependency for relation \"%s.%s\"",
1716 : get_namespace_name(get_rel_namespace(r2)),
1717 : get_rel_name(r2));
1718 : }
1719 :
1720 : /*
1721 : * Post alter hook for modified relations. The change to r2 is always
1722 : * internal, but r1 depends on the invocation context.
1723 : */
1724 1680 : InvokeObjectPostAlterHookArg(RelationRelationId, r1, 0,
1725 : InvalidOid, is_internal);
1726 1680 : InvokeObjectPostAlterHookArg(RelationRelationId, r2, 0,
1727 : InvalidOid, true);
1728 :
1729 : /*
1730 : * If we have toast tables associated with the relations being swapped,
1731 : * deal with them too.
1732 : */
1733 1680 : if (relform1->reltoastrelid || relform2->reltoastrelid)
1734 : {
1735 524 : if (swap_toast_by_content)
1736 : {
1737 124 : if (relform1->reltoastrelid && relform2->reltoastrelid)
1738 : {
1739 : /* Recursively swap the contents of the toast tables */
1740 124 : swap_relation_files(relform1->reltoastrelid,
1741 : relform2->reltoastrelid,
1742 : target_is_pg_class,
1743 : swap_toast_by_content,
1744 : is_internal,
1745 : frozenXid,
1746 : cutoffMulti,
1747 : mapped_tables);
1748 : }
1749 : else
1750 : {
1751 : /* caller messed up */
1752 0 : elog(ERROR, "cannot swap toast files by content when there's only one");
1753 : }
1754 : }
1755 : else
1756 : {
1757 : /*
1758 : * We swapped the ownership links, so we need to change dependency
1759 : * data to match.
1760 : *
1761 : * NOTE: it is possible that only one table has a toast table.
1762 : *
1763 : * NOTE: at present, a TOAST table's only dependency is the one on
1764 : * its owning table. If more are ever created, we'd need to use
1765 : * something more selective than deleteDependencyRecordsFor() to
1766 : * get rid of just the link we want.
1767 : */
1768 : ObjectAddress baseobject,
1769 : toastobject;
1770 : long count;
1771 :
1772 : /*
1773 : * We disallow this case for system catalogs, to avoid the
1774 : * possibility that the catalog we're rebuilding is one of the
1775 : * ones the dependency changes would change. It's too late to be
1776 : * making any data changes to the target catalog.
1777 : */
1778 400 : if (IsSystemClass(r1, relform1))
1779 0 : elog(ERROR, "cannot swap toast files by links for system catalogs");
1780 :
1781 : /* Delete old dependencies */
1782 400 : if (relform1->reltoastrelid)
1783 : {
1784 379 : count = deleteDependencyRecordsFor(RelationRelationId,
1785 : relform1->reltoastrelid,
1786 : false);
1787 379 : if (count != 1)
1788 0 : elog(ERROR, "expected one dependency record for TOAST table, found %ld",
1789 : count);
1790 : }
1791 400 : if (relform2->reltoastrelid)
1792 : {
1793 400 : count = deleteDependencyRecordsFor(RelationRelationId,
1794 : relform2->reltoastrelid,
1795 : false);
1796 400 : if (count != 1)
1797 0 : elog(ERROR, "expected one dependency record for TOAST table, found %ld",
1798 : count);
1799 : }
1800 :
1801 : /* Register new dependencies */
1802 400 : baseobject.classId = RelationRelationId;
1803 400 : baseobject.objectSubId = 0;
1804 400 : toastobject.classId = RelationRelationId;
1805 400 : toastobject.objectSubId = 0;
1806 :
1807 400 : if (relform1->reltoastrelid)
1808 : {
1809 379 : baseobject.objectId = r1;
1810 379 : toastobject.objectId = relform1->reltoastrelid;
1811 379 : recordDependencyOn(&toastobject, &baseobject,
1812 : DEPENDENCY_INTERNAL);
1813 : }
1814 :
1815 400 : if (relform2->reltoastrelid)
1816 : {
1817 400 : baseobject.objectId = r2;
1818 400 : toastobject.objectId = relform2->reltoastrelid;
1819 400 : recordDependencyOn(&toastobject, &baseobject,
1820 : DEPENDENCY_INTERNAL);
1821 : }
1822 : }
1823 : }
1824 :
1825 : /*
1826 : * If we're swapping two toast tables by content, do the same for their
1827 : * valid index. The swap can actually be safely done only if the relations
1828 : * have indexes.
1829 : */
1830 1680 : if (swap_toast_by_content &&
1831 372 : relform1->relkind == RELKIND_TOASTVALUE &&
1832 124 : relform2->relkind == RELKIND_TOASTVALUE)
1833 : {
1834 : Oid toastIndex1,
1835 : toastIndex2;
1836 :
1837 : /* Get valid index for each relation */
1838 124 : toastIndex1 = toast_get_valid_index(r1,
1839 : AccessExclusiveLock);
1840 124 : toastIndex2 = toast_get_valid_index(r2,
1841 : AccessExclusiveLock);
1842 :
1843 124 : swap_relation_files(toastIndex1,
1844 : toastIndex2,
1845 : target_is_pg_class,
1846 : swap_toast_by_content,
1847 : is_internal,
1848 : InvalidTransactionId,
1849 : InvalidMultiXactId,
1850 : mapped_tables);
1851 : }
1852 :
1853 : /* Clean up. */
1854 1680 : heap_freetuple(reltup1);
1855 1680 : heap_freetuple(reltup2);
1856 :
1857 1680 : table_close(relRelation, RowExclusiveLock);
1858 1680 : }
1859 :
/*
 * Remove the transient table that was built by make_new_heap, and finish
 * cleaning up (including rebuilding all indexes on the old heap).
 *
 * OIDOldHeap is the table being rebuilt; OIDNewHeap is the transient table
 * whose relation files get swapped into place (and dropped afterwards).
 * frozenXid/cutoffMulti are installed as the old heap's relfrozenxid and
 * relminmxid, and newrelpersistence is the persistence forced onto the
 * rebuilt indexes.  The boolean flags select the optional steps performed
 * below (catalog invalidation, toast handling mode, reindexing, constraint
 * checking during reindex).
 */
void
finish_heap_swap(Oid OIDOldHeap, Oid OIDNewHeap,
				 bool is_system_catalog,
				 bool swap_toast_by_content,
				 bool check_constraints,
				 bool is_internal,
				 bool reindex,
				 TransactionId frozenXid,
				 MultiXactId cutoffMulti,
				 char newrelpersistence)
{
	ObjectAddress object;
	Oid			mapped_tables[4];
	int			i;

	/* Report that we are now swapping relation files */
	pgstat_progress_update_param(PROGRESS_REPACK_PHASE,
								 PROGRESS_REPACK_PHASE_SWAP_REL_FILES);

	/* Zero out possible results from swap_relation_files */
	memset(mapped_tables, 0, sizeof(mapped_tables));

	/*
	 * Swap the contents of the heap relations (including any toast tables).
	 * Also set old heap's relfrozenxid to frozenXid.
	 */
	swap_relation_files(OIDOldHeap, OIDNewHeap,
						(OIDOldHeap == RelationRelationId),
						swap_toast_by_content, is_internal,
						frozenXid, cutoffMulti, mapped_tables);

	/*
	 * If it's a system catalog, queue a sinval message to flush all catcaches
	 * on the catalog when we reach CommandCounterIncrement.
	 */
	if (is_system_catalog)
		CacheInvalidateCatalog(OIDOldHeap);

	if (reindex)
	{
		int			reindex_flags;
		ReindexParams reindex_params = {0};

		/*
		 * Rebuild each index on the relation (but not the toast table, which
		 * is all-new at this point). It is important to do this before the
		 * DROP step because if we are processing a system catalog that will
		 * be used during DROP, we want to have its indexes available. There
		 * is no advantage to the other order anyway because this is all
		 * transactional, so no chance to reclaim disk space before commit. We
		 * do not need a final CommandCounterIncrement() because
		 * reindex_relation does it.
		 *
		 * Note: because index_build is called via reindex_relation, it will
		 * never set indcheckxmin true for the indexes. This is OK even
		 * though in some sense we are building new indexes rather than
		 * rebuilding existing ones, because the new heap won't contain any
		 * HOT chains at all, let alone broken ones, so it can't be necessary
		 * to set indcheckxmin.
		 */
		reindex_flags = REINDEX_REL_SUPPRESS_INDEX_USE;
		if (check_constraints)
			reindex_flags |= REINDEX_REL_CHECK_CONSTRAINTS;

		/*
		 * Ensure that the indexes have the same persistence as the parent
		 * relation.
		 */
		if (newrelpersistence == RELPERSISTENCE_UNLOGGED)
			reindex_flags |= REINDEX_REL_FORCE_INDEXES_UNLOGGED;
		else if (newrelpersistence == RELPERSISTENCE_PERMANENT)
			reindex_flags |= REINDEX_REL_FORCE_INDEXES_PERMANENT;

		/* Report that we are now reindexing relations */
		pgstat_progress_update_param(PROGRESS_REPACK_PHASE,
									 PROGRESS_REPACK_PHASE_REBUILD_INDEX);

		reindex_relation(NULL, OIDOldHeap, reindex_flags, &reindex_params);
	}

	/* Report that we are now doing clean up */
	pgstat_progress_update_param(PROGRESS_REPACK_PHASE,
								 PROGRESS_REPACK_PHASE_FINAL_CLEANUP);

	/*
	 * If the relation being rebuilt is pg_class, swap_relation_files()
	 * couldn't update pg_class's own pg_class entry (check comments in
	 * swap_relation_files()), thus relfrozenxid was not updated. That's
	 * annoying because a potential reason for doing a VACUUM FULL is an
	 * imminent or actual anti-wraparound shutdown. So, now that we can
	 * access the new relation using its indices, update relfrozenxid.
	 * pg_class doesn't have a toast relation, so we don't need to update the
	 * corresponding toast relation. Note that there's little point moving
	 * all relfrozenxid updates here since swap_relation_files() needs to
	 * write to pg_class for non-mapped relations anyway.
	 */
	if (OIDOldHeap == RelationRelationId)
	{
		Relation	relRelation;
		HeapTuple	reltup;
		Form_pg_class relform;

		relRelation = table_open(RelationRelationId, RowExclusiveLock);

		reltup = SearchSysCacheCopy1(RELOID, ObjectIdGetDatum(OIDOldHeap));
		if (!HeapTupleIsValid(reltup))
			elog(ERROR, "cache lookup failed for relation %u", OIDOldHeap);
		relform = (Form_pg_class) GETSTRUCT(reltup);

		relform->relfrozenxid = frozenXid;
		relform->relminmxid = cutoffMulti;

		CatalogTupleUpdate(relRelation, &reltup->t_self, reltup);

		table_close(relRelation, RowExclusiveLock);
	}

	/* Destroy new heap with old filenumber */
	object.classId = RelationRelationId;
	object.objectId = OIDNewHeap;
	object.objectSubId = 0;

	if (!reindex)
	{
		/*
		 * Make sure the changes in pg_class are visible. This is especially
		 * important if !swap_toast_by_content, so that the correct TOAST
		 * relation is dropped. (reindex_relation() above did not help in
		 * this case.)
		 */
		CommandCounterIncrement();
	}

	/*
	 * The new relation is local to our transaction and we know nothing
	 * depends on it, so DROP_RESTRICT should be OK.
	 */
	performDeletion(&object, DROP_RESTRICT, PERFORM_DELETION_INTERNAL);

	/* performDeletion does CommandCounterIncrement at end */

	/*
	 * Now we must remove any relation mapping entries that we set up for the
	 * transient table, as well as its toast table and toast index if any. If
	 * we fail to do this before commit, the relmapper will complain about new
	 * permanent map entries being added post-bootstrap.
	 */
	for (i = 0; OidIsValid(mapped_tables[i]); i++)
		RelationMapRemoveMapping(mapped_tables[i]);

	/*
	 * At this point, everything is kosher except that, if we did toast swap
	 * by links, the toast table's name corresponds to the transient table.
	 * The name is irrelevant to the backend because it's referenced by OID,
	 * but users looking at the catalogs could be confused. Rename it to
	 * prevent this problem.
	 *
	 * Note no lock required on the relation, because we already hold an
	 * exclusive lock on it.
	 */
	if (!swap_toast_by_content)
	{
		Relation	newrel;

		newrel = table_open(OIDOldHeap, NoLock);
		if (OidIsValid(newrel->rd_rel->reltoastrelid))
		{
			Oid			toastidx;
			char		NewToastName[NAMEDATALEN];

			/* Get the associated valid index to be renamed */
			toastidx = toast_get_valid_index(newrel->rd_rel->reltoastrelid,
											 AccessExclusiveLock);

			/* rename the toast table ... */
			snprintf(NewToastName, NAMEDATALEN, "pg_toast_%u",
					 OIDOldHeap);
			RenameRelationInternal(newrel->rd_rel->reltoastrelid,
								   NewToastName, true, false);

			/* ... and its valid index too. */
			snprintf(NewToastName, NAMEDATALEN, "pg_toast_%u_index",
					 OIDOldHeap);

			RenameRelationInternal(toastidx,
								   NewToastName, true, true);

			/*
			 * Reset the relrewrite for the toast. The command-counter
			 * increment is required here as we are about to update the tuple
			 * that is updated as part of RenameRelationInternal.
			 */
			CommandCounterIncrement();
			ResetRelRewrite(newrel->rd_rel->reltoastrelid);
		}
		relation_close(newrel, NoLock);
	}

	/* if it's not a catalog table, clear any missing attribute settings */
	if (!is_system_catalog)
	{
		Relation	newrel;

		newrel = table_open(OIDOldHeap, NoLock);
		RelationClearMissing(newrel);
		relation_close(newrel, NoLock);
	}
}
2072 :
2073 : /*
2074 : * Determine which relations to process, when REPACK/CLUSTER is called
2075 : * without specifying a table name. The exact process depends on whether
2076 : * USING INDEX was given or not, and in any case we only return tables and
2077 : * materialized views that the current user has privileges to repack/cluster.
2078 : *
2079 : * If USING INDEX was given, we scan pg_index to find those that have
2080 : * indisclustered set; if it was not given, scan pg_class and return all
2081 : * tables.
2082 : *
2083 : * Return it as a list of RelToCluster in the given memory context.
2084 : */
static List *
get_tables_to_repack(RepackCommand cmd, bool usingindex, MemoryContext permcxt)
{
	Relation	catalog;
	TableScanDesc scan;
	HeapTuple	tuple;
	List	   *rtcs = NIL;		/* result list, allocated in permcxt */

	if (usingindex)
	{
		ScanKeyData entry;

		/*
		 * For USING INDEX, scan pg_index to find those with indisclustered.
		 */
		catalog = table_open(IndexRelationId, AccessShareLock);
		ScanKeyInit(&entry,
					Anum_pg_index_indisclustered,
					BTEqualStrategyNumber, F_BOOLEQ,
					BoolGetDatum(true));
		scan = table_beginscan_catalog(catalog, 1, &entry);
		while ((tuple = heap_getnext(scan, ForwardScanDirection)) != NULL)
		{
			RelToCluster *rtc;
			Form_pg_index index;
			MemoryContext oldcxt;

			index = (Form_pg_index) GETSTRUCT(tuple);

			/*
			 * Try to obtain a light lock on the index's table, to ensure it
			 * doesn't go away while we collect the list.  If we cannot, just
			 * disregard it.  Be sure to release this if we ultimately decide
			 * not to process the table!
			 */
			if (!ConditionalLockRelationOid(index->indrelid, AccessShareLock))
				continue;

			/* Verify that the table still exists; skip if not */
			if (!SearchSysCacheExists1(RELOID, ObjectIdGetDatum(index->indrelid)))
			{
				UnlockRelationOid(index->indrelid, AccessShareLock);
				continue;
			}

			/* noisily skip rels which the user can't process */
			if (!repack_is_permitted_for_relation(cmd, index->indrelid,
												  GetUserId()))
			{
				UnlockRelationOid(index->indrelid, AccessShareLock);
				continue;
			}

			/* Use a permanent memory context for the result list */
			oldcxt = MemoryContextSwitchTo(permcxt);
			rtc = palloc_object(RelToCluster);
			rtc->tableOid = index->indrelid;
			rtc->indexOid = index->indexrelid;
			rtcs = lappend(rtcs, rtc);
			MemoryContextSwitchTo(oldcxt);
		}
	}
	else
	{
		/* No USING INDEX: consider every row of pg_class. */
		catalog = table_open(RelationRelationId, AccessShareLock);
		scan = table_beginscan_catalog(catalog, 0, NULL);

		while ((tuple = heap_getnext(scan, ForwardScanDirection)) != NULL)
		{
			RelToCluster *rtc;
			Form_pg_class class;
			MemoryContext oldcxt;

			class = (Form_pg_class) GETSTRUCT(tuple);

			/*
			 * Try to obtain a light lock on the table, to ensure it doesn't
			 * go away while we collect the list.  If we cannot, just
			 * disregard the table.  Be sure to release this if we ultimately
			 * decide not to process the table!
			 */
			if (!ConditionalLockRelationOid(class->oid, AccessShareLock))
				continue;

			/* Verify that the table still exists */
			if (!SearchSysCacheExists1(RELOID, ObjectIdGetDatum(class->oid)))
			{
				UnlockRelationOid(class->oid, AccessShareLock);
				continue;
			}

			/* Can only process plain tables and matviews */
			if (class->relkind != RELKIND_RELATION &&
				class->relkind != RELKIND_MATVIEW)
			{
				UnlockRelationOid(class->oid, AccessShareLock);
				continue;
			}

			/* noisily skip rels which the user can't process */
			if (!repack_is_permitted_for_relation(cmd, class->oid,
												  GetUserId()))
			{
				UnlockRelationOid(class->oid, AccessShareLock);
				continue;
			}

			/* Use a permanent memory context for the result list */
			oldcxt = MemoryContextSwitchTo(permcxt);
			rtc = palloc_object(RelToCluster);
			rtc->tableOid = class->oid;
			/* no index preference in this mode */
			rtc->indexOid = InvalidOid;
			rtcs = lappend(rtcs, rtc);
			MemoryContextSwitchTo(oldcxt);
		}
	}

	/* Release scan and catalog; the AccessShareLock on each kept table stays. */
	table_endscan(scan);
	relation_close(catalog, AccessShareLock);

	return rtcs;
}
2207 :
2208 : /*
2209 : * Given a partitioned table or its index, return a list of RelToCluster for
2210 : * all the leaf child tables/indexes.
2211 : *
2212 : * 'rel_is_index' tells whether 'relid' is that of an index (true) or of the
2213 : * owning relation.
2214 : */
2215 : static List *
2216 20 : get_tables_to_repack_partitioned(RepackCommand cmd, Oid relid,
2217 : bool rel_is_index, MemoryContext permcxt)
2218 : {
2219 : List *inhoids;
2220 20 : List *rtcs = NIL;
2221 :
2222 : /*
2223 : * Do not lock the children until they're processed. Note that we do hold
2224 : * a lock on the parent partitioned table.
2225 : */
2226 20 : inhoids = find_all_inheritors(relid, NoLock, NULL);
2227 148 : foreach_oid(child_oid, inhoids)
2228 : {
2229 : Oid table_oid,
2230 : index_oid;
2231 : RelToCluster *rtc;
2232 : MemoryContext oldcxt;
2233 :
2234 108 : if (rel_is_index)
2235 : {
2236 : /* consider only leaf indexes */
2237 80 : if (get_rel_relkind(child_oid) != RELKIND_INDEX)
2238 40 : continue;
2239 :
2240 40 : table_oid = IndexGetRelation(child_oid, false);
2241 40 : index_oid = child_oid;
2242 : }
2243 : else
2244 : {
2245 : /* consider only leaf relations */
2246 28 : if (get_rel_relkind(child_oid) != RELKIND_RELATION)
2247 16 : continue;
2248 :
2249 12 : table_oid = child_oid;
2250 12 : index_oid = InvalidOid;
2251 : }
2252 :
2253 : /*
2254 : * It's possible that the user does not have privileges to CLUSTER the
2255 : * leaf partition despite having them on the partitioned table. Skip
2256 : * if so.
2257 : */
2258 52 : if (!repack_is_permitted_for_relation(cmd, table_oid, GetUserId()))
2259 12 : continue;
2260 :
2261 : /* Use a permanent memory context for the result list */
2262 40 : oldcxt = MemoryContextSwitchTo(permcxt);
2263 40 : rtc = palloc_object(RelToCluster);
2264 40 : rtc->tableOid = table_oid;
2265 40 : rtc->indexOid = index_oid;
2266 40 : rtcs = lappend(rtcs, rtc);
2267 40 : MemoryContextSwitchTo(oldcxt);
2268 : }
2269 :
2270 20 : return rtcs;
2271 : }
2272 :
2273 :
2274 : /*
2275 : * Return whether userid has privileges to REPACK relid. If not, this
2276 : * function emits a WARNING.
2277 : */
2278 : static bool
2279 2223 : repack_is_permitted_for_relation(RepackCommand cmd, Oid relid, Oid userid)
2280 : {
2281 : Assert(cmd == REPACK_COMMAND_CLUSTER || cmd == REPACK_COMMAND_REPACK);
2282 :
2283 2223 : if (pg_class_aclcheck(relid, userid, ACL_MAINTAIN) == ACLCHECK_OK)
2284 104 : return true;
2285 :
2286 2119 : ereport(WARNING,
2287 : errmsg("permission denied to execute %s on \"%s\", skipping it",
2288 : RepackCommandAsString(cmd),
2289 : get_rel_name(relid)));
2290 :
2291 2119 : return false;
2292 : }
2293 :
2294 :
2295 : /*
2296 : * Given a RepackStmt with an indicated relation name, resolve the relation
2297 : * name, obtain lock on it, then determine what to do based on the relation
2298 : * type: if it's table and not partitioned, repack it as indicated (using an
2299 : * existing clustered index, or following the given one), and return NULL.
2300 : *
2301 : * On the other hand, if the table is partitioned, do nothing further and
2302 : * instead return the opened and locked relcache entry, so that caller can
2303 : * process the partitions using the multiple-table handling code. In this
2304 : * case, if an index name is given, it's up to the caller to resolve it.
2305 : */
static Relation
process_single_relation(RepackStmt *stmt, LOCKMODE lockmode, bool isTopLevel,
						ClusterParams *params)
{
	Relation	rel;
	Oid			tableOid;

	Assert(stmt->relation != NULL);
	Assert(stmt->command == REPACK_COMMAND_CLUSTER ||
		   stmt->command == REPACK_COMMAND_REPACK);

	/*
	 * Make sure ANALYZE is specified if a column list is present.
	 */
	if ((params->options & CLUOPT_ANALYZE) == 0 && stmt->relation->va_cols != NIL)
		ereport(ERROR,
				errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
				errmsg("ANALYZE option must be specified when a column list is provided"));

	/* Find, lock, and check permissions on the table. */
	tableOid = RangeVarGetRelidExtended(stmt->relation->relation,
										lockmode,
										0,
										RangeVarCallbackMaintainsTable,
										NULL);
	/* Lock was taken above; just open the relcache entry. */
	rel = table_open(tableOid, NoLock);

	/*
	 * Reject clustering a remote temp table ... their local buffer manager is
	 * not going to cope.
	 */
	if (RELATION_IS_OTHER_TEMP(rel))
		ereport(ERROR,
				errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
		/*- translator: first %s is name of a SQL command, eg. REPACK */
				errmsg("cannot execute %s on temporary tables of other sessions",
					   RepackCommandAsString(stmt->command)));

	/*
	 * For partitioned tables, let caller handle this.  Otherwise, process it
	 * here and we're done.  (The returned rel stays open and locked; the
	 * caller is responsible for it.)
	 */
	if (rel->rd_rel->relkind == RELKIND_PARTITIONED_TABLE)
		return rel;
	else
	{
		Oid			indexOid = InvalidOid;

		/* Resolve USING INDEX / index name into an index OID, if any. */
		indexOid = determine_clustered_index(rel, stmt->usingindex,
											 stmt->indexname);
		if (OidIsValid(indexOid))
			check_index_is_clusterable(rel, indexOid, lockmode);

		/* cluster_rel() closes 'rel'. */
		cluster_rel(stmt->command, rel, indexOid, params, isTopLevel);

		/*
		 * Do an analyze, if requested. We close the transaction and start a
		 * new one, so that we don't hold the stronger lock for longer than
		 * needed.
		 */
		if (params->options & CLUOPT_ANALYZE)
		{
			VacuumParams vac_params = {0};

			PopActiveSnapshot();
			CommitTransactionCommand();

			StartTransactionCommand();
			PushActiveSnapshot(GetTransactionSnapshot());

			vac_params.options |= VACOPT_ANALYZE;
			if (params->options & CLUOPT_VERBOSE)
				vac_params.options |= VACOPT_VERBOSE;
			analyze_rel(tableOid, NULL, &vac_params,
						stmt->relation->va_cols, true, NULL);
			PopActiveSnapshot();
			CommandCounterIncrement();
		}

		/* NULL tells the caller that the relation was fully handled here. */
		return NULL;
	}
}
2388 :
2389 : /*
2390 : * Given a relation and the usingindex/indexname options in a
2391 : * REPACK USING INDEX or CLUSTER command, return the OID of the
2392 : * index to use for clustering the table.
2393 : *
2394 : * Caller must hold lock on the relation so that the set of indexes
2395 : * doesn't change, and must call check_index_is_clusterable.
2396 : */
2397 : static Oid
2398 145 : determine_clustered_index(Relation rel, bool usingindex, const char *indexname)
2399 : {
2400 : Oid indexOid;
2401 :
2402 145 : if (indexname == NULL && usingindex)
2403 : {
2404 : /*
2405 : * If USING INDEX with no name is given, find a clustered index, or
2406 : * error out if none.
2407 : */
2408 19 : indexOid = InvalidOid;
2409 42 : foreach_oid(idxoid, RelationGetIndexList(rel))
2410 : {
2411 19 : if (get_index_isclustered(idxoid))
2412 : {
2413 15 : indexOid = idxoid;
2414 15 : break;
2415 : }
2416 : }
2417 :
2418 19 : if (!OidIsValid(indexOid))
2419 4 : ereport(ERROR,
2420 : errcode(ERRCODE_UNDEFINED_OBJECT),
2421 : errmsg("there is no previously clustered index for table \"%s\"",
2422 : RelationGetRelationName(rel)));
2423 : }
2424 126 : else if (indexname != NULL)
2425 : {
2426 : /* An index was specified; obtain its OID. */
2427 113 : indexOid = get_relname_relid(indexname, rel->rd_rel->relnamespace);
2428 113 : if (!OidIsValid(indexOid))
2429 0 : ereport(ERROR,
2430 : errcode(ERRCODE_UNDEFINED_OBJECT),
2431 : errmsg("index \"%s\" for table \"%s\" does not exist",
2432 : indexname, RelationGetRelationName(rel)));
2433 : }
2434 : else
2435 13 : indexOid = InvalidOid;
2436 :
2437 141 : return indexOid;
2438 : }
2439 :
2440 : static const char *
2441 2564 : RepackCommandAsString(RepackCommand cmd)
2442 : {
2443 2564 : switch (cmd)
2444 : {
2445 2165 : case REPACK_COMMAND_REPACK:
2446 2165 : return "REPACK";
2447 222 : case REPACK_COMMAND_VACUUMFULL:
2448 222 : return "VACUUM";
2449 177 : case REPACK_COMMAND_CLUSTER:
2450 177 : return "CLUSTER";
2451 : }
2452 0 : return "???"; /* keep compiler quiet */
2453 : }
2454 :
/*
 * Apply all the changes stored in 'file'.
 *
 * 'file' is the spill of concurrent data changes captured while the initial
 * copy was running; each entry is a ConcurrentChangeKind byte followed by a
 * tuple.  'chgcxt' carries the target relation (the new heap), its
 * result-relation info and executor state, which we need for index
 * maintenance.
 */
static void
apply_concurrent_changes(BufFile *file, ChangeContext *chgcxt)
{
	ConcurrentChangeKind kind = '\0';	/* '\0' = no change read yet */
	Relation	rel = chgcxt->cc_rel;
	TupleTableSlot *spilled_tuple;	/* tuple read from 'file' */
	TupleTableSlot *old_update_tuple;	/* OLD tuple of a pending UPDATE */
	TupleTableSlot *ondisk_tuple;	/* existing tuple located in 'rel' */
	bool		have_old_tuple = false;
	MemoryContext oldcxt;

	spilled_tuple = MakeSingleTupleTableSlot(RelationGetDescr(rel),
											 &TTSOpsVirtual);
	ondisk_tuple = MakeSingleTupleTableSlot(RelationGetDescr(rel),
											table_slot_callbacks(rel));
	old_update_tuple = MakeSingleTupleTableSlot(RelationGetDescr(rel),
												&TTSOpsVirtual);

	/* Per-tuple allocations are reset at the bottom of the loop. */
	oldcxt = MemoryContextSwitchTo(GetPerTupleMemoryContext(chgcxt->cc_estate));

	while (true)
	{
		size_t		nread;
		ConcurrentChangeKind prevkind = kind;

		CHECK_FOR_INTERRUPTS();

		nread = BufFileReadMaybeEOF(file, &kind, 1, true);
		if (nread == 0)			/* done with the file? */
			break;

		/*
		 * If this is the old tuple for an update, read it into the tuple slot
		 * and go to the next one. The update itself will be executed on the
		 * next iteration, when we receive the NEW tuple.
		 */
		if (kind == CHANGE_UPDATE_OLD)
		{
			restore_tuple(file, rel, old_update_tuple);
			have_old_tuple = true;
			continue;
		}

		/*
		 * Just before an UPDATE or DELETE, we must update the command
		 * counter, because the change could refer to a tuple that we have
		 * just inserted; and before an INSERT, we have to do this also if the
		 * previous command was either update or delete.
		 *
		 * With this approach we don't spend so many CCIs for long strings of
		 * only INSERTs, which can't affect one another.
		 */
		if (kind == CHANGE_UPDATE_NEW || kind == CHANGE_DELETE ||
			(kind == CHANGE_INSERT && (prevkind == CHANGE_UPDATE_NEW ||
									   prevkind == CHANGE_DELETE)))
		{
			CommandCounterIncrement();
			UpdateActiveSnapshotCommandId();
		}

		/*
		 * Now restore the tuple into the slot and execute the change.
		 */
		restore_tuple(file, rel, spilled_tuple);

		if (kind == CHANGE_INSERT)
		{
			apply_concurrent_insert(rel, spilled_tuple, chgcxt);
		}
		else if (kind == CHANGE_DELETE)
		{
			bool		found;

			/* Find the tuple to be deleted */
			found = find_target_tuple(rel, chgcxt, spilled_tuple, ondisk_tuple);
			if (!found)
				elog(ERROR, "failed to find target tuple");
			apply_concurrent_delete(rel, ondisk_tuple);
		}
		else if (kind == CHANGE_UPDATE_NEW)
		{
			TupleTableSlot *key;
			bool		found;

			/*
			 * Prefer the spilled OLD tuple as the search key; without one,
			 * the NEW tuple must identify the target itself.
			 */
			if (have_old_tuple)
				key = old_update_tuple;
			else
				key = spilled_tuple;

			/* Find the tuple to be updated or deleted. */
			found = find_target_tuple(rel, chgcxt, key, ondisk_tuple);
			if (!found)
				elog(ERROR, "failed to find target tuple");

			/*
			 * If 'tup' contains TOAST pointers, they point to the old
			 * relation's toast. Copy the corresponding TOAST pointers for the
			 * new relation from the existing tuple. (The fact that we
			 * received a TOAST pointer here implies that the attribute hasn't
			 * changed.)
			 */
			adjust_toast_pointers(rel, spilled_tuple, ondisk_tuple);

			apply_concurrent_update(rel, spilled_tuple, ondisk_tuple, chgcxt);

			ExecClearTuple(old_update_tuple);
			have_old_tuple = false;
		}
		else
			elog(ERROR, "unrecognized kind of change: %d", kind);

		ResetPerTupleExprContext(chgcxt->cc_estate);
	}

	/* Cleanup. */
	ExecDropSingleTupleTableSlot(spilled_tuple);
	ExecDropSingleTupleTableSlot(ondisk_tuple);
	ExecDropSingleTupleTableSlot(old_update_tuple);

	MemoryContextSwitchTo(oldcxt);
}
2579 :
2580 : /*
2581 : * Apply an insert from the spill of concurrent changes to the new copy of the
2582 : * table.
2583 : */
2584 : static void
2585 7 : apply_concurrent_insert(Relation rel, TupleTableSlot *slot,
2586 : ChangeContext *chgcxt)
2587 : {
2588 : /* Put the tuple in the table, but make sure it won't be decoded */
2589 7 : table_tuple_insert(rel, slot, GetCurrentCommandId(true),
2590 : TABLE_INSERT_NO_LOGICAL, NULL);
2591 :
2592 : /* Update indexes with this new tuple. */
2593 7 : ExecInsertIndexTuples(chgcxt->cc_rri,
2594 : chgcxt->cc_estate,
2595 : 0,
2596 : slot,
2597 : NIL, NULL);
2598 7 : pgstat_progress_incr_param(PROGRESS_REPACK_HEAP_TUPLES_INSERTED, 1);
2599 7 : }
2600 :
/*
 * Apply an update from the spill of concurrent changes to the new copy of the
 * table.
 *
 * 'spilled_tuple' is the new tuple version as read from the spill file.
 * 'ondisk_tuple' is the existing tuple in the new heap (located by the
 * caller via the identity index); its TID identifies the tuple to update.
 */
static void
apply_concurrent_update(Relation rel, TupleTableSlot *spilled_tuple,
						TupleTableSlot *ondisk_tuple,
						ChangeContext *chgcxt)
{
	LockTupleMode lockmode;
	TM_FailureData tmfd;
	TU_UpdateIndexes update_indexes;
	TM_Result	res;

	/*
	 * Carry out the update, skipping logical decoding for it.
	 */
	res = table_tuple_update(rel, &(ondisk_tuple->tts_tid), spilled_tuple,
							 GetCurrentCommandId(true),
							 TABLE_UPDATE_NO_LOGICAL,
							 InvalidSnapshot,
							 InvalidSnapshot,
							 false,
							 &tmfd, &lockmode, &update_indexes);
	if (res != TM_Ok)
		ereport(ERROR,
				errmsg("failed to apply concurrent UPDATE"));

	/* Insert new index entries, unless the AM reported none are needed. */
	if (update_indexes != TU_None)
	{
		uint32		flags = EIIT_IS_UPDATE;

		/* Restrict the work to summarizing indexes if that's sufficient. */
		if (update_indexes == TU_Summarizing)
			flags |= EIIT_ONLY_SUMMARIZING;
		ExecInsertIndexTuples(chgcxt->cc_rri,
							  chgcxt->cc_estate,
							  flags,
							  spilled_tuple,
							  NIL, NULL);
	}

	pgstat_progress_incr_param(PROGRESS_REPACK_HEAP_TUPLES_UPDATED, 1);
}
2644 :
2645 : static void
2646 3 : apply_concurrent_delete(Relation rel, TupleTableSlot *slot)
2647 : {
2648 : TM_Result res;
2649 : TM_FailureData tmfd;
2650 :
2651 : /*
2652 : * Delete tuple from the new heap, skipping logical decoding for it.
2653 : */
2654 3 : res = table_tuple_delete(rel, &(slot->tts_tid),
2655 : GetCurrentCommandId(true),
2656 : TABLE_DELETE_NO_LOGICAL,
2657 : InvalidSnapshot, InvalidSnapshot,
2658 : false,
2659 : &tmfd);
2660 :
2661 3 : if (res != TM_Ok)
2662 0 : ereport(ERROR,
2663 : errmsg("failed to apply concurrent DELETE"));
2664 :
2665 3 : pgstat_progress_incr_param(PROGRESS_REPACK_HEAP_TUPLES_DELETED, 1);
2666 3 : }
2667 :
/*
 * Read tuple from file and put it in the input slot. All memory is allocated
 * in the current memory context; caller is responsible for freeing it as
 * appropriate.
 *
 * External attributes are stored in separate memory chunks, in order to avoid
 * exceeding MaxAllocSize - that could happen if the individual attributes are
 * smaller than MaxAllocSize but the whole tuple is bigger.
 */
static void
restore_tuple(BufFile *file, Relation relation, TupleTableSlot *slot)
{
	uint32		t_len;
	HeapTuple	tup;
	int			natt_ext;

	/* Read the tuple. */
	BufFileReadExact(file, &t_len, sizeof(t_len));
	/* One palloc covers both the HeapTupleData header and the tuple body. */
	tup = (HeapTuple) palloc(HEAPTUPLESIZE + t_len);
	tup->t_data = (HeapTupleHeader) ((char *) tup + HEAPTUPLESIZE);
	BufFileReadExact(file, tup->t_data, t_len);
	tup->t_len = t_len;
	ItemPointerSetInvalid(&tup->t_self);
	tup->t_tableOid = RelationGetRelid(relation);

	/*
	 * Put the tuple we read in a slot. This deforms it, so that we can hack
	 * the external attributes in place.
	 */
	ExecForceStoreHeapTuple(tup, slot, false);

	/*
	 * Next, read any attributes we stored separately into the tts_values
	 * array elements expecting them, if any. This matches
	 * repack_store_change.
	 */
	BufFileReadExact(file, &natt_ext, sizeof(natt_ext));
	if (natt_ext > 0)
	{
		TupleDesc	desc = slot->tts_tupleDescriptor;

		for (int i = 0; i < desc->natts; i++)
		{
			CompactAttribute *attr = TupleDescCompactAttr(desc, i);
			varlena    *varlen;
			union
			{
				alignas(int32) varlena hdr;
				char		data[sizeof(void *)];
			}			chunk_header;
			void	   *value;
			Size		varlensz;

			/* Only live varlena attributes can have been stored separately. */
			if (attr->attisdropped || attr->attlen != -1)
				continue;
			if (slot_attisnull(slot, i + 1))
				continue;
			/* An indirect pointer marks an attribute stored separately. */
			varlen = (varlena *) DatumGetPointer(slot->tts_values[i]);
			if (!VARATT_IS_EXTERNAL_INDIRECT(varlen))
				continue;
			slot_getsomeattrs(slot, i + 1);

			/* Read just the varlena header first, to learn the chunk size. */
			BufFileReadExact(file, &chunk_header, VARHDRSZ);
			varlensz = VARSIZE_ANY(&chunk_header);

			/* Allocate the full chunk and read the rest of its payload. */
			value = palloc(varlensz);
			SET_VARSIZE(value, VARSIZE_ANY(&chunk_header));
			BufFileReadExact(file, (char *) value + VARHDRSZ, varlensz - VARHDRSZ);

			/* Replace the indirect pointer with the restored value. */
			slot->tts_values[i] = PointerGetDatum(value);
			natt_ext--;
			if (natt_ext < 0)
				ereport(ERROR,
						errcode(ERRCODE_DATA_CORRUPTED),
						errmsg("insufficient number of attributes stored separately"));
		}
	}
}
2746 :
/*
 * Adjust 'dest' replacing any EXTERNAL_ONDISK toast pointers with the
 * corresponding ones from 'src'.
 *
 * An on-disk TOAST pointer in 'dest' still references the old relation's
 * TOAST table; the matching attribute of 'src' (a tuple already present in
 * the new heap) carries the pointer valid for the new relation.
 */
static void
adjust_toast_pointers(Relation relation, TupleTableSlot *dest, TupleTableSlot *src)
{
	TupleDesc	desc = dest->tts_tupleDescriptor;

	for (int i = 0; i < desc->natts; i++)
	{
		CompactAttribute *attr = TupleDescCompactAttr(desc, i);
		varlena    *varlena_dst;

		/* Only live varlena attributes can be toasted. */
		if (attr->attisdropped)
			continue;
		if (attr->attlen != -1)
			continue;
		if (slot_attisnull(dest, i + 1))
			continue;

		/* Make sure tts_values[i] is populated before we read it. */
		slot_getsomeattrs(dest, i + 1);

		varlena_dst = (varlena *) DatumGetPointer(dest->tts_values[i]);
		if (!VARATT_IS_EXTERNAL_ONDISK(varlena_dst))
			continue;
		/* Deform 'src' far enough to fetch the corresponding attribute. */
		slot_getsomeattrs(src, i + 1);

		dest->tts_values[i] = src->tts_values[i];
	}
}
2778 :
/*
 * Find the tuple to be updated or deleted by the given data change, whose
 * tuple has already been loaded into locator.
 *
 * If the tuple is found, put it in retrieved and return true. If the tuple is
 * not found, return false.
 *
 * NOTE(review): 'locator' is read via tts_values directly, so it is assumed
 * to be deformed already (restore_tuple stores and deforms it) - TODO
 * confirm all callers guarantee that.
 */
static bool
find_target_tuple(Relation rel, ChangeContext *chgcxt, TupleTableSlot *locator,
				  TupleTableSlot *retrieved)
{
	Form_pg_index idx = chgcxt->cc_ident_index->rd_index;
	IndexScanDesc scan;
	bool		retval;

	/*
	 * Scan key is passed by caller, so it does not have to be constructed
	 * multiple times. Key entries have all fields initialized, except for
	 * sk_argument.
	 *
	 * Use the incoming tuple to finalize the scan key.
	 */
	for (int i = 0; i < chgcxt->cc_ident_key_nentries; i++)
	{
		ScanKey		entry = &chgcxt->cc_ident_key[i];
		AttrNumber	attno = idx->indkey.values[i];

		/* Identity (replica identity) columns must never be NULL. */
		entry->sk_argument = locator->tts_values[attno - 1];
		Assert(!locator->tts_isnull[attno - 1]);
	}

	/* XXX no instrumentation for now */
	scan = index_beginscan(rel, chgcxt->cc_ident_index, GetActiveSnapshot(),
						   NULL, chgcxt->cc_ident_key_nentries, 0, 0);
	index_rescan(scan, chgcxt->cc_ident_key, chgcxt->cc_ident_key_nentries, NULL, 0);
	/* The identity index is unique, so the first match is the only one. */
	retval = index_getnext_slot(scan, ForwardScanDirection, retrieved);
	index_endscan(scan);

	return retval;
}
2819 :
/*
 * Decode and apply concurrent changes, up to (and including) the record whose
 * LSN is 'end_of_wal'.
 *
 * 'done' tells the decoding worker that this is the last file we will request
 * from it, so it can exit afterwards.
 *
 * XXX the names "process_concurrent_changes" and "apply_concurrent_changes"
 * are far too similar to each other.
 */
static void
process_concurrent_changes(XLogRecPtr end_of_wal, ChangeContext *chgcxt, bool done)
{
	DecodingWorkerShared *shared;
	char		fname[MAXPGPATH];
	BufFile    *file;

	pgstat_progress_update_param(PROGRESS_REPACK_PHASE,
								 PROGRESS_REPACK_PHASE_CATCH_UP);

	/* Ask the worker for the file. */
	shared = (DecodingWorkerShared *) dsm_segment_address(decoding_worker->seg);
	SpinLockAcquire(&shared->mutex);
	shared->lsn_upto = end_of_wal;
	shared->done = done;
	SpinLockRelease(&shared->mutex);

	/*
	 * The worker needs to finish processing of the current WAL record. Even
	 * if it's idle, it'll need to close the output file. Thus we're likely to
	 * wait, so prepare for sleep.
	 */
	ConditionVariablePrepareToSleep(&shared->cv);
	for (;;)
	{
		int			last_exported;

		/* Fetch the worker's progress under the spinlock. */
		SpinLockAcquire(&shared->mutex);
		last_exported = shared->last_exported;
		SpinLockRelease(&shared->mutex);

		/*
		 * Has the worker exported the file we are waiting for?
		 */
		if (last_exported == chgcxt->cc_file_seq)
			break;

		ConditionVariableSleep(&shared->cv, WAIT_EVENT_REPACK_WORKER_EXPORT);
	}
	ConditionVariableCancelSleep();

	/* Open the file. */
	DecodingWorkerFileName(fname, shared->relid, chgcxt->cc_file_seq);
	file = BufFileOpenFileSet(&shared->sfs.fs, fname, O_RDONLY, false);
	apply_concurrent_changes(file, chgcxt);

	BufFileClose(file);

	/* Get ready for the next file. */
	chgcxt->cc_file_seq++;
}
2878 :
/*
 * Initialize the ChangeContext struct for the given relation, with
 * the given index as identity index.
 *
 * Sets up an executor state and ResultRelInfo (needed by
 * ExecInsertIndexTuples), locates the identity index among the relation's
 * indexes, and builds a reusable scan key for looking tuples up via that
 * index.  release_change_context() undoes all of this.
 */
static void
initialize_change_context(ChangeContext *chgcxt,
						  Relation relation, Oid ident_index_id)
{
	chgcxt->cc_rel = relation;

	/* Only initialize fields needed by ExecInsertIndexTuples(). */
	chgcxt->cc_estate = CreateExecutorState();

	chgcxt->cc_rri = (ResultRelInfo *) palloc(sizeof(ResultRelInfo));
	InitResultRelInfo(chgcxt->cc_rri, relation, 0, 0, 0);
	ExecOpenIndices(chgcxt->cc_rri, false);

	/*
	 * The table's relcache entry already has the relcache entry for the
	 * identity index; find that.
	 */
	chgcxt->cc_ident_index = NULL;
	for (int i = 0; i < chgcxt->cc_rri->ri_NumIndices; i++)
	{
		Relation	ind_rel;

		ind_rel = chgcxt->cc_rri->ri_IndexRelationDescs[i];
		if (ind_rel->rd_id == ident_index_id)
		{
			chgcxt->cc_ident_index = ind_rel;
			break;
		}
	}
	if (chgcxt->cc_ident_index == NULL)
		elog(ERROR, "failed to find identity index");

	/* Set up for scanning said identity index */
	{
		Form_pg_index indexForm;

		indexForm = chgcxt->cc_ident_index->rd_index;
		chgcxt->cc_ident_key_nentries = indexForm->indnkeyatts;
		chgcxt->cc_ident_key = (ScanKey) palloc_array(ScanKeyData, indexForm->indnkeyatts);
		for (int i = 0; i < indexForm->indnkeyatts; i++)
		{
			ScanKey		entry;
			Oid			opfamily,
						opcintype,
						opno,
						opcode;

			entry = &chgcxt->cc_ident_key[i];

			/* Look up the equality operator for the key column's type. */
			opfamily = chgcxt->cc_ident_index->rd_opfamily[i];
			opcintype = chgcxt->cc_ident_index->rd_opcintype[i];
			opno = get_opfamily_member(opfamily, opcintype, opcintype,
									   BTEqualStrategyNumber);
			if (!OidIsValid(opno))
				elog(ERROR, "failed to find = operator for type %u", opcintype);
			opcode = get_opcode(opno);
			if (!OidIsValid(opcode))
				elog(ERROR, "failed to find = operator for operator %u", opno);

			/* Initialize everything but argument. */
			ScanKeyInit(entry,
						i + 1,
						BTEqualStrategyNumber, opcode,
						(Datum) 0);
			/* Comparisons must honor the index column's collation. */
			entry->sk_collation = chgcxt->cc_ident_index->rd_indcollation[i];
		}
	}

	/* The first data file follows the snapshot file in the sequence. */
	chgcxt->cc_file_seq = WORKER_FILE_SNAPSHOT + 1;
}
2953 :
2954 : /*
2955 : * Free up resources taken by a ChangeContext.
2956 : */
2957 : static void
2958 2 : release_change_context(ChangeContext *chgcxt)
2959 : {
2960 2 : ExecCloseIndices(chgcxt->cc_rri);
2961 2 : FreeExecutorState(chgcxt->cc_estate);
2962 : /* XXX are these pfrees necessary? */
2963 2 : pfree(chgcxt->cc_rri);
2964 2 : pfree(chgcxt->cc_ident_key);
2965 2 : }
2966 :
2967 : /*
2968 : * The final steps of rebuild_relation() for concurrent processing.
2969 : *
2970 : * On entry, NewHeap is locked in AccessExclusiveLock mode. OldHeap and its
2971 : * clustering index (if one is passed) are still locked in a mode that allows
2972 : * concurrent data changes. On exit, both tables and their indexes are closed,
2973 : * but locked in AccessExclusiveLock mode.
2974 : */
2975 : static void
2976 2 : rebuild_relation_finish_concurrent(Relation NewHeap, Relation OldHeap,
2977 : Oid identIdx, TransactionId frozenXid,
2978 : MultiXactId cutoffMulti)
2979 : {
2980 : List *ind_oids_new;
2981 2 : Oid old_table_oid = RelationGetRelid(OldHeap);
2982 2 : Oid new_table_oid = RelationGetRelid(NewHeap);
2983 2 : List *ind_oids_old = RelationGetIndexList(OldHeap);
2984 : ListCell *lc,
2985 : *lc2;
2986 : char relpersistence;
2987 : bool is_system_catalog;
2988 : Oid ident_idx_new;
2989 : XLogRecPtr end_of_wal;
2990 : List *indexrels;
2991 : ChangeContext chgcxt;
2992 :
2993 : Assert(CheckRelationLockedByMe(OldHeap, ShareUpdateExclusiveLock, false));
2994 : Assert(CheckRelationLockedByMe(NewHeap, AccessExclusiveLock, false));
2995 :
2996 : /*
2997 : * Unlike the exclusive case, we build new indexes for the new relation
2998 : * rather than swapping the storage and reindexing the old relation. The
2999 : * point is that the index build can take some time, so we do it before we
3000 : * get AccessExclusiveLock on the old heap and therefore we cannot swap
3001 : * the heap storage yet.
3002 : *
3003 : * index_create() will lock the new indexes using AccessExclusiveLock - no
3004 : * need to change that. At the same time, we use ShareUpdateExclusiveLock
3005 : * to lock the existing indexes - that should be enough to prevent others
3006 : * from changing them while we're repacking the relation. The lock on
3007 : * table should prevent others from changing the index column list, but
3008 : * might not be enough for commands like ALTER INDEX ... SET ... (Those
3009 : * are not necessarily dangerous, but can make user confused if the
3010 : * changes they do get lost due to REPACK.)
3011 : */
3012 2 : ind_oids_new = build_new_indexes(NewHeap, OldHeap, ind_oids_old);
3013 :
3014 : /*
3015 : * The identity index in the new relation appears in the same relative
3016 : * position as the corresponding index in the old relation. Find it.
3017 : */
3018 2 : ident_idx_new = InvalidOid;
3019 4 : foreach_oid(ind_old, ind_oids_old)
3020 : {
3021 2 : if (identIdx == ind_old)
3022 : {
3023 2 : int pos = foreach_current_index(ind_old);
3024 :
3025 2 : if (unlikely(list_length(ind_oids_new) < pos))
3026 0 : elog(ERROR, "list of new indexes too short");
3027 2 : ident_idx_new = list_nth_oid(ind_oids_new, pos);
3028 2 : break;
3029 : }
3030 : }
3031 2 : if (!OidIsValid(ident_idx_new))
3032 0 : elog(ERROR, "could not find index matching \"%s\" at the new relation",
3033 : get_rel_name(identIdx));
3034 :
3035 : /* Gather information to apply concurrent changes. */
3036 2 : initialize_change_context(&chgcxt, NewHeap, ident_idx_new);
3037 :
3038 : /*
3039 : * During testing, wait for another backend to perform concurrent data
3040 : * changes which we will process below.
3041 : */
3042 2 : INJECTION_POINT("repack-concurrently-before-lock", NULL);
3043 :
3044 : /*
3045 : * Flush all WAL records inserted so far (possibly except for the last
3046 : * incomplete page; see GetInsertRecPtr), to minimize the amount of data
3047 : * we need to flush while holding exclusive lock on the source table.
3048 : */
3049 2 : XLogFlush(GetXLogInsertEndRecPtr());
3050 2 : end_of_wal = GetFlushRecPtr(NULL);
3051 :
3052 : /*
3053 : * Apply concurrent changes first time, to minimize the time we need to
3054 : * hold AccessExclusiveLock. (Quite some amount of WAL could have been
3055 : * written during the data copying and index creation.)
3056 : */
3057 2 : process_concurrent_changes(end_of_wal, &chgcxt, false);
3058 :
3059 : /*
3060 : * Acquire AccessExclusiveLock on the table, its TOAST relation (if there
3061 : * is one), all its indexes, so that we can swap the files.
3062 : */
3063 2 : LockRelationOid(old_table_oid, AccessExclusiveLock);
3064 :
3065 : /*
3066 : * Lock all indexes now, not only the clustering one: all indexes need to
3067 : * have their files swapped. While doing that, store their relation
3068 : * references in a zero-terminated array, to handle predicate locks below.
3069 : */
3070 2 : indexrels = NIL;
3071 6 : foreach_oid(ind_oid, ind_oids_old)
3072 : {
3073 : Relation index;
3074 :
3075 2 : index = index_open(ind_oid, AccessExclusiveLock);
3076 :
3077 : /*
3078 : * Some things about the index may have changed before we locked the
3079 : * index, such as ALTER INDEX RENAME. We don't need to do anything
3080 : * here to absorb those changes in the new index.
3081 : */
3082 2 : indexrels = lappend(indexrels, index);
3083 : }
3084 :
3085 : /*
3086 : * Lock the OldHeap's TOAST relation exclusively - again, the lock is
3087 : * needed to swap the files.
3088 : */
3089 2 : if (OidIsValid(OldHeap->rd_rel->reltoastrelid))
3090 1 : LockRelationOid(OldHeap->rd_rel->reltoastrelid, AccessExclusiveLock);
3091 :
3092 : /*
3093 : * Tuples and pages of the old heap will be gone, but the heap will stay.
3094 : */
3095 2 : TransferPredicateLocksToHeapRelation(OldHeap);
3096 6 : foreach_ptr(RelationData, index, indexrels)
3097 : {
3098 2 : TransferPredicateLocksToHeapRelation(index);
3099 2 : index_close(index, NoLock);
3100 : }
3101 2 : list_free(indexrels);
3102 :
3103 : /*
3104 : * Flush WAL again, to make sure that all changes committed while we were
3105 : * waiting for the exclusive lock are available for decoding.
3106 : */
3107 2 : XLogFlush(GetXLogInsertEndRecPtr());
3108 2 : end_of_wal = GetFlushRecPtr(NULL);
3109 :
3110 : /*
3111 : * Apply the concurrent changes again. Indicate that the decoding worker
3112 : * won't be needed anymore.
3113 : */
3114 2 : process_concurrent_changes(end_of_wal, &chgcxt, true);
3115 :
3116 : /* Remember info about rel before closing OldHeap */
3117 2 : relpersistence = OldHeap->rd_rel->relpersistence;
3118 2 : is_system_catalog = IsSystemRelation(OldHeap);
3119 :
3120 2 : pgstat_progress_update_param(PROGRESS_REPACK_PHASE,
3121 : PROGRESS_REPACK_PHASE_SWAP_REL_FILES);
3122 :
3123 : /*
3124 : * Even ShareUpdateExclusiveLock should have prevented others from
3125 : * creating / dropping indexes (even using the CONCURRENTLY option), so we
3126 : * do not need to check whether the lists match.
3127 : */
3128 4 : forboth(lc, ind_oids_old, lc2, ind_oids_new)
3129 : {
3130 2 : Oid ind_old = lfirst_oid(lc);
3131 2 : Oid ind_new = lfirst_oid(lc2);
3132 2 : Oid mapped_tables[4] = {0};
3133 :
3134 2 : swap_relation_files(ind_old, ind_new,
3135 : (old_table_oid == RelationRelationId),
3136 : false, /* swap_toast_by_content */
3137 : true,
3138 : InvalidTransactionId,
3139 : InvalidMultiXactId,
3140 : mapped_tables);
3141 :
3142 : #ifdef USE_ASSERT_CHECKING
3143 :
3144 : /*
3145 : * Concurrent processing is not supported for system relations, so
3146 : * there should be no mapped tables.
3147 : */
3148 : for (int i = 0; i < 4; i++)
3149 : Assert(!OidIsValid(mapped_tables[i]));
3150 : #endif
3151 : }
3152 :
3153 : /* The new indexes must be visible for deletion. */
3154 2 : CommandCounterIncrement();
3155 :
3156 : /* Close the old heap but keep lock until transaction commit. */
3157 2 : table_close(OldHeap, NoLock);
3158 : /* Close the new heap. (We didn't have to open its indexes). */
3159 2 : table_close(NewHeap, NoLock);
3160 :
3161 : /* Cleanup what we don't need anymore. (And close the identity index.) */
3162 2 : release_change_context(&chgcxt);
3163 :
3164 : /*
3165 : * Swap the relations and their TOAST relations and TOAST indexes. This
3166 : * also drops the new relation and its indexes.
3167 : *
3168 : * (System catalogs are currently not supported.)
3169 : */
3170 : Assert(!is_system_catalog);
3171 2 : finish_heap_swap(old_table_oid, new_table_oid,
3172 : is_system_catalog,
3173 : false, /* swap_toast_by_content */
3174 : false,
3175 : true,
3176 : false, /* reindex */
3177 : frozenXid, cutoffMulti,
3178 : relpersistence);
3179 2 : }
3180 :
3181 : /*
3182 : * Build indexes on NewHeap according to those on OldHeap.
3183 : *
3184 : * OldIndexes is the list of index OIDs on OldHeap. The contained indexes end
3185 : * up locked using ShareUpdateExclusiveLock.
3186 : *
3187 : * A list of OIDs of the corresponding indexes created on NewHeap is
3188 : * returned. The order of items does match, so we can use these arrays to swap
3189 : * index storage.
3190 : */
3191 : static List *
3192 2 : build_new_indexes(Relation NewHeap, Relation OldHeap, List *OldIndexes)
3193 : {
3194 2 : List *result = NIL;
3195 :
3196 2 : pgstat_progress_update_param(PROGRESS_REPACK_PHASE,
3197 : PROGRESS_REPACK_PHASE_REBUILD_INDEX);
3198 :
3199 6 : foreach_oid(oldindex, OldIndexes)
3200 : {
3201 : Oid newindex;
3202 : char *newName;
3203 : Relation ind;
3204 :
3205 2 : ind = index_open(oldindex, ShareUpdateExclusiveLock);
3206 :
3207 2 : newName = ChooseRelationName(get_rel_name(oldindex),
3208 : NULL,
3209 : "repacknew",
3210 2 : get_rel_namespace(ind->rd_index->indrelid),
3211 : false);
3212 2 : newindex = index_create_copy(NewHeap, INDEX_CREATE_SUPPRESS_PROGRESS,
3213 2 : oldindex, ind->rd_rel->reltablespace,
3214 : newName);
3215 2 : copy_index_constraints(ind, newindex, RelationGetRelid(NewHeap));
3216 2 : result = lappend_oid(result, newindex);
3217 :
3218 2 : index_close(ind, NoLock);
3219 : }
3220 :
3221 2 : return result;
3222 : }
3223 :
/*
 * Create a transient copy of a constraint -- supported by a transient
 * copy of the index that supports the original constraint.
 *
 * When repacking a table that contains exclusion constraints, the executor
 * relies on these constraints being properly catalogued. These copies are
 * to support that.
 *
 * We don't need the constraints for anything else (the original constraints
 * will be there once repack completes), so we add pg_depend entries so that
 * they are dropped when the transient table is dropped.
 */
static void
copy_index_constraints(Relation old_index, Oid new_index_id, Oid new_heap_id)
{
	ScanKeyData skey;
	Relation	rel;
	TupleDesc	desc;
	SysScanDesc scan;
	HeapTuple	tup;
	ObjectAddress objrel;

	rel = table_open(ConstraintRelationId, RowExclusiveLock);
	ObjectAddressSet(objrel, RelationRelationId, new_heap_id);

	/*
	 * Retrieve the constraints supported by the old index and create an
	 * identical one that points to the new index.
	 */
	ScanKeyInit(&skey,
				Anum_pg_constraint_conrelid,
				BTEqualStrategyNumber, F_OIDEQ,
				ObjectIdGetDatum(old_index->rd_index->indrelid));
	scan = systable_beginscan(rel, ConstraintRelidTypidNameIndexId, true,
							  NULL, 1, &skey);
	desc = RelationGetDescr(rel);
	while (HeapTupleIsValid(tup = systable_getnext(scan)))
	{
		Form_pg_constraint conform = (Form_pg_constraint) GETSTRUCT(tup);
		Oid			oid;
		Datum		values[Natts_pg_constraint] = {0};
		bool		nulls[Natts_pg_constraint] = {0};
		bool		replaces[Natts_pg_constraint] = {0};
		HeapTuple	new_tup;
		ObjectAddress objcon;

		/* The scan covers the whole table; skip other indexes' constraints. */
		if (conform->conindid != RelationGetRelid(old_index))
			continue;

		/* Copy the constraint, redirecting it to the new heap and index. */
		oid = GetNewOidWithIndex(rel, ConstraintOidIndexId,
								 Anum_pg_constraint_oid);
		values[Anum_pg_constraint_oid - 1] = ObjectIdGetDatum(oid);
		replaces[Anum_pg_constraint_oid - 1] = true;
		values[Anum_pg_constraint_conrelid - 1] = ObjectIdGetDatum(new_heap_id);
		replaces[Anum_pg_constraint_conrelid - 1] = true;
		values[Anum_pg_constraint_conindid - 1] = ObjectIdGetDatum(new_index_id);
		replaces[Anum_pg_constraint_conindid - 1] = true;

		new_tup = heap_modify_tuple(tup, desc, values, nulls, replaces);

		/* Insert it into the catalog. */
		CatalogTupleInsert(rel, new_tup);

		/* Create a dependency so it's removed when we drop the new heap. */
		ObjectAddressSet(objcon, ConstraintRelationId, oid);
		recordDependencyOn(&objcon, &objrel, DEPENDENCY_AUTO);
	}
	systable_endscan(scan);

	table_close(rel, RowExclusiveLock);

	/* Make the new constraints visible to later commands. */
	CommandCounterIncrement();
}
3297 :
/*
 * Try to start a background worker to perform logical decoding of data
 * changes applied to relation while REPACK CONCURRENTLY is copying its
 * contents to a new table.
 *
 * On success, 'decoding_worker' is set up and the worker has initialized its
 * logical decoding slot; elog/ereport ERROR otherwise.
 */
static void
start_repack_decoding_worker(Oid relid)
{
	Size		size;
	dsm_segment *seg;
	DecodingWorkerShared *shared;
	shm_mq	   *mq;
	shm_mq_handle *mqh;
	BackgroundWorker bgw;

	/* Setup shared memory. */
	size = BUFFERALIGN(offsetof(DecodingWorkerShared, error_queue)) +
		BUFFERALIGN(REPACK_ERROR_QUEUE_SIZE);
	seg = dsm_create(size, 0);
	shared = (DecodingWorkerShared *) dsm_segment_address(seg);
	shared->lsn_upto = InvalidXLogRecPtr;
	shared->done = false;
	SharedFileSetInit(&shared->sfs, seg);
	/* No file exported yet. */
	shared->last_exported = -1;
	SpinLockInit(&shared->mutex);
	shared->dbid = MyDatabaseId;

	/*
	 * This is the UserId set in cluster_rel(). Security context shouldn't be
	 * needed for decoding worker.
	 */
	shared->roleid = GetUserId();
	shared->relid = relid;
	ConditionVariableInit(&shared->cv);
	shared->backend_proc = MyProc;
	shared->backend_pid = MyProcPid;
	shared->backend_proc_number = MyProcNumber;

	/* Message queue through which the worker reports errors to us. */
	mq = shm_mq_create((char *) BUFFERALIGN(shared->error_queue),
					   REPACK_ERROR_QUEUE_SIZE);
	shm_mq_set_receiver(mq, MyProc);
	mqh = shm_mq_attach(mq, seg, NULL);

	/* Register the worker with the postmaster. */
	memset(&bgw, 0, sizeof(bgw));
	snprintf(bgw.bgw_name, BGW_MAXLEN,
			 "REPACK decoding worker for relation \"%s\"",
			 get_rel_name(relid));
	snprintf(bgw.bgw_type, BGW_MAXLEN, "REPACK decoding worker");
	bgw.bgw_flags = BGWORKER_SHMEM_ACCESS |
		BGWORKER_BACKEND_DATABASE_CONNECTION;
	bgw.bgw_start_time = BgWorkerStart_RecoveryFinished;
	bgw.bgw_restart_time = BGW_NEVER_RESTART;
	snprintf(bgw.bgw_library_name, MAXPGPATH, "postgres");
	snprintf(bgw.bgw_function_name, BGW_MAXLEN, "RepackWorkerMain");
	bgw.bgw_main_arg = UInt32GetDatum(dsm_segment_handle(seg));
	bgw.bgw_notify_pid = MyProcPid;

	decoding_worker = palloc0_object(DecodingWorker);
	if (!RegisterDynamicBackgroundWorker(&bgw, &decoding_worker->handle))
		ereport(ERROR,
				errcode(ERRCODE_CONFIGURATION_LIMIT_EXCEEDED),
				errmsg("out of background worker slots"),
				errhint("You might need to increase \"%s\".", "max_worker_processes"));

	decoding_worker->seg = seg;
	decoding_worker->error_mqh = mqh;

	/*
	 * The decoding setup must be done before the caller can have XID assigned
	 * for any reason, otherwise the worker might end up in a deadlock,
	 * waiting for the caller's transaction to end. Therefore wait here until
	 * the worker indicates that it has the logical decoding initialized.
	 */
	ConditionVariablePrepareToSleep(&shared->cv);
	for (;;)
	{
		bool		initialized;

		SpinLockAcquire(&shared->mutex);
		initialized = shared->initialized;
		SpinLockRelease(&shared->mutex);

		if (initialized)
			break;

		ConditionVariableSleep(&shared->cv, WAIT_EVENT_REPACK_WORKER_EXPORT);
	}
	ConditionVariableCancelSleep();
}
3387 :
/*
 * Stop the decoding worker and cleanup the related resources.
 *
 * The worker stops on its own when it knows there is no more work to do, but
 * we need to stop it explicitly at least on ERROR in the launching backend.
 *
 * Safe to call even if the worker was never started (or never registered):
 * the function simply returns in those cases.
 */
static void
stop_repack_decoding_worker(void)
{
	BgwHandleStatus status;

	/* Haven't reached the worker startup? */
	if (decoding_worker == NULL)
		return;

	/* Could not register the worker? */
	if (decoding_worker->handle == NULL)
		return;

	TerminateBackgroundWorker(decoding_worker->handle);
	/* The worker should really exit before the REPACK command does. */
	HOLD_INTERRUPTS();
	status = WaitForBackgroundWorkerShutdown(decoding_worker->handle);
	RESUME_INTERRUPTS();

	if (status == BGWH_POSTMASTER_DIED)
		ereport(FATAL,
				errcode(ERRCODE_ADMIN_SHUTDOWN),
				errmsg("postmaster exited during REPACK command"));

	/* Detach from the error queue before dropping the DSM segment. */
	shm_mq_detach(decoding_worker->error_mqh);

	/*
	 * If we could not cancel the current sleep due to ERROR, do that before
	 * we detach from the shared memory the condition variable is located in.
	 * If we did not, the bgworker ERROR handling code would try and fail
	 * badly.
	 */
	ConditionVariableCancelSleep();

	dsm_detach(decoding_worker->seg);
	pfree(decoding_worker);
	decoding_worker = NULL;
}
3432 :
/*
 * Get the initial snapshot from the decoding worker.
 *
 * Waits for the worker to export its snapshot file, then reads and restores
 * the serialized snapshot from it.  The returned snapshot is allocated in
 * the current memory context (see RestoreSnapshot).
 */
static Snapshot
get_initial_snapshot(DecodingWorker *worker)
{
	DecodingWorkerShared *shared;
	char		fname[MAXPGPATH];
	BufFile    *file;
	Size		snap_size;
	char	   *snap_space;
	Snapshot	snapshot;

	shared = (DecodingWorkerShared *) dsm_segment_address(worker->seg);

	/*
	 * The worker needs to initialize the logical decoding, which usually
	 * takes some time. Therefore it makes sense to prepare for the sleep
	 * first.
	 */
	ConditionVariablePrepareToSleep(&shared->cv);
	for (;;)
	{
		int			last_exported;

		/* Fetch the worker's progress under the spinlock. */
		SpinLockAcquire(&shared->mutex);
		last_exported = shared->last_exported;
		SpinLockRelease(&shared->mutex);

		/*
		 * Has the worker exported the file we are waiting for?
		 */
		if (last_exported == WORKER_FILE_SNAPSHOT)
			break;

		ConditionVariableSleep(&shared->cv, WAIT_EVENT_REPACK_WORKER_EXPORT);
	}
	ConditionVariableCancelSleep();

	/* Read the snapshot from a file. */
	DecodingWorkerFileName(fname, shared->relid, WORKER_FILE_SNAPSHOT);
	file = BufFileOpenFileSet(&shared->sfs.fs, fname, O_RDONLY, false);
	BufFileReadExact(file, &snap_size, sizeof(snap_size));
	snap_space = (char *) palloc(snap_size);
	BufFileReadExact(file, snap_space, snap_size);
	BufFileClose(file);

	/* Restore it. */
	snapshot = RestoreSnapshot(snap_space);
	pfree(snap_space);

	return snapshot;
}
3486 :
/*
 * Generate worker's file name into 'fname', which must be of size MAXPGPATH.
 * If relations of the same 'relid' happen to be processed at the same time,
 * they must be from different databases and therefore different backends must
 * be involved.
 *
 * 'seq' identifies which of the worker's exported files the name refers to
 * (e.g. WORKER_FILE_SNAPSHOT).
 */
void
DecodingWorkerFileName(char *fname, Oid relid, uint32 seq)
{
	/* The PID is already present in the fileset name, so we needn't add it */
	snprintf(fname, MAXPGPATH, "%u-%u", relid, seq);
}
3499 :
/*
 * Handle receipt of an interrupt indicating a repack worker message.
 *
 * Note: this is called within a signal handler! All we can do is set
 * a flag that will cause the next CHECK_FOR_INTERRUPTS() to invoke
 * ProcessRepackMessages().
 */
void
HandleRepackMessageInterrupt(void)
{
	/* Set the pending flags before waking the process via its latch. */
	InterruptPending = true;
	RepackMessagePending = true;
	SetLatch(MyLatch);
}
3514 :
/*
 * Process any queued protocol messages received from the repack worker.
 *
 * Invoked via CHECK_FOR_INTERRUPTS() once HandleRepackMessageInterrupt() has
 * set RepackMessagePending.  Drains the worker's error message queue without
 * blocking, re-raising any errors or notices found there.
 */
void
ProcessRepackMessages(void)
{
	MemoryContext oldcontext;

	/* Private context, kept across calls so it can be reset on each use. */
	static MemoryContext hpm_context = NULL;

	/*
	 * Nothing to do if we haven't launched the worker yet or have already
	 * terminated it.
	 */
	if (decoding_worker == NULL)
		return;

	/*
	 * This is invoked from ProcessInterrupts(), and since some of the
	 * functions it calls contain CHECK_FOR_INTERRUPTS(), there is a potential
	 * for recursive calls if more signals are received while this runs. It's
	 * unclear that recursive entry would be safe, and it doesn't seem useful
	 * even if it is safe, so let's block interrupts until done.
	 */
	HOLD_INTERRUPTS();

	/*
	 * Moreover, CurrentMemoryContext might be pointing almost anywhere. We
	 * don't want to risk leaking data into long-lived contexts, so let's do
	 * our work here in a private context that we can reset on each use.
	 */
	if (hpm_context == NULL)	/* first time through? */
		hpm_context = AllocSetContextCreate(TopMemoryContext,
											"ProcessRepackMessages",
											ALLOCSET_DEFAULT_SIZES);
	else
		MemoryContextReset(hpm_context);

	oldcontext = MemoryContextSwitchTo(hpm_context);

	/* OK to process messages.  Reset the flag saying there are more to do. */
	RepackMessagePending = false;

	/*
	 * Read as many messages as we can from the worker, but stop when no more
	 * messages can be read from the worker without blocking.
	 */
	while (true)
	{
		shm_mq_result res;
		Size		nbytes;
		void	   *data;

		/* nowait = true: don't block if the queue is currently empty */
		res = shm_mq_receive(decoding_worker->error_mqh, &nbytes,
							 &data, true);
		if (res == SHM_MQ_WOULD_BLOCK)
			break;
		else if (res == SHM_MQ_SUCCESS)
		{
			StringInfoData msg;

			initStringInfo(&msg);
			appendBinaryStringInfo(&msg, data, nbytes);
			ProcessRepackMessage(&msg);
			pfree(msg.data);
		}
		else
		{
			/*
			 * The decoding worker is special in that it exits as soon as it
			 * has its work done. Thus the DETACHED result code is fine.
			 */
			Assert(res == SHM_MQ_DETACHED);

			break;
		}
	}

	MemoryContextSwitchTo(oldcontext);

	/* Might as well clear the context on our way out */
	MemoryContextReset(hpm_context);

	RESUME_INTERRUPTS();
}
3599 :
3600 : /*
3601 : * Process a single protocol message received from a single parallel worker.
3602 : */
3603 : static void
3604 0 : ProcessRepackMessage(StringInfo msg)
3605 : {
3606 : char msgtype;
3607 :
3608 0 : msgtype = pq_getmsgbyte(msg);
3609 :
3610 0 : switch (msgtype)
3611 : {
3612 0 : case PqMsg_ErrorResponse:
3613 : case PqMsg_NoticeResponse:
3614 : {
3615 : ErrorData edata;
3616 :
3617 : /* Parse ErrorResponse or NoticeResponse. */
3618 0 : pq_parse_errornotice(msg, &edata);
3619 :
3620 : /* Death of a worker isn't enough justification for suicide. */
3621 0 : edata.elevel = Min(edata.elevel, ERROR);
3622 :
3623 : /*
3624 : * Add a context line to show that this is a message
3625 : * propagated from the worker. Otherwise, it can sometimes be
3626 : * confusing to understand what actually happened.
3627 : */
3628 0 : if (edata.context)
3629 0 : edata.context = psprintf("%s\n%s", edata.context,
3630 : _("REPACK decoding worker"));
3631 : else
3632 0 : edata.context = pstrdup(_("REPACK decoding worker"));
3633 :
3634 : /* Rethrow error or print notice. */
3635 0 : ThrowErrorData(&edata);
3636 :
3637 0 : break;
3638 : }
3639 :
3640 0 : default:
3641 : {
3642 0 : elog(ERROR, "unrecognized message type received from decoding worker: %c (message length %d bytes)",
3643 : msgtype, msg->len);
3644 : }
3645 : }
3646 0 : }
|