Line data Source code
1 : /*-------------------------------------------------------------------------
2 : *
3 : * repack.c
4 : * REPACK a table; formerly known as CLUSTER. VACUUM FULL also uses
5 : * parts of this code.
6 : *
7 : * There are two somewhat different ways to rewrite a table. In non-
8 : * concurrent mode, it's easy: take AccessExclusiveLock, create a new
9 : * transient relation, copy the tuples over to the relfilenode of the new
10 : * relation, swap the relfilenodes, then drop the old relation.
11 : *
12 : * In concurrent mode, we lock the table with only ShareUpdateExclusiveLock,
13 : * then do an initial copy as above. However, while the tuples are being
14 : * copied, concurrent transactions could modify the table. To cope with those
15 : * changes, we rely on logical decoding to obtain them from WAL. A bgworker
16 : * consumes WAL while the initial copy is ongoing (to prevent excessive WAL
17 : * from being reserved), and accumulates the changes in a file. Once the
18 : * initial copy is complete, we read the changes from the file and re-apply
19 : * them on the new heap. Then we upgrade our ShareUpdateExclusiveLock to
20 : * AccessExclusiveLock and swap the relfilenodes. This way, the time we hold
21 : * a strong lock on the table is much reduced, and the bloat is eliminated.
22 : *
23 : *
24 : * Portions Copyright (c) 1996-2026, PostgreSQL Global Development Group
25 : * Portions Copyright (c) 1994-5, Regents of the University of California
26 : *
27 : *
28 : * IDENTIFICATION
29 : * src/backend/commands/repack.c
30 : *
31 : *-------------------------------------------------------------------------
32 : */
33 : #include "postgres.h"
34 :
35 : #include "access/amapi.h"
36 : #include "access/heapam.h"
37 : #include "access/multixact.h"
38 : #include "access/relscan.h"
39 : #include "access/tableam.h"
40 : #include "access/toast_internals.h"
41 : #include "access/transam.h"
42 : #include "access/xact.h"
43 : #include "catalog/catalog.h"
44 : #include "catalog/dependency.h"
45 : #include "catalog/heap.h"
46 : #include "catalog/index.h"
47 : #include "catalog/namespace.h"
48 : #include "catalog/objectaccess.h"
49 : #include "catalog/pg_am.h"
50 : #include "catalog/pg_constraint.h"
51 : #include "catalog/pg_inherits.h"
52 : #include "catalog/toasting.h"
53 : #include "commands/defrem.h"
54 : #include "commands/progress.h"
55 : #include "commands/repack.h"
56 : #include "commands/repack_internal.h"
57 : #include "commands/tablecmds.h"
58 : #include "commands/vacuum.h"
59 : #include "executor/executor.h"
60 : #include "libpq/pqformat.h"
61 : #include "libpq/pqmq.h"
62 : #include "miscadmin.h"
63 : #include "optimizer/optimizer.h"
64 : #include "pgstat.h"
65 : #include "replication/logicalrelation.h"
66 : #include "storage/bufmgr.h"
67 : #include "storage/lmgr.h"
68 : #include "storage/predicate.h"
69 : #include "storage/proc.h"
70 : #include "utils/acl.h"
71 : #include "utils/fmgroids.h"
72 : #include "utils/guc.h"
73 : #include "utils/injection_point.h"
74 : #include "utils/inval.h"
75 : #include "utils/lsyscache.h"
76 : #include "utils/memutils.h"
77 : #include "utils/pg_rusage.h"
78 : #include "utils/relmapper.h"
79 : #include "utils/snapmgr.h"
80 : #include "utils/syscache.h"
81 : #include "utils/wait_event_types.h"
82 :
83 : /*
84 : * This struct is used to pass around the information on tables to be
85 : * clustered. We need this so we can make a list of them when invoked without
86 : * a specific table/index pair.
87 : */
88 : typedef struct
89 : {
90 : Oid tableOid;
91 : Oid indexOid;
92 : } RelToCluster;
93 :
94 : /*
95 : * The first file exported by the decoding worker must contain a snapshot, the
96 : * following ones contain the data changes.
97 : */
98 : #define WORKER_FILE_SNAPSHOT 0
99 :
100 : /*
101 : * Information needed to apply concurrent data changes.
102 : */
103 : typedef struct ChangeContext
104 : {
105 : /* The relation the changes are applied to. */
106 : Relation cc_rel;
107 :
108 : /* Needed to update indexes of cc_rel. */
109 : ResultRelInfo *cc_rri;
110 : EState *cc_estate;
111 :
112 : /*
113 : * Existing tuples to UPDATE and DELETE are located via this index. We
114 : * keep the scankey in partially initialized state to avoid repeated work.
115 : * sk_argument is completed on the fly.
116 : */
117 : Relation cc_ident_index;
118 : ScanKey cc_ident_key;
119 : int cc_ident_key_nentries;
120 :
121 : /* Sequential number of the file containing the changes. */
122 : int cc_file_seq;
123 : } ChangeContext;
124 :
125 : /*
126 : * Backend-local information to control the decoding worker.
127 : */
128 : typedef struct DecodingWorker
129 : {
130 : /* The worker. */
131 : BackgroundWorkerHandle *handle;
132 :
133 : /* DecodingWorkerShared is in this segment. */
134 : dsm_segment *seg;
135 :
136 : /* Handle of the error queue. */
137 : shm_mq_handle *error_mqh;
138 : } DecodingWorker;
139 :
140 : /* Pointer to currently running decoding worker. */
141 : static DecodingWorker *decoding_worker = NULL;
142 :
143 : /*
144 : * Is there a message sent by a repack worker that the backend needs to
145 : * receive?
146 : */
147 : volatile sig_atomic_t RepackMessagePending = false;
148 :
149 : static LOCKMODE RepackLockLevel(bool concurrent);
150 : static bool cluster_rel_recheck(RepackCommand cmd, Relation OldHeap,
151 : Oid indexOid, Oid userid, LOCKMODE lmode,
152 : int options);
153 : static void check_concurrent_repack_requirements(Relation rel,
154 : Oid *ident_idx_p);
155 : static void rebuild_relation(Relation OldHeap, Relation index, bool verbose,
156 : Oid ident_idx);
157 : static void copy_table_data(Relation NewHeap, Relation OldHeap, Relation OldIndex,
158 : Snapshot snapshot,
159 : bool verbose,
160 : bool *pSwapToastByContent,
161 : TransactionId *pFreezeXid,
162 : MultiXactId *pCutoffMulti);
163 : static List *get_tables_to_repack(RepackCommand cmd, bool usingindex,
164 : MemoryContext permcxt);
165 : static List *get_tables_to_repack_partitioned(RepackCommand cmd,
166 : Oid relid, bool rel_is_index,
167 : MemoryContext permcxt);
168 : static bool repack_is_permitted_for_relation(RepackCommand cmd,
169 : Oid relid, Oid userid);
170 :
171 : static void apply_concurrent_changes(BufFile *file, ChangeContext *chgcxt);
172 : static void apply_concurrent_insert(Relation rel, TupleTableSlot *slot,
173 : ChangeContext *chgcxt);
174 : static void apply_concurrent_update(Relation rel, TupleTableSlot *spilled_tuple,
175 : TupleTableSlot *ondisk_tuple,
176 : ChangeContext *chgcxt);
177 : static void apply_concurrent_delete(Relation rel, TupleTableSlot *slot);
178 : static void restore_tuple(BufFile *file, Relation relation,
179 : TupleTableSlot *slot);
180 : static void adjust_toast_pointers(Relation relation, TupleTableSlot *dest,
181 : TupleTableSlot *src);
182 : static bool find_target_tuple(Relation rel, ChangeContext *chgcxt,
183 : TupleTableSlot *locator,
184 : TupleTableSlot *retrieved);
185 : static void process_concurrent_changes(XLogRecPtr end_of_wal,
186 : ChangeContext *chgcxt,
187 : bool done);
188 : static void initialize_change_context(ChangeContext *chgcxt,
189 : Relation relation,
190 : Oid ident_index_id);
191 : static void release_change_context(ChangeContext *chgcxt);
192 : static void rebuild_relation_finish_concurrent(Relation NewHeap, Relation OldHeap,
193 : Oid identIdx,
194 : TransactionId frozenXid,
195 : MultiXactId cutoffMulti);
196 : static List *build_new_indexes(Relation NewHeap, Relation OldHeap, List *OldIndexes);
197 : static void copy_index_constraints(Relation old_index, Oid new_index_id,
198 : Oid new_heap_id);
199 : static Relation process_single_relation(RepackStmt *stmt,
200 : LOCKMODE lockmode,
201 : bool isTopLevel,
202 : ClusterParams *params);
203 : static Oid determine_clustered_index(Relation rel, bool usingindex,
204 : const char *indexname);
205 :
206 : static void start_repack_decoding_worker(Oid relid);
207 : static void stop_repack_decoding_worker(void);
208 : static Snapshot get_initial_snapshot(DecodingWorker *worker);
209 :
210 : static void ProcessRepackMessage(StringInfo msg);
211 : static const char *RepackCommandAsString(RepackCommand cmd);
212 :
213 :
214 : /*
215 : * The repack code allows for processing multiple tables at once. Because
216 : * of this, we cannot just run everything on a single transaction, or we
217 : * would be forced to acquire exclusive locks on all the tables being
218 : * clustered, simultaneously --- very likely leading to deadlock.
219 : *
220 : * To solve this we follow a similar strategy to VACUUM code, processing each
221 : * relation in a separate transaction. For this to work, we need to:
222 : *
223 : * - provide a separate memory context so that we can pass information in
224 : * a way that survives across transactions
225 : * - start a new transaction every time a new relation is clustered
226 : * - check for validity of the information on to-be-clustered relations,
227 : * as someone might have deleted a relation behind our back, or
228 : * clustered one on a different index
229 : * - end the transaction
230 : *
231 : * The single-relation case does not have any such overhead.
232 : *
233 : * We also allow a relation to be repacked following an index, but without
234 : * naming a specific one. In that case, the indisclustered bit will be
235 : * looked up, and an ERROR will be thrown if no so-marked index is found.
236 : */
237 : void
238 219 : ExecRepack(ParseState *pstate, RepackStmt *stmt, bool isTopLevel)
239 : {
240 219 : ClusterParams params = {0};
241 219 : Relation rel = NULL;
242 : MemoryContext repack_context;
243 : LOCKMODE lockmode;
244 : List *rtcs;
245 :
246 : /* Parse option list */
247 491 : foreach_node(DefElem, opt, stmt->params)
248 : {
249 53 : if (strcmp(opt->defname, "verbose") == 0)
250 6 : params.options |= defGetBoolean(opt) ? CLUOPT_VERBOSE : 0;
251 47 : else if (strcmp(opt->defname, "analyze") == 0 ||
252 39 : strcmp(opt->defname, "analyse") == 0)
253 8 : params.options |= defGetBoolean(opt) ? CLUOPT_ANALYZE : 0;
254 78 : else if (strcmp(opt->defname, "concurrently") == 0 &&
255 39 : defGetBoolean(opt))
256 : {
257 39 : if (stmt->command != REPACK_COMMAND_REPACK)
258 0 : ereport(ERROR,
259 : errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
260 : errmsg("CONCURRENTLY option not supported for %s",
261 : RepackCommandAsString(stmt->command)));
262 39 : params.options |= CLUOPT_CONCURRENT;
263 : }
264 : else
265 0 : ereport(ERROR,
266 : errcode(ERRCODE_SYNTAX_ERROR),
267 : errmsg("unrecognized %s option \"%s\"",
268 : RepackCommandAsString(stmt->command),
269 : opt->defname),
270 : parser_errposition(pstate, opt->location));
271 : }
272 :
273 : /* Determine the lock mode to use. */
274 219 : lockmode = RepackLockLevel((params.options & CLUOPT_CONCURRENT) != 0);
275 :
276 219 : if ((params.options & CLUOPT_CONCURRENT) != 0)
277 : {
278 : /*
279 : * Make sure we're not in a transaction block.
280 : *
281 : * The reason is that repack_setup_logical_decoding() could wait
282 : * indefinitely for our XID to complete. (The deadlock detector would
283 : * not recognize it because we'd be waiting for ourselves, i.e. no
284 : * real lock conflict.) It would be possible to run in a transaction
285 : * block if we had no XID, but this restriction is simpler for users
286 : * to understand and we don't lose any functionality.
287 : */
288 39 : PreventInTransactionBlock(isTopLevel, "REPACK (CONCURRENTLY)");
289 : }
290 :
291 : /*
292 : * If a single relation is specified, process it and we're done ... unless
293 : * the relation is a partitioned table, in which case we fall through.
294 : */
295 219 : if (stmt->relation != NULL)
296 : {
297 204 : rel = process_single_relation(stmt, lockmode, isTopLevel, ¶ms);
298 156 : if (rel == NULL)
299 120 : return; /* all done */
300 : }
301 :
302 : /*
303 : * Don't allow ANALYZE in the multiple-relation case for now. Maybe we
304 : * can add support for this later.
305 : */
306 51 : if (params.options & CLUOPT_ANALYZE)
307 0 : ereport(ERROR,
308 : errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
309 : errmsg("cannot execute %s on multiple tables",
310 : "REPACK (ANALYZE)"));
311 :
312 : /*
313 : * By here, we know we are in a multi-table situation.
314 : *
315 : * Concurrent processing is currently considered rather special (e.g. in
316 : * terms of resources consumed) so it is not performed in bulk.
317 : */
318 51 : if (params.options & CLUOPT_CONCURRENT)
319 : {
320 4 : if (rel != NULL)
321 : {
322 : Assert(rel->rd_rel->relkind == RELKIND_PARTITIONED_TABLE);
323 4 : ereport(ERROR,
324 : errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
325 : errmsg("REPACK (CONCURRENTLY) is not supported for partitioned tables"),
326 : errhint("Consider running the command on individual partitions."));
327 : }
328 : else
329 0 : ereport(ERROR,
330 : errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
331 : errmsg("REPACK (CONCURRENTLY) requires an explicit table name"));
332 : }
333 :
334 : /*
335 : * In order to avoid holding locks for too long, we want to process each
336 : * table in its own transaction. This forces us to disallow running
337 : * inside a user transaction block.
338 : */
339 47 : PreventInTransactionBlock(isTopLevel, RepackCommandAsString(stmt->command));
340 :
341 : /* Also, we need a memory context to hold our list of relations */
342 47 : repack_context = AllocSetContextCreate(PortalContext,
343 : "Repack",
344 : ALLOCSET_DEFAULT_SIZES);
345 :
346 : /*
347 : * Since we open a new transaction for each relation, we have to check
348 : * that the relation still is what we think it is.
349 : *
350 : * In single-transaction CLUSTER, we don't need the overhead.
351 : */
352 47 : params.options |= CLUOPT_RECHECK;
353 :
354 : /*
355 : * If we don't have a relation yet, determine a relation list. If we do,
356 : * then it must be a partitioned table, and we want to process its
357 : * partitions.
358 : */
359 47 : if (rel == NULL)
360 : {
361 : Assert(stmt->indexname == NULL);
362 15 : rtcs = get_tables_to_repack(stmt->command, stmt->usingindex,
363 : repack_context);
364 15 : params.options |= CLUOPT_RECHECK_ISCLUSTERED;
365 : }
366 : else
367 : {
368 : Oid relid;
369 : bool rel_is_index;
370 :
371 : Assert(rel->rd_rel->relkind == RELKIND_PARTITIONED_TABLE);
372 :
373 : /*
374 : * If USING INDEX was specified, resolve the index name now and pass
375 : * it down.
376 : */
377 32 : if (stmt->usingindex)
378 : {
379 : /*
380 : * If no index name was specified when repacking a partitioned
381 : * table, punt for now. Maybe we can improve this later.
382 : */
383 28 : if (!stmt->indexname)
384 : {
385 8 : if (stmt->command == REPACK_COMMAND_CLUSTER)
386 4 : ereport(ERROR,
387 : errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
388 : errmsg("there is no previously clustered index for table \"%s\"",
389 : RelationGetRelationName(rel)));
390 : else
391 4 : ereport(ERROR,
392 : errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
393 : /*- translator: first %s is name of a SQL command, eg. REPACK */
394 : errmsg("cannot execute %s on partitioned table \"%s\" USING INDEX with no index name",
395 : RepackCommandAsString(stmt->command),
396 : RelationGetRelationName(rel)));
397 : }
398 :
399 20 : relid = determine_clustered_index(rel, stmt->usingindex,
400 20 : stmt->indexname);
401 20 : if (!OidIsValid(relid))
402 0 : elog(ERROR, "unable to determine index to cluster on");
403 20 : check_index_is_clusterable(rel, relid, AccessExclusiveLock);
404 :
405 16 : rel_is_index = true;
406 : }
407 : else
408 : {
409 4 : relid = RelationGetRelid(rel);
410 4 : rel_is_index = false;
411 : }
412 :
413 20 : rtcs = get_tables_to_repack_partitioned(stmt->command,
414 : relid, rel_is_index,
415 : repack_context);
416 :
417 : /* close parent relation, releasing lock on it */
418 20 : table_close(rel, AccessExclusiveLock);
419 20 : rel = NULL;
420 : }
421 :
422 : /* Commit to get out of starting transaction */
423 35 : PopActiveSnapshot();
424 35 : CommitTransactionCommand();
425 :
426 : /* Cluster the tables, each in a separate transaction */
427 : Assert(rel == NULL);
428 122 : foreach_ptr(RelToCluster, rtc, rtcs)
429 : {
430 : /* Start a new transaction for each relation. */
431 52 : StartTransactionCommand();
432 :
433 : /*
434 : * Open the target table, coping with the case where it has been
435 : * dropped.
436 : */
437 52 : rel = try_table_open(rtc->tableOid, lockmode);
438 52 : if (rel == NULL)
439 : {
440 0 : CommitTransactionCommand();
441 0 : continue;
442 : }
443 :
444 : /* functions in indexes may want a snapshot set */
445 52 : PushActiveSnapshot(GetTransactionSnapshot());
446 :
447 : /* Process this table */
448 52 : cluster_rel(stmt->command, rel, rtc->indexOid, ¶ms, isTopLevel);
449 : /* cluster_rel closes the relation, but keeps lock */
450 :
451 52 : PopActiveSnapshot();
452 52 : CommitTransactionCommand();
453 : }
454 :
455 : /* Start a new transaction for the cleanup work. */
456 35 : StartTransactionCommand();
457 :
458 : /* Clean up working storage */
459 35 : MemoryContextDelete(repack_context);
460 : }
461 :
462 : /*
463 : * In the non-concurrent case, we obtain AccessExclusiveLock throughout the
464 : * operation to avoid any lock-upgrade hazards. In the concurrent case, we
465 : * grab ShareUpdateExclusiveLock (just like VACUUM) for most of the
466 : * processing and only acquire AccessExclusiveLock at the end, to swap the
467 : * relation -- supposedly for a short time.
468 : */
469 : static LOCKMODE
470 1039 : RepackLockLevel(bool concurrent)
471 : {
472 1039 : if (concurrent)
473 77 : return ShareUpdateExclusiveLock;
474 : else
475 962 : return AccessExclusiveLock;
476 : }
477 :
478 : /*
479 : * cluster_rel
480 : *
481 : * This clusters the table by creating a new, clustered table and
482 : * swapping the relfilenumbers of the new table and the old table, so
483 : * the OID of the original table is preserved. Thus we do not lose
484 : * GRANT, inheritance nor references to this table.
485 : *
486 : * Indexes are rebuilt too, via REINDEX. Since we are effectively bulk-loading
487 : * the new table, it's better to create the indexes afterwards than to fill
488 : * them incrementally while we load the table.
489 : *
490 : * If indexOid is InvalidOid, the table will be rewritten in physical order
491 : * instead of index order.
492 : *
493 : * Note that, in the concurrent case, the function releases the lock at some
494 : * point, in order to get AccessExclusiveLock for the final steps (i.e. to
495 : * swap the relation files). To make things simpler, the caller should expect
496 : * OldHeap to be closed on return, regardless CLUOPT_CONCURRENT. (The
497 : * AccessExclusiveLock is kept till the end of the transaction.)
498 : *
499 : * 'cmd' indicates which command is being executed, to be used for error
500 : * messages.
501 : */
502 : void
503 426 : cluster_rel(RepackCommand cmd, Relation OldHeap, Oid indexOid,
504 : ClusterParams *params, bool isTopLevel)
505 : {
506 426 : Oid tableOid = RelationGetRelid(OldHeap);
507 : Relation index;
508 : LOCKMODE lmode;
509 : Oid save_userid;
510 : int save_sec_context;
511 : int save_nestlevel;
512 426 : bool verbose = ((params->options & CLUOPT_VERBOSE) != 0);
513 426 : bool recheck = ((params->options & CLUOPT_RECHECK) != 0);
514 426 : bool concurrent = ((params->options & CLUOPT_CONCURRENT) != 0);
515 426 : Oid ident_idx = InvalidOid;
516 :
517 : /* Determine the lock mode to use. */
518 426 : lmode = RepackLockLevel(concurrent);
519 :
520 : /*
521 : * Check some preconditions in the concurrent case. This also obtains the
522 : * replica index OID.
523 : */
524 426 : if (concurrent)
525 35 : check_concurrent_repack_requirements(OldHeap, &ident_idx);
526 :
527 : /* Check for user-requested abort. */
528 394 : CHECK_FOR_INTERRUPTS();
529 :
530 394 : pgstat_progress_start_command(PROGRESS_COMMAND_REPACK, tableOid);
531 394 : pgstat_progress_update_param(PROGRESS_REPACK_COMMAND, cmd);
532 :
533 : /*
534 : * Switch to the table owner's userid, so that any index functions are run
535 : * as that user. Also lock down security-restricted operations and
536 : * arrange to make GUC variable changes local to this command.
537 : */
538 394 : GetUserIdAndSecContext(&save_userid, &save_sec_context);
539 394 : SetUserIdAndSecContext(OldHeap->rd_rel->relowner,
540 : save_sec_context | SECURITY_RESTRICTED_OPERATION);
541 394 : save_nestlevel = NewGUCNestLevel();
542 394 : RestrictSearchPath();
543 :
544 : /*
545 : * Recheck that the relation is still what it was when we started.
546 : *
547 : * Note that it's critical to skip this in single-relation CLUSTER;
548 : * otherwise, we would reject an attempt to cluster using a
549 : * not-previously-clustered index.
550 : */
551 394 : if (recheck &&
552 52 : !cluster_rel_recheck(cmd, OldHeap, indexOid, save_userid,
553 52 : lmode, params->options))
554 0 : goto out;
555 :
556 : /*
557 : * We allow repacking shared catalogs only when not using an index. It
558 : * would work to use an index in most respects, but the index would only
559 : * get marked as indisclustered in the current database, leading to
560 : * unexpected behavior if CLUSTER were later invoked in another database.
561 : */
562 394 : if (OidIsValid(indexOid) && OldHeap->rd_rel->relisshared)
563 0 : ereport(ERROR,
564 : errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
565 : /*- translator: first %s is name of a SQL command, eg. REPACK */
566 : errmsg("cannot execute %s on a shared catalog",
567 : RepackCommandAsString(cmd)));
568 :
569 : /*
570 : * The CONCURRENTLY case should have been rejected earlier because it does
571 : * not support system catalogs.
572 : */
573 : Assert(!(OldHeap->rd_rel->relisshared && concurrent));
574 :
575 : /*
576 : * Don't process temp tables of other backends ... their local buffer
577 : * manager is not going to cope.
578 : */
579 394 : if (RELATION_IS_OTHER_TEMP(OldHeap))
580 0 : ereport(ERROR,
581 : errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
582 : /*- translator: first %s is name of a SQL command, eg. REPACK */
583 : errmsg("cannot execute %s on temporary tables of other sessions",
584 : RepackCommandAsString(cmd)));
585 :
586 : /*
587 : * Also check for active uses of the relation in the current transaction,
588 : * including open scans and pending AFTER trigger events.
589 : */
590 394 : CheckTableNotInUse(OldHeap, RepackCommandAsString(cmd));
591 :
592 : /* Check heap and index are valid to cluster on */
593 394 : if (OidIsValid(indexOid))
594 : {
595 : /* verify the index is good and lock it */
596 142 : check_index_is_clusterable(OldHeap, indexOid, lmode);
597 : /* also open it */
598 142 : index = index_open(indexOid, NoLock);
599 : }
600 : else
601 252 : index = NULL;
602 :
603 : /*
604 : * When allow_system_table_mods is turned off, we disallow repacking a
605 : * catalog on a particular index unless that's already the clustered index
606 : * for that catalog.
607 : *
608 : * XXX We don't check for this in CLUSTER, because it's historically been
609 : * allowed.
610 : */
611 394 : if (cmd != REPACK_COMMAND_CLUSTER &&
612 269 : !allowSystemTableMods && OidIsValid(indexOid) &&
613 17 : IsCatalogRelation(OldHeap) && !index->rd_index->indisclustered)
614 0 : ereport(ERROR,
615 : errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
616 : errmsg("permission denied: \"%s\" is a system catalog",
617 : RelationGetRelationName(OldHeap)),
618 : errdetail("System catalogs can only be clustered by the index they're already clustered on, if any, unless \"%s\" is enabled.",
619 : "allow_system_table_mods"));
620 :
621 : /*
622 : * Quietly ignore the request if this is a materialized view which has not
623 : * been populated from its query. No harm is done because there is no data
624 : * to deal with, and we don't want to throw an error if this is part of a
625 : * multi-relation request -- for example, CLUSTER was run on the entire
626 : * database.
627 : */
628 394 : if (OldHeap->rd_rel->relkind == RELKIND_MATVIEW &&
629 0 : !RelationIsPopulated(OldHeap))
630 : {
631 0 : if (index)
632 0 : index_close(index, lmode);
633 0 : relation_close(OldHeap, lmode);
634 0 : goto out;
635 : }
636 :
637 : Assert(OldHeap->rd_rel->relkind == RELKIND_RELATION ||
638 : OldHeap->rd_rel->relkind == RELKIND_MATVIEW ||
639 : OldHeap->rd_rel->relkind == RELKIND_TOASTVALUE);
640 :
641 : /*
642 : * All predicate locks on the tuples or pages are about to be made
643 : * invalid, because we move tuples around. Promote them to relation
644 : * locks. Predicate locks on indexes will be promoted when they are
645 : * reindexed.
646 : *
647 : * During concurrent processing, the heap as well as its indexes stay in
648 : * operation, so we postpone this step until they are locked using
649 : * AccessExclusiveLock near the end of the processing.
650 : */
651 394 : if (!concurrent)
652 391 : TransferPredicateLocksToHeapRelation(OldHeap);
653 :
654 : /* rebuild_relation does all the dirty work */
655 394 : PG_TRY();
656 : {
657 394 : rebuild_relation(OldHeap, index, verbose, ident_idx);
658 : }
659 4 : PG_FINALLY();
660 : {
661 394 : if (concurrent)
662 : {
663 : /*
664 : * Since during normal operation the worker was already asked to
665 : * exit, stopping it explicitly is especially important on ERROR.
666 : * However it still seems a good practice to make sure that the
667 : * worker never survives the REPACK command.
668 : */
669 3 : stop_repack_decoding_worker();
670 : }
671 : }
672 394 : PG_END_TRY();
673 :
674 : /* rebuild_relation closes OldHeap, and index if valid */
675 :
676 390 : out:
677 : /* Roll back any GUC changes executed by index functions */
678 390 : AtEOXact_GUC(false, save_nestlevel);
679 :
680 : /* Restore userid and security context */
681 390 : SetUserIdAndSecContext(save_userid, save_sec_context);
682 :
683 390 : pgstat_progress_end_command();
684 390 : }
685 :
686 : /*
687 : * Check if the table (and its index) still meets the requirements of
688 : * cluster_rel().
689 : */
690 : static bool
691 52 : cluster_rel_recheck(RepackCommand cmd, Relation OldHeap, Oid indexOid,
692 : Oid userid, LOCKMODE lmode, int options)
693 : {
694 52 : Oid tableOid = RelationGetRelid(OldHeap);
695 :
696 : /* Check that the user still has privileges for the relation */
697 52 : if (!repack_is_permitted_for_relation(cmd, tableOid, userid))
698 : {
699 0 : relation_close(OldHeap, lmode);
700 0 : return false;
701 : }
702 :
703 : /*
704 : * Silently skip a temp table for a remote session. Only doing this check
705 : * in the "recheck" case is appropriate (which currently means somebody is
706 : * executing a database-wide CLUSTER or on a partitioned table), because
707 : * there is another check in cluster() which will stop any attempt to
708 : * cluster remote temp tables by name. There is another check in
709 : * cluster_rel which is redundant, but we leave it for extra safety.
710 : */
711 52 : if (RELATION_IS_OTHER_TEMP(OldHeap))
712 : {
713 0 : relation_close(OldHeap, lmode);
714 0 : return false;
715 : }
716 :
717 52 : if (OidIsValid(indexOid))
718 : {
719 : /*
720 : * Check that the index still exists
721 : */
722 32 : if (!SearchSysCacheExists1(RELOID, ObjectIdGetDatum(indexOid)))
723 : {
724 0 : relation_close(OldHeap, lmode);
725 0 : return false;
726 : }
727 :
728 : /*
729 : * Check that the index is still the one with indisclustered set, if
730 : * needed.
731 : */
732 32 : if ((options & CLUOPT_RECHECK_ISCLUSTERED) != 0 &&
733 4 : !get_index_isclustered(indexOid))
734 : {
735 0 : relation_close(OldHeap, lmode);
736 0 : return false;
737 : }
738 : }
739 :
740 52 : return true;
741 : }
742 :
743 : /*
744 : * Verify that the specified heap and index are valid to cluster on
745 : *
746 : * Side effect: obtains lock on the index. The caller may
747 : * in some cases already have a lock of the same strength on the table, but
748 : * not in all cases so we can't rely on the table-level lock for
749 : * protection here.
750 : */
751 : void
752 311 : check_index_is_clusterable(Relation OldHeap, Oid indexOid, LOCKMODE lockmode)
753 : {
754 : Relation OldIndex;
755 :
756 311 : OldIndex = index_open(indexOid, lockmode);
757 :
758 : /*
759 : * Check that index is in fact an index on the given relation
760 : */
761 311 : if (OldIndex->rd_index == NULL ||
762 311 : OldIndex->rd_index->indrelid != RelationGetRelid(OldHeap))
763 0 : ereport(ERROR,
764 : (errcode(ERRCODE_WRONG_OBJECT_TYPE),
765 : errmsg("\"%s\" is not an index for table \"%s\"",
766 : RelationGetRelationName(OldIndex),
767 : RelationGetRelationName(OldHeap))));
768 :
769 : /* Index AM must allow clustering */
770 311 : if (!OldIndex->rd_indam->amclusterable)
771 0 : ereport(ERROR,
772 : (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
773 : errmsg("cannot cluster on index \"%s\" because access method does not support clustering",
774 : RelationGetRelationName(OldIndex))));
775 :
776 : /*
777 : * Disallow clustering on incomplete indexes (those that might not index
778 : * every row of the relation). We could relax this by making a separate
779 : * seqscan pass over the table to copy the missing rows, but that seems
780 : * expensive and tedious.
781 : */
782 311 : if (!heap_attisnull(OldIndex->rd_indextuple, Anum_pg_index_indpred, NULL))
783 0 : ereport(ERROR,
784 : (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
785 : errmsg("cannot cluster on partial index \"%s\"",
786 : RelationGetRelationName(OldIndex))));
787 :
788 : /*
789 : * Disallow if index is left over from a failed CREATE INDEX CONCURRENTLY;
790 : * it might well not contain entries for every heap row, or might not even
791 : * be internally consistent. (But note that we don't check indcheckxmin;
792 : * the worst consequence of following broken HOT chains would be that we
793 : * might put recently-dead tuples out-of-order in the new table, and there
794 : * is little harm in that.)
795 : */
796 311 : if (!OldIndex->rd_index->indisvalid)
797 4 : ereport(ERROR,
798 : (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
799 : errmsg("cannot cluster on invalid index \"%s\"",
800 : RelationGetRelationName(OldIndex))));
801 :
802 : /* Drop relcache refcnt on OldIndex, but keep lock */
803 307 : index_close(OldIndex, NoLock);
804 307 : }
805 :
806 : /*
807 : * mark_index_clustered: mark the specified index as the one clustered on
808 : *
809 : * With indexOid == InvalidOid, will mark all indexes of rel not-clustered.
 : *
 : * is_internal is passed through to the post-alter object access hook,
 : * which is invoked for every index of the relation.
810 : */
811 : void
812 189 : mark_index_clustered(Relation rel, Oid indexOid, bool is_internal)
813 : {
814 : HeapTuple indexTuple;
815 : Form_pg_index indexForm;
816 : Relation pg_index;
817 : ListCell *index;
818 :
 : /* Callers must not pass a partitioned table. */
819 : Assert(rel->rd_rel->relkind != RELKIND_PARTITIONED_TABLE);
820 :
821 : /*
822 : * If the index is already marked clustered, no need to do anything.
823 : */
824 189 : if (OidIsValid(indexOid))
825 : {
826 181 : if (get_index_isclustered(indexOid))
827 40 : return;
828 : }
829 :
830 : /*
831 : * Check each index of the relation and set/clear the bit as needed.
832 : */
833 149 : pg_index = table_open(IndexRelationId, RowExclusiveLock);
834 :
835 452 : foreach(index, RelationGetIndexList(rel))
836 : {
837 303 : Oid thisIndexOid = lfirst_oid(index);
838 :
839 303 : indexTuple = SearchSysCacheCopy1(INDEXRELID,
840 : ObjectIdGetDatum(thisIndexOid));
841 303 : if (!HeapTupleIsValid(indexTuple))
842 0 : elog(ERROR, "cache lookup failed for index %u", thisIndexOid);
843 303 : indexForm = (Form_pg_index) GETSTRUCT(indexTuple);
844 :
845 : /*
846 : * Unset the bit if set. We know it's wrong because we checked this
847 : * earlier.
848 : */
849 303 : if (indexForm->indisclustered)
850 : {
851 20 : indexForm->indisclustered = false;
852 20 : CatalogTupleUpdate(pg_index, &indexTuple->t_self, indexTuple);
853 : }
854 283 : else if (thisIndexOid == indexOid)
855 : {
856 : /* this was checked earlier, but let's be real sure */
857 141 : if (!indexForm->indisvalid)
858 0 : elog(ERROR, "cannot cluster on invalid index %u", indexOid);
859 141 : indexForm->indisclustered = true;
860 141 : CatalogTupleUpdate(pg_index, &indexTuple->t_self, indexTuple);
861 : }
862 :
 : /*
 : * Notify the object access hook about each index we looked at; note
 : * this fires even when the pg_index tuple was left unmodified.
 : */
863 303 : InvokeObjectPostAlterHookArg(IndexRelationId, thisIndexOid, 0,
864 : InvalidOid, is_internal);
865 :
866 303 : heap_freetuple(indexTuple);
867 : }
868 :
869 149 : table_close(pg_index, RowExclusiveLock);
870 : }
871 :
872 : /*
873 : * Check if the CONCURRENTLY option is legal for the relation.
874 : *
875 : * *Ident_idx_p receives OID of the identity index.
 : *
 : * Raises ERROR if the relation is not eligible for concurrent processing
 : * (catalog relation, TOAST relation, non-permanent relation, or missing /
 : * insufficient replica identity).
876 : */
877 : static void
878 35 : check_concurrent_repack_requirements(Relation rel, Oid *ident_idx_p)
879 : {
880 : char relpersistence,
881 : replident;
882 : Oid ident_idx;
883 :
884 : /* Data changes in system relations are not logically decoded. */
885 35 : if (IsCatalogRelation(rel))
886 8 : ereport(ERROR,
887 : errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
888 : errmsg("cannot repack relation \"%s\"",
889 : RelationGetRelationName(rel)),
890 : errhint("REPACK CONCURRENTLY is not supported for catalog relations."));
891 :
892 : /*
893 : * reorderbuffer.c does not seem to handle processing of TOAST relation
894 : * alone.
895 : */
896 27 : if (IsToastRelation(rel))
897 4 : ereport(ERROR,
898 : errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
899 : errmsg("cannot repack relation \"%s\"",
900 : RelationGetRelationName(rel)),
 : /* Style guide: hints are complete sentences, ending with a period. */
901 : errhint("REPACK CONCURRENTLY is not supported for TOAST relations."));
902 :
903 23 : relpersistence = rel->rd_rel->relpersistence;
904 23 : if (relpersistence != RELPERSISTENCE_PERMANENT)
905 8 : ereport(ERROR,
906 : errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
907 : errmsg("cannot repack relation \"%s\"",
908 : RelationGetRelationName(rel)),
909 : errhint("REPACK CONCURRENTLY is only allowed for permanent relations."));
910 :
911 : /* With NOTHING, WAL does not contain the old tuple. */
912 15 : replident = rel->rd_rel->relreplident;
913 15 : if (replident == REPLICA_IDENTITY_NOTHING)
914 4 : ereport(ERROR,
915 : errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
916 : errmsg("cannot repack relation \"%s\"",
917 : RelationGetRelationName(rel)),
918 : errhint("Relation \"%s\" has insufficient replication identity.",
919 : RelationGetRelationName(rel)));
920 :
921 : /*
922 : * Obtain the replica identity index -- either one that has been set
923 : * explicitly, or a non-deferrable primary key. If none of these cases
924 : * apply, the table cannot be repacked concurrently. It might be possible
925 : * to have repack work with a FULL replica identity; however that requires
926 : * more work and is not implemented yet.
927 : */
928 11 : ident_idx = GetRelationIdentityOrPK(rel);
929 11 : if (!OidIsValid(ident_idx))
930 8 : ereport(ERROR,
931 : errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
 : /* Use the same primary message as the other eligibility checks. */
932 : errmsg("cannot repack relation \"%s\"",
933 : RelationGetRelationName(rel)),
934 : errhint("Relation \"%s\" has no identity index.",
935 : RelationGetRelationName(rel)));
936 :
937 3 : *ident_idx_p = ident_idx;
938 3 : }
939 :
940 :
941 : /*
942 : * rebuild_relation: rebuild an existing relation in index or physical order
943 : *
944 : * OldHeap: table to rebuild. See cluster_rel() for comments on the required
945 : * lock strength.
946 : *
947 : * index: index to cluster by, or NULL to rewrite in physical order.
948 : *
949 : * ident_idx: identity index, to handle replaying of concurrent data changes
950 : * to the new heap. InvalidOid if there's no CONCURRENTLY option.
951 : *
952 : * On entry, heap and index (if one is given) must be open, and the
953 : * appropriate lock held on them -- AccessExclusiveLock for exclusive
954 : * processing and ShareUpdateExclusiveLock for concurrent processing.
955 : *
956 : * On exit, they are closed, but still locked with AccessExclusiveLock.
957 : * (The function handles the lock upgrade if 'concurrent' is true.)
958 : */
959 : static void
960 394 : rebuild_relation(Relation OldHeap, Relation index, bool verbose,
961 : Oid ident_idx)
962 : {
963 394 : Oid tableOid = RelationGetRelid(OldHeap);
964 394 : Oid accessMethod = OldHeap->rd_rel->relam;
965 394 : Oid tableSpace = OldHeap->rd_rel->reltablespace;
966 : Oid OIDNewHeap;
967 : Relation NewHeap;
968 : char relpersistence;
969 : bool swap_toast_by_content;
970 : TransactionId frozenXid;
971 : MultiXactId cutoffMulti;
 : /* A valid identity index is how callers request concurrent processing. */
972 394 : bool concurrent = OidIsValid(ident_idx);
973 394 : Snapshot snapshot = NULL;
974 : #if USE_ASSERT_CHECKING
975 : LOCKMODE lmode;
976 :
977 : lmode = RepackLockLevel(concurrent);
978 :
979 : Assert(CheckRelationLockedByMe(OldHeap, lmode, false));
980 : Assert(index == NULL || CheckRelationLockedByMe(index, lmode, false));
981 : #endif
982 :
983 394 : if (concurrent)
984 : {
985 : /*
986 : * The worker needs to be member of the locking group we're the leader
987 : * of. We ought to become the leader before the worker starts. The
988 : * worker will join the group as soon as it starts.
989 : *
990 : * This is to make sure that the deadlock described below is
991 : * detectable by deadlock.c: if the worker waits for a transaction to
992 : * complete and we are waiting for the worker output, then effectively
993 : * we (i.e. this backend) are waiting for that transaction.
994 : */
995 3 : BecomeLockGroupLeader();
996 :
997 : /*
998 : * Start the worker that decodes data changes applied while we're
999 : * copying the table contents.
1000 : *
1001 : * Note that the worker has to wait for all transactions with XID
1002 : * already assigned to finish. If some of those transactions is
1003 : * waiting for a lock conflicting with ShareUpdateExclusiveLock on our
1004 : * table (e.g. it runs CREATE INDEX), we can end up in a deadlock.
1005 : * Not sure this risk is worth unlocking/locking the table (and its
1006 : * clustering index) and checking again if it's still eligible for
1007 : * REPACK CONCURRENTLY.
1008 : */
1009 3 : start_repack_decoding_worker(tableOid);
1010 :
1011 : /*
1012 : * Wait until the worker has the initial snapshot and retrieve it.
1013 : */
1014 3 : snapshot = get_initial_snapshot(decoding_worker);
1015 :
1016 3 : PushActiveSnapshot(snapshot);
1017 : }
1018 :
1019 : /* for CLUSTER or REPACK USING INDEX, mark the index as the one to use */
1020 394 : if (index != NULL)
1021 142 : mark_index_clustered(OldHeap, RelationGetRelid(index), true);
1022 :
1023 : /* Remember info about rel before closing OldHeap */
1024 394 : relpersistence = OldHeap->rd_rel->relpersistence;
1025 :
1026 : /*
1027 : * Create the transient table that will receive the re-ordered data.
1028 : *
1029 : * OldHeap is already locked, so no need to lock it again. make_new_heap
1030 : * obtains AccessExclusiveLock on the new heap and its toast table.
1031 : */
1032 394 : OIDNewHeap = make_new_heap(tableOid, tableSpace,
1033 : accessMethod,
1034 : relpersistence,
1035 : NoLock);
1036 : Assert(CheckRelationOidLockedByMe(OIDNewHeap, AccessExclusiveLock, false));
1037 394 : NewHeap = table_open(OIDNewHeap, NoLock);
1038 :
1039 : /* Copy the heap data into the new table in the desired order */
1040 394 : copy_table_data(NewHeap, OldHeap, index, snapshot, verbose,
1041 : &swap_toast_by_content, &frozenXid, &cutoffMulti);
1042 :
1043 : /* The historic snapshot won't be needed anymore. */
1044 394 : if (snapshot)
1045 : {
1046 3 : PopActiveSnapshot();
 : /*
 : * NOTE(review): this updates the command ID of whatever snapshot is
 : * active *after* the pop, not the historic one just popped — confirm
 : * an active snapshot is guaranteed to exist here and that it is the
 : * intended target.
 : */
1047 3 : UpdateActiveSnapshotCommandId();
1048 : }
1049 :
1050 394 : if (concurrent)
1051 : {
1052 : Assert(!swap_toast_by_content);
1053 :
1054 : /*
1055 : * Close the index, but keep the lock. Both heaps will be closed by
1056 : * the following call.
1057 : */
1058 3 : if (index)
1059 1 : index_close(index, NoLock);
1060 :
1061 3 : rebuild_relation_finish_concurrent(NewHeap, OldHeap, ident_idx,
1062 : frozenXid, cutoffMulti);
1063 :
1064 3 : pgstat_progress_update_param(PROGRESS_REPACK_PHASE,
1065 : PROGRESS_REPACK_PHASE_FINAL_CLEANUP);
1066 : }
1067 : else
1068 : {
1069 391 : bool is_system_catalog = IsSystemRelation(OldHeap);
1070 :
1071 : /* Close relcache entries, but keep lock until transaction commit */
1072 391 : table_close(OldHeap, NoLock);
1073 391 : if (index)
1074 141 : index_close(index, NoLock);
1075 :
1076 : /*
1077 : * Close the new relation so it can be dropped as soon as the storage
1078 : * is swapped. The relation is not visible to others, so no need to
1079 : * unlock it explicitly.
1080 : */
1081 391 : table_close(NewHeap, NoLock);
1082 :
1083 : /*
1084 : * Swap the physical files of the target and transient tables, then
1085 : * rebuild the target's indexes and throw away the transient table.
1086 : */
1087 391 : finish_heap_swap(tableOid, OIDNewHeap, is_system_catalog,
1088 : swap_toast_by_content, false, true,
1089 : true, /* reindex */
1090 : frozenXid, cutoffMulti,
1091 : relpersistence);
1092 : }
1093 390 : }
1094 :
1095 :
1096 : /*
1097 : * Create the transient table that will be filled with new data during
1098 : * CLUSTER, ALTER TABLE, and similar operations. The transient table
1099 : * duplicates the logical structure of the OldHeap; but will have the
1100 : * specified physical storage properties NewTableSpace, NewAccessMethod, and
1101 : * relpersistence.
1102 : *
1103 : * After this, the caller should load the new heap with transferred/modified
1104 : * data, then call finish_heap_swap to complete the operation.
 : *
 : * Returns the OID of the new heap, whose catalog entries are made visible
 : * to this transaction before returning. The old heap is closed on exit,
 : * but without releasing the lock taken with 'lockmode'.
1105 : */
1106 : Oid
1107 1566 : make_new_heap(Oid OIDOldHeap, Oid NewTableSpace, Oid NewAccessMethod,
1108 : char relpersistence, LOCKMODE lockmode)
1109 : {
1110 : TupleDesc OldHeapDesc;
1111 : char NewHeapName[NAMEDATALEN];
1112 : Oid OIDNewHeap;
1113 : Oid toastid;
1114 : Relation OldHeap;
1115 : HeapTuple tuple;
1116 : Datum reloptions;
1117 : bool isNull;
1118 : Oid namespaceid;
1119 :
1120 1566 : OldHeap = table_open(OIDOldHeap, lockmode);
1121 1566 : OldHeapDesc = RelationGetDescr(OldHeap);
1122 :
1123 : /*
1124 : * Note that the NewHeap will not receive any of the defaults or
1125 : * constraints associated with the OldHeap; we don't need 'em, and there's
1126 : * no reason to spend cycles inserting them into the catalogs only to
1127 : * delete them.
1128 : */
1129 :
1130 : /*
1131 : * But we do want to use reloptions of the old heap for new heap.
1132 : */
1133 1566 : tuple = SearchSysCache1(RELOID, ObjectIdGetDatum(OIDOldHeap));
1134 1566 : if (!HeapTupleIsValid(tuple))
1135 0 : elog(ERROR, "cache lookup failed for relation %u", OIDOldHeap);
1136 1566 : reloptions = SysCacheGetAttr(RELOID, tuple, Anum_pg_class_reloptions,
1137 : &isNull);
1138 1566 : if (isNull)
1139 1473 : reloptions = (Datum) 0;
1140 :
1141 1566 : if (relpersistence == RELPERSISTENCE_TEMP)
1142 98 : namespaceid = LookupCreationNamespace("pg_temp");
1143 : else
1144 1468 : namespaceid = RelationGetNamespace(OldHeap);
1145 :
1146 : /*
1147 : * Create the new heap, using a temporary name in the same namespace as
1148 : * the existing table. NOTE: there is some risk of collision with user
1149 : * relnames. Working around this seems more trouble than it's worth; in
1150 : * particular, we can't create the new heap in a different namespace from
1151 : * the old, or we will have problems with the TEMP status of temp tables.
1152 : *
1153 : * Note: the new heap is not a shared relation, even if we are rebuilding
1154 : * a shared rel. However, we do make the new heap mapped if the source is
1155 : * mapped. This simplifies swap_relation_files, and is absolutely
1156 : * necessary for rebuilding pg_class, for reasons explained there.
1157 : */
1158 1566 : snprintf(NewHeapName, sizeof(NewHeapName), "pg_temp_%u", OIDOldHeap);
1159 :
1160 1566 : OIDNewHeap = heap_create_with_catalog(NewHeapName,
1161 : namespaceid,
1162 : NewTableSpace,
1163 : InvalidOid,
1164 : InvalidOid,
1165 : InvalidOid,
1166 1566 : OldHeap->rd_rel->relowner,
1167 : NewAccessMethod,
1168 : OldHeapDesc,
1169 : NIL,
1170 : RELKIND_RELATION,
1171 : relpersistence,
1172 : false,
1173 1566 : RelationIsMapped(OldHeap),
1174 : ONCOMMIT_NOOP,
1175 : reloptions,
1176 : false,
1177 : true,
1178 : true,
1179 : OIDOldHeap,
1180 1566 : NULL);
1181 : Assert(OIDNewHeap != InvalidOid);
1182 :
1183 1566 : ReleaseSysCache(tuple);
1184 :
1185 : /*
1186 : * Advance command counter so that the newly-created relation's catalog
1187 : * tuples will be visible to table_open.
1188 : */
1189 1566 : CommandCounterIncrement();
1190 :
1191 : /*
1192 : * If necessary, create a TOAST table for the new relation.
1193 : *
1194 : * If the relation doesn't have a TOAST table already, we can't need one
1195 : * for the new relation. The other way around is possible though: if some
1196 : * wide columns have been dropped, NewHeapCreateToastTable can decide that
1197 : * no TOAST table is needed for the new table.
1198 : *
1199 : * Note that NewHeapCreateToastTable ends with CommandCounterIncrement, so
1200 : * that the TOAST table will be visible for insertion.
1201 : */
1202 1566 : toastid = OldHeap->rd_rel->reltoastrelid;
1203 1566 : if (OidIsValid(toastid))
1204 : {
1205 : /* keep the existing toast table's reloptions, if any */
1206 553 : tuple = SearchSysCache1(RELOID, ObjectIdGetDatum(toastid));
1207 553 : if (!HeapTupleIsValid(tuple))
1208 0 : elog(ERROR, "cache lookup failed for relation %u", toastid);
1209 553 : reloptions = SysCacheGetAttr(RELOID, tuple, Anum_pg_class_reloptions,
1210 : &isNull);
1211 553 : if (isNull)
1212 553 : reloptions = (Datum) 0;
1213 :
1214 553 : NewHeapCreateToastTable(OIDNewHeap, reloptions, lockmode, toastid);
1215 :
1216 553 : ReleaseSysCache(tuple);
1217 : }
1218 :
 : /* Close the old heap, but keep the lock taken above. */
1219 1566 : table_close(OldHeap, NoLock);
1220 :
1221 1566 : return OIDNewHeap;
1222 : }
1223 :
1224 : /*
1225 : * Do the physical copying of table data.
1226 : *
1227 : * 'snapshot': see table_relation_copy_for_cluster(). Pass it iff
1228 : * concurrent processing is required.
1229 : *
1230 : * There are three output parameters:
1231 : * *pSwapToastByContent is set true if toast tables must be swapped by content.
1232 : * *pFreezeXid receives the TransactionId used as freeze cutoff point.
1233 : * *pCutoffMulti receives the MultiXactId used as a cutoff point.
1234 : */
1235 : static void
1236 394 : copy_table_data(Relation NewHeap, Relation OldHeap, Relation OldIndex,
1237 : Snapshot snapshot, bool verbose, bool *pSwapToastByContent,
1238 : TransactionId *pFreezeXid, MultiXactId *pCutoffMulti)
1239 : {
1240 : Relation relRelation;
1241 : HeapTuple reltup;
1242 : Form_pg_class relform;
1243 : TupleDesc oldTupDesc PG_USED_FOR_ASSERTS_ONLY;
1244 : TupleDesc newTupDesc PG_USED_FOR_ASSERTS_ONLY;
1245 : VacuumParams params;
1246 : struct VacuumCutoffs cutoffs;
1247 : bool use_sort;
1248 394 : double num_tuples = 0,
1249 394 : tups_vacuumed = 0,
1250 394 : tups_recently_dead = 0;
1251 : BlockNumber num_pages;
1252 394 : int elevel = verbose ? INFO : DEBUG2;
1253 : PGRUsage ru0;
1254 : char *nspname;
1255 394 : bool concurrent = snapshot != NULL;
1256 : LOCKMODE lmode;
1257 :
1258 394 : lmode = RepackLockLevel(concurrent);
1259 :
1260 394 : pg_rusage_init(&ru0);
1261 :
1262 : /* Store a copy of the namespace name for logging purposes */
1263 394 : nspname = get_namespace_name(RelationGetNamespace(OldHeap));
1264 :
1265 : /*
1266 : * Their tuple descriptors should be exactly alike, but here we only need
1267 : * assume that they have the same number of columns.
1268 : */
1269 394 : oldTupDesc = RelationGetDescr(OldHeap);
1270 394 : newTupDesc = RelationGetDescr(NewHeap);
1271 : Assert(newTupDesc->natts == oldTupDesc->natts);
1272 :
1273 : /*
1274 : * If the OldHeap has a toast table, get lock on the toast table to keep
1275 : * it from being vacuumed. This is needed because autovacuum processes
1276 : * toast tables independently of their main tables, with no lock on the
1277 : * latter. If an autovacuum were to start on the toast table after we
1278 : * compute our OldestXmin below, it would use a later OldestXmin, and then
1279 : * possibly remove as DEAD toast tuples belonging to main tuples we think
1280 : * are only RECENTLY_DEAD. Then we'd fail while trying to copy those
1281 : * tuples.
1282 : *
1283 : * We don't need to open the toast relation here, just lock it. The lock
1284 : * will be held till end of transaction.
1285 : */
1286 394 : if (OldHeap->rd_rel->reltoastrelid)
1287 125 : LockRelationOid(OldHeap->rd_rel->reltoastrelid, lmode);
1288 :
1289 : /*
1290 : * If both tables have TOAST tables, perform toast swap by content. It is
1291 : * possible that the old table has a toast table but the new one doesn't,
1292 : * if toastable columns have been dropped. In that case we have to do
1293 : * swap by links. This is okay because swap by content is only essential
1294 : * for system catalogs, and we don't support schema changes for them.
1295 : */
1296 394 : if (OldHeap->rd_rel->reltoastrelid && NewHeap->rd_rel->reltoastrelid &&
1297 125 : !concurrent)
1298 : {
1299 124 : *pSwapToastByContent = true;
1300 :
1301 : /*
1302 : * When doing swap by content, any toast pointers written into NewHeap
1303 : * must use the old toast table's OID, because that's where the toast
1304 : * data will eventually be found. Set this up by setting rd_toastoid.
1305 : * This also tells toast_save_datum() to preserve the toast value
1306 : * OIDs, which we want so as not to invalidate toast pointers in
1307 : * system catalog caches, and to avoid making multiple copies of a
1308 : * single toast value.
1309 : *
1310 : * Note that we must hold NewHeap open until we are done writing data,
1311 : * since the relcache will not guarantee to remember this setting once
1312 : * the relation is closed. Also, this technique depends on the fact
1313 : * that no one will try to read from the NewHeap until after we've
1314 : * finished writing it and swapping the rels --- otherwise they could
1315 : * follow the toast pointers to the wrong place. (It would actually
1316 : * work for values copied over from the old toast table, but not for
1317 : * any values that we toast which were previously not toasted.)
1318 : *
1319 : * This would not work with CONCURRENTLY because we may need to delete
1320 : * TOASTed tuples from the new heap. With this hack, we'd delete them
1321 : * from the old heap.
1322 : */
1323 124 : NewHeap->rd_toastoid = OldHeap->rd_rel->reltoastrelid;
1324 : }
1325 : else
1326 270 : *pSwapToastByContent = false;
1327 :
1328 : /*
1329 : * Compute xids used to freeze and weed out dead tuples and multixacts.
1330 : * Since we're going to rewrite the whole table anyway, there's no reason
1331 : * not to be aggressive about this.
1332 : */
1333 394 : memset(&params, 0, sizeof(VacuumParams));
1334 394 : vacuum_get_cutoffs(OldHeap, &params, &cutoffs);
1335 :
1336 : /*
1337 : * FreezeXid will become the table's new relfrozenxid, and that mustn't go
1338 : * backwards, so take the max.
1339 : */
1340 : {
1341 394 : TransactionId relfrozenxid = OldHeap->rd_rel->relfrozenxid;
1342 :
1343 788 : if (TransactionIdIsValid(relfrozenxid) &&
1344 394 : TransactionIdPrecedes(cutoffs.FreezeLimit, relfrozenxid))
1345 10 : cutoffs.FreezeLimit = relfrozenxid;
1346 : }
1347 :
1348 : /*
1349 : * MultiXactCutoff, similarly, shouldn't go backwards either.
1350 : */
1351 : {
1352 394 : MultiXactId relminmxid = OldHeap->rd_rel->relminmxid;
1353 :
1354 788 : if (MultiXactIdIsValid(relminmxid) &&
1355 394 : MultiXactIdPrecedes(cutoffs.MultiXactCutoff, relminmxid))
1356 0 : cutoffs.MultiXactCutoff = relminmxid;
1357 : }
1358 :
1359 : /*
1360 : * Decide whether to use an indexscan or seqscan-and-optional-sort to scan
1361 : * the OldHeap. We know how to use a sort to duplicate the ordering of a
1362 : * btree index, and will use seqscan-and-sort for that case if the planner
1363 : * tells us it's cheaper. Otherwise, always indexscan if an index is
1364 : * provided, else plain seqscan.
1365 : */
1366 394 : if (OldIndex != NULL && OldIndex->rd_rel->relam == BTREE_AM_OID)
1367 142 : use_sort = plan_cluster_use_sort(RelationGetRelid(OldHeap),
1368 : RelationGetRelid(OldIndex));
1369 : else
1370 252 : use_sort = false;
1371 :
1372 : /* Log what we're doing */
1373 394 : if (OldIndex != NULL && !use_sort)
1374 60 : ereport(elevel,
1375 : errmsg("repacking \"%s.%s\" using index scan on \"%s\"",
1376 : nspname,
1377 : RelationGetRelationName(OldHeap),
1378 : RelationGetRelationName(OldIndex)));
1379 334 : else if (use_sort)
1380 82 : ereport(elevel,
1381 : errmsg("repacking \"%s.%s\" using sequential scan and sort",
1382 : nspname,
1383 : RelationGetRelationName(OldHeap)));
1384 : else
1385 252 : ereport(elevel,
1386 : errmsg("repacking \"%s.%s\" in physical order",
1387 : nspname,
1388 : RelationGetRelationName(OldHeap)));
1389 :
1390 : /*
1391 : * Hand off the actual copying to AM specific function, the generic code
1392 : * cannot know how to deal with visibility across AMs. Note that this
1393 : * routine is allowed to set FreezeXid / MultiXactCutoff to different
1394 : * values (e.g. because the AM doesn't use freezing).
1395 : */
1396 394 : table_relation_copy_for_cluster(OldHeap, NewHeap, OldIndex, use_sort,
1397 : cutoffs.OldestXmin, snapshot,
1398 : &cutoffs.FreezeLimit,
1399 : &cutoffs.MultiXactCutoff,
1400 : &num_tuples, &tups_vacuumed,
1401 : &tups_recently_dead);
1402 :
1403 : /* return selected values to caller, get set as relfrozenxid/minmxid */
1404 394 : *pFreezeXid = cutoffs.FreezeLimit;
1405 394 : *pCutoffMulti = cutoffs.MultiXactCutoff;
1406 :
1407 : /*
1408 : * Reset rd_toastoid just to be tidy --- it shouldn't be looked at again.
1409 : * In the CONCURRENTLY case, we need to set it again before applying the
1410 : * concurrent changes.
1411 : */
1412 394 : NewHeap->rd_toastoid = InvalidOid;
1413 :
1414 394 : num_pages = RelationGetNumberOfBlocks(NewHeap);
1415 :
1416 : /* Log what we did */
1417 394 : ereport(elevel,
1418 : (errmsg("\"%s.%s\": found %.0f removable, %.0f nonremovable row versions in %u pages",
1419 : nspname,
1420 : RelationGetRelationName(OldHeap),
1421 : tups_vacuumed, num_tuples,
1422 : RelationGetNumberOfBlocks(OldHeap)),
1423 : errdetail("%.0f dead row versions cannot be removed yet.\n"
1424 : "%s.",
1425 : tups_recently_dead,
1426 : pg_rusage_show(&ru0))));
1427 :
1428 : /* Update pg_class to reflect the correct values of pages and tuples. */
1429 394 : relRelation = table_open(RelationRelationId, RowExclusiveLock);
1430 :
1431 394 : reltup = SearchSysCacheCopy1(RELOID,
1432 : ObjectIdGetDatum(RelationGetRelid(NewHeap)));
1433 394 : if (!HeapTupleIsValid(reltup))
1434 0 : elog(ERROR, "cache lookup failed for relation %u",
1435 : RelationGetRelid(NewHeap));
1436 394 : relform = (Form_pg_class) GETSTRUCT(reltup);
1437 :
1438 394 : relform->relpages = num_pages;
1439 394 : relform->reltuples = num_tuples;
1440 :
1441 : /* Don't update the stats for pg_class. See swap_relation_files. */
1442 394 : if (RelationGetRelid(OldHeap) != RelationRelationId)
1443 371 : CatalogTupleUpdate(relRelation, &reltup->t_self, reltup);
1444 : else
1445 23 : CacheInvalidateRelcacheByTuple(reltup);
1446 :
1447 : /* Clean up. */
1448 394 : heap_freetuple(reltup);
1449 394 : table_close(relRelation, RowExclusiveLock);
1450 :
1451 : /* Make the update visible */
1452 394 : CommandCounterIncrement();
1453 394 : }
1454 :
1455 : /*
1456 : * Swap the physical files of two given relations.
1457 : *
1458 : * We swap the physical identity (reltablespace, relfilenumber) while keeping
1459 : * the same logical identities of the two relations. relpersistence is also
1460 : * swapped, which is critical since it determines where buffers live for each
1461 : * relation.
1462 : *
1463 : * We can swap associated TOAST data in either of two ways: recursively swap
1464 : * the physical content of the toast tables (and their indexes), or swap the
1465 : * TOAST links in the given relations' pg_class entries. The former is needed
1466 : * to manage rewrites of shared catalogs (where we cannot change the pg_class
1467 : * links) while the latter is the only way to handle cases in which a toast
1468 : * table is added or removed altogether.
1469 : *
1470 : * Additionally, the first relation is marked with relfrozenxid set to
1471 : * frozenXid. It seems a bit ugly to have this here, but the caller would
1472 : * have to do it anyway, so having it here saves a heap_update. Note: in
1473 : * the swap-toast-links case, we assume we don't need to change the toast
1474 : * table's relfrozenxid: the new version of the toast table should already
1475 : * have relfrozenxid set to RecentXmin, which is good enough.
1476 : *
1477 : * Lastly, if r2 and its toast table and toast index (if any) are mapped,
1478 : * their OIDs are emitted into mapped_tables[]. This is hacky but beats
1479 : * having to look the information up again later in finish_heap_swap.
1480 : */
1481 : static void
1482 1685 : swap_relation_files(Oid r1, Oid r2, bool target_is_pg_class,
1483 : bool swap_toast_by_content,
1484 : bool is_internal,
1485 : TransactionId frozenXid,
1486 : MultiXactId cutoffMulti,
1487 : Oid *mapped_tables)
1488 : {
1489 : Relation relRelation;
1490 : HeapTuple reltup1,
1491 : reltup2;
1492 : Form_pg_class relform1,
1493 : relform2;
1494 : RelFileNumber relfilenumber1,
1495 : relfilenumber2;
1496 : RelFileNumber swaptemp;
1497 : char swptmpchr;
1498 : Oid relam1,
1499 : relam2;
1500 :
1501 : /* We need writable copies of both pg_class tuples. */
1502 1685 : relRelation = table_open(RelationRelationId, RowExclusiveLock);
1503 :
1504 1685 : reltup1 = SearchSysCacheCopy1(RELOID, ObjectIdGetDatum(r1));
1505 1685 : if (!HeapTupleIsValid(reltup1))
1506 0 : elog(ERROR, "cache lookup failed for relation %u", r1);
1507 1685 : relform1 = (Form_pg_class) GETSTRUCT(reltup1);
1508 :
1509 1685 : reltup2 = SearchSysCacheCopy1(RELOID, ObjectIdGetDatum(r2));
1510 1685 : if (!HeapTupleIsValid(reltup2))
1511 0 : elog(ERROR, "cache lookup failed for relation %u", r2);
1512 1685 : relform2 = (Form_pg_class) GETSTRUCT(reltup2);
1513 :
1514 1685 : relfilenumber1 = relform1->relfilenode;
1515 1685 : relfilenumber2 = relform2->relfilenode;
1516 1685 : relam1 = relform1->relam;
1517 1685 : relam2 = relform2->relam;
1518 :
1519 1685 : if (RelFileNumberIsValid(relfilenumber1) &&
1520 : RelFileNumberIsValid(relfilenumber2))
1521 : {
1522 : /*
1523 : * Normal non-mapped relations: swap relfilenumbers, reltablespaces,
1524 : * relpersistence
1525 : */
1526 : Assert(!target_is_pg_class);
1527 :
1528 1596 : swaptemp = relform1->relfilenode;
1529 1596 : relform1->relfilenode = relform2->relfilenode;
1530 1596 : relform2->relfilenode = swaptemp;
1531 :
1532 1596 : swaptemp = relform1->reltablespace;
1533 1596 : relform1->reltablespace = relform2->reltablespace;
1534 1596 : relform2->reltablespace = swaptemp;
1535 :
1536 1596 : swaptemp = relform1->relam;
1537 1596 : relform1->relam = relform2->relam;
1538 1596 : relform2->relam = swaptemp;
1539 :
1540 1596 : swptmpchr = relform1->relpersistence;
1541 1596 : relform1->relpersistence = relform2->relpersistence;
1542 1596 : relform2->relpersistence = swptmpchr;
1543 :
1544 : /* Also swap toast links, if we're swapping by links */
1545 1596 : if (!swap_toast_by_content)
1546 : {
1547 1284 : swaptemp = relform1->reltoastrelid;
1548 1284 : relform1->reltoastrelid = relform2->reltoastrelid;
1549 1284 : relform2->reltoastrelid = swaptemp;
1550 : }
1551 : }
1552 : else
1553 : {
1554 : /*
1555 : * Mapped-relation case. Here we have to swap the relation mappings
1556 : * instead of modifying the pg_class columns. Both must be mapped.
1557 : */
1558 89 : if (RelFileNumberIsValid(relfilenumber1) ||
1559 : RelFileNumberIsValid(relfilenumber2))
1560 0 : elog(ERROR, "cannot swap mapped relation \"%s\" with non-mapped relation",
1561 : NameStr(relform1->relname));
1562 :
1563 : /*
1564 : * We can't change the tablespace nor persistence of a mapped rel, and
1565 : * we can't handle toast link swapping for one either, because we must
1566 : * not apply any critical changes to its pg_class row. These cases
1567 : * should be prevented by upstream permissions tests, so these checks
1568 : * are non-user-facing emergency backstop.
1569 : */
1570 89 : if (relform1->reltablespace != relform2->reltablespace)
1571 0 : elog(ERROR, "cannot change tablespace of mapped relation \"%s\"",
1572 : NameStr(relform1->relname));
1573 89 : if (relform1->relpersistence != relform2->relpersistence)
1574 0 : elog(ERROR, "cannot change persistence of mapped relation \"%s\"",
1575 : NameStr(relform1->relname));
1576 89 : if (relform1->relam != relform2->relam)
1577 0 : elog(ERROR, "cannot change access method of mapped relation \"%s\"",
1578 : NameStr(relform1->relname));
1579 89 : if (!swap_toast_by_content &&
1580 29 : (relform1->reltoastrelid || relform2->reltoastrelid))
1581 0 : elog(ERROR, "cannot swap toast by links for mapped relation \"%s\"",
1582 : NameStr(relform1->relname));
1583 :
1584 : /*
1585 : * Fetch the mappings --- shouldn't fail, but be paranoid
1586 : */
1587 89 : relfilenumber1 = RelationMapOidToFilenumber(r1, relform1->relisshared);
1588 89 : if (!RelFileNumberIsValid(relfilenumber1))
1589 0 : elog(ERROR, "could not find relation mapping for relation \"%s\", OID %u",
1590 : NameStr(relform1->relname), r1);
1591 89 : relfilenumber2 = RelationMapOidToFilenumber(r2, relform2->relisshared);
1592 89 : if (!RelFileNumberIsValid(relfilenumber2))
1593 0 : elog(ERROR, "could not find relation mapping for relation \"%s\", OID %u",
1594 : NameStr(relform2->relname), r2);
1595 :
1596 : /*
1597 : * Send replacement mappings to relmapper. Note these won't actually
1598 : * take effect until CommandCounterIncrement.
1599 : */
1600 89 : RelationMapUpdateMap(r1, relfilenumber2, relform1->relisshared, false);
1601 89 : RelationMapUpdateMap(r2, relfilenumber1, relform2->relisshared, false);
1602 :
1603 : /* Pass OIDs of mapped r2 tables back to caller */
1604 89 : *mapped_tables++ = r2;
1605 : }
1606 :
1607 : /*
1608 : * Recognize that rel1's relfilenumber (swapped from rel2) is new in this
1609 : * subtransaction. The rel2 storage (swapped from rel1) may or may not be
1610 : * new.
1611 : */
1612 : {
1613 : Relation rel1,
1614 : rel2;
1615 :
1616 1685 : rel1 = relation_open(r1, NoLock);
1617 1685 : rel2 = relation_open(r2, NoLock);
1618 1685 : rel2->rd_createSubid = rel1->rd_createSubid;
1619 1685 : rel2->rd_newRelfilelocatorSubid = rel1->rd_newRelfilelocatorSubid;
1620 1685 : rel2->rd_firstRelfilelocatorSubid = rel1->rd_firstRelfilelocatorSubid;
1621 1685 : RelationAssumeNewRelfilelocator(rel1);
1622 1685 : relation_close(rel1, NoLock);
1623 1685 : relation_close(rel2, NoLock);
1624 : }
1625 :
1626 : /*
1627 : * In the case of a shared catalog, these next few steps will only affect
1628 : * our own database's pg_class row; but that's okay, because they are all
1629 : * noncritical updates. That's also an important fact for the case of a
1630 : * mapped catalog, because it's possible that we'll commit the map change
1631 : * and then fail to commit the pg_class update.
1632 : */
1633 :
1634 : /* set rel1's frozen Xid and minimum MultiXid */
1635 1685 : if (relform1->relkind != RELKIND_INDEX)
1636 : {
1637 : Assert(!TransactionIdIsValid(frozenXid) ||
1638 : TransactionIdIsNormal(frozenXid));
1639 1557 : relform1->relfrozenxid = frozenXid;
1640 1557 : relform1->relminmxid = cutoffMulti;
1641 : }
1642 :
1643 : /* swap size statistics too, since new rel has freshly-updated stats */
1644 : {
1645 : int32 swap_pages;
1646 : float4 swap_tuples;
1647 : int32 swap_allvisible;
1648 : int32 swap_allfrozen;
1649 :
1650 1685 : swap_pages = relform1->relpages;
1651 1685 : relform1->relpages = relform2->relpages;
1652 1685 : relform2->relpages = swap_pages;
1653 :
1654 1685 : swap_tuples = relform1->reltuples;
1655 1685 : relform1->reltuples = relform2->reltuples;
1656 1685 : relform2->reltuples = swap_tuples;
1657 :
1658 1685 : swap_allvisible = relform1->relallvisible;
1659 1685 : relform1->relallvisible = relform2->relallvisible;
1660 1685 : relform2->relallvisible = swap_allvisible;
1661 :
1662 1685 : swap_allfrozen = relform1->relallfrozen;
1663 1685 : relform1->relallfrozen = relform2->relallfrozen;
1664 1685 : relform2->relallfrozen = swap_allfrozen;
1665 : }
1666 :
1667 : /*
1668 : * Update the tuples in pg_class --- unless the target relation of the
1669 : * swap is pg_class itself. In that case, there is zero point in making
1670 : * changes because we'd be updating the old data that we're about to throw
1671 : * away. Because the real work being done here for a mapped relation is
1672 : * just to change the relation map settings, it's all right to not update
1673 : * the pg_class rows in this case. The most important changes will instead
1674 : * be performed later, in finish_heap_swap() itself.
1675 : */
1676 1685 : if (!target_is_pg_class)
1677 : {
1678 : CatalogIndexState indstate;
1679 :
1680 1662 : indstate = CatalogOpenIndexes(relRelation);
1681 1662 : CatalogTupleUpdateWithInfo(relRelation, &reltup1->t_self, reltup1,
1682 : indstate);
1683 1662 : CatalogTupleUpdateWithInfo(relRelation, &reltup2->t_self, reltup2,
1684 : indstate);
1685 1662 : CatalogCloseIndexes(indstate);
1686 : }
1687 : else
1688 : {
1689 : /* no update ... but we do still need relcache inval */
1690 23 : CacheInvalidateRelcacheByTuple(reltup1);
1691 23 : CacheInvalidateRelcacheByTuple(reltup2);
1692 : }
1693 :
1694 : /*
1695 : * Now that pg_class has been updated with its relevant information for
1696 : * the swap, update the dependency of the relations to point to their new
1697 : * table AM, if it has changed.
1698 : */
1699 1685 : if (relam1 != relam2)
1700 : {
1701 24 : if (changeDependencyFor(RelationRelationId,
1702 : r1,
1703 : AccessMethodRelationId,
1704 : relam1,
1705 : relam2) != 1)
1706 0 : elog(ERROR, "could not change access method dependency for relation \"%s.%s\"",
1707 : get_namespace_name(get_rel_namespace(r1)),
1708 : get_rel_name(r1));
1709 24 : if (changeDependencyFor(RelationRelationId,
1710 : r2,
1711 : AccessMethodRelationId,
1712 : relam2,
1713 : relam1) != 1)
1714 0 : elog(ERROR, "could not change access method dependency for relation \"%s.%s\"",
1715 : get_namespace_name(get_rel_namespace(r2)),
1716 : get_rel_name(r2));
1717 : }
1718 :
1719 : /*
1720 : * Post alter hook for modified relations. The change to r2 is always
1721 : * internal, but r1 depends on the invocation context.
1722 : */
1723 1685 : InvokeObjectPostAlterHookArg(RelationRelationId, r1, 0,
1724 : InvalidOid, is_internal);
1725 1685 : InvokeObjectPostAlterHookArg(RelationRelationId, r2, 0,
1726 : InvalidOid, true);
1727 :
1728 : /*
1729 : * If we have toast tables associated with the relations being swapped,
1730 : * deal with them too.
1731 : */
1732 1685 : if (relform1->reltoastrelid || relform2->reltoastrelid)
1733 : {
1734 524 : if (swap_toast_by_content)
1735 : {
1736 124 : if (relform1->reltoastrelid && relform2->reltoastrelid)
1737 : {
1738 : /* Recursively swap the contents of the toast tables */
1739 124 : swap_relation_files(relform1->reltoastrelid,
1740 : relform2->reltoastrelid,
1741 : target_is_pg_class,
1742 : swap_toast_by_content,
1743 : is_internal,
1744 : frozenXid,
1745 : cutoffMulti,
1746 : mapped_tables);
1747 : }
1748 : else
1749 : {
1750 : /* caller messed up */
1751 0 : elog(ERROR, "cannot swap toast files by content when there's only one");
1752 : }
1753 : }
1754 : else
1755 : {
1756 : /*
1757 : * We swapped the ownership links, so we need to change dependency
1758 : * data to match.
1759 : *
1760 : * NOTE: it is possible that only one table has a toast table.
1761 : *
1762 : * NOTE: at present, a TOAST table's only dependency is the one on
1763 : * its owning table. If more are ever created, we'd need to use
1764 : * something more selective than deleteDependencyRecordsFor() to
1765 : * get rid of just the link we want.
1766 : */
1767 : ObjectAddress baseobject,
1768 : toastobject;
1769 : long count;
1770 :
1771 : /*
1772 : * We disallow this case for system catalogs, to avoid the
1773 : * possibility that the catalog we're rebuilding is one of the
1774 : * ones the dependency changes would change. It's too late to be
1775 : * making any data changes to the target catalog.
1776 : */
1777 400 : if (IsSystemClass(r1, relform1))
1778 0 : elog(ERROR, "cannot swap toast files by links for system catalogs");
1779 :
1780 : /* Delete old dependencies */
1781 400 : if (relform1->reltoastrelid)
1782 : {
1783 379 : count = deleteDependencyRecordsFor(RelationRelationId,
1784 : relform1->reltoastrelid,
1785 : false);
1786 379 : if (count != 1)
1787 0 : elog(ERROR, "expected one dependency record for TOAST table, found %ld",
1788 : count);
1789 : }
1790 400 : if (relform2->reltoastrelid)
1791 : {
1792 400 : count = deleteDependencyRecordsFor(RelationRelationId,
1793 : relform2->reltoastrelid,
1794 : false);
1795 400 : if (count != 1)
1796 0 : elog(ERROR, "expected one dependency record for TOAST table, found %ld",
1797 : count);
1798 : }
1799 :
1800 : /* Register new dependencies */
1801 400 : baseobject.classId = RelationRelationId;
1802 400 : baseobject.objectSubId = 0;
1803 400 : toastobject.classId = RelationRelationId;
1804 400 : toastobject.objectSubId = 0;
1805 :
1806 400 : if (relform1->reltoastrelid)
1807 : {
1808 379 : baseobject.objectId = r1;
1809 379 : toastobject.objectId = relform1->reltoastrelid;
1810 379 : recordDependencyOn(&toastobject, &baseobject,
1811 : DEPENDENCY_INTERNAL);
1812 : }
1813 :
1814 400 : if (relform2->reltoastrelid)
1815 : {
1816 400 : baseobject.objectId = r2;
1817 400 : toastobject.objectId = relform2->reltoastrelid;
1818 400 : recordDependencyOn(&toastobject, &baseobject,
1819 : DEPENDENCY_INTERNAL);
1820 : }
1821 : }
1822 : }
1823 :
1824 : /*
1825 : * If we're swapping two toast tables by content, do the same for their
1826 : * valid index. The swap can actually be safely done only if the relations
1827 : * have indexes.
1828 : */
1829 1685 : if (swap_toast_by_content &&
1830 372 : relform1->relkind == RELKIND_TOASTVALUE &&
1831 124 : relform2->relkind == RELKIND_TOASTVALUE)
1832 : {
1833 : Oid toastIndex1,
1834 : toastIndex2;
1835 :
1836 : /* Get valid index for each relation */
1837 124 : toastIndex1 = toast_get_valid_index(r1,
1838 : AccessExclusiveLock);
1839 124 : toastIndex2 = toast_get_valid_index(r2,
1840 : AccessExclusiveLock);
1841 :
1842 124 : swap_relation_files(toastIndex1,
1843 : toastIndex2,
1844 : target_is_pg_class,
1845 : swap_toast_by_content,
1846 : is_internal,
1847 : InvalidTransactionId,
1848 : InvalidMultiXactId,
1849 : mapped_tables);
1850 : }
1851 :
1852 : /* Clean up. */
1853 1685 : heap_freetuple(reltup1);
1854 1685 : heap_freetuple(reltup2);
1855 :
1856 1685 : table_close(relRelation, RowExclusiveLock);
1857 1685 : }
1858 :
/*
 * Remove the transient table that was built by make_new_heap, and finish
 * cleaning up (including rebuilding all indexes on the old heap).
 *
 * OIDOldHeap is the original table; OIDNewHeap is the transient copy whose
 * relation file the old table takes over.  frozenXid/cutoffMulti become the
 * old heap's relfrozenxid/relminmxid (unless invalid), and
 * newrelpersistence is forced onto the rebuilt indexes so they match the
 * parent relation.
 */
void
finish_heap_swap(Oid OIDOldHeap, Oid OIDNewHeap,
				 bool is_system_catalog,
				 bool swap_toast_by_content,
				 bool check_constraints,
				 bool is_internal,
				 bool reindex,
				 TransactionId frozenXid,
				 MultiXactId cutoffMulti,
				 char newrelpersistence)
{
	ObjectAddress object;
	Oid			mapped_tables[4];
	int			i;

	/* Report that we are now swapping relation files */
	pgstat_progress_update_param(PROGRESS_REPACK_PHASE,
								 PROGRESS_REPACK_PHASE_SWAP_REL_FILES);

	/* Zero out possible results from swap_relation_files */
	memset(mapped_tables, 0, sizeof(mapped_tables));

	/*
	 * Swap the contents of the heap relations (including any toast tables).
	 * Also set old heap's relfrozenxid to frozenXid.
	 */
	swap_relation_files(OIDOldHeap, OIDNewHeap,
						(OIDOldHeap == RelationRelationId),
						swap_toast_by_content, is_internal,
						frozenXid, cutoffMulti, mapped_tables);

	/*
	 * If it's a system catalog, queue a sinval message to flush all catcaches
	 * on the catalog when we reach CommandCounterIncrement.
	 */
	if (is_system_catalog)
		CacheInvalidateCatalog(OIDOldHeap);

	if (reindex)
	{
		int			reindex_flags;
		ReindexParams reindex_params = {0};

		/*
		 * Rebuild each index on the relation (but not the toast table, which
		 * is all-new at this point).  It is important to do this before the
		 * DROP step because if we are processing a system catalog that will
		 * be used during DROP, we want to have its indexes available.  There
		 * is no advantage to the other order anyway because this is all
		 * transactional, so no chance to reclaim disk space before commit.
		 * We do not need a final CommandCounterIncrement() because
		 * reindex_relation does it.
		 *
		 * Note: because index_build is called via reindex_relation, it will
		 * never set indcheckxmin true for the indexes.  This is OK even
		 * though in some sense we are building new indexes rather than
		 * rebuilding existing ones, because the new heap won't contain any
		 * HOT chains at all, let alone broken ones, so it can't be necessary
		 * to set indcheckxmin.
		 */
		reindex_flags = REINDEX_REL_SUPPRESS_INDEX_USE;
		if (check_constraints)
			reindex_flags |= REINDEX_REL_CHECK_CONSTRAINTS;

		/*
		 * Ensure that the indexes have the same persistence as the parent
		 * relation.
		 */
		if (newrelpersistence == RELPERSISTENCE_UNLOGGED)
			reindex_flags |= REINDEX_REL_FORCE_INDEXES_UNLOGGED;
		else if (newrelpersistence == RELPERSISTENCE_PERMANENT)
			reindex_flags |= REINDEX_REL_FORCE_INDEXES_PERMANENT;

		/* Report that we are now reindexing relations */
		pgstat_progress_update_param(PROGRESS_REPACK_PHASE,
									 PROGRESS_REPACK_PHASE_REBUILD_INDEX);

		reindex_relation(NULL, OIDOldHeap, reindex_flags, &reindex_params);
	}

	/* Report that we are now doing clean up */
	pgstat_progress_update_param(PROGRESS_REPACK_PHASE,
								 PROGRESS_REPACK_PHASE_FINAL_CLEANUP);

	/*
	 * If the relation being rebuilt is pg_class, swap_relation_files()
	 * couldn't update pg_class's own pg_class entry (check comments in
	 * swap_relation_files()), thus relfrozenxid was not updated. That's
	 * annoying because a potential reason for doing a VACUUM FULL is a
	 * imminent or actual anti-wraparound shutdown.  So, now that we can
	 * access the new relation using its indices, update relfrozenxid.
	 * pg_class doesn't have a toast relation, so we don't need to update the
	 * corresponding toast relation. Note that there's little point moving
	 * all relfrozenxid updates here since swap_relation_files() needs to
	 * write to pg_class for non-mapped relations anyway.
	 */
	if (OIDOldHeap == RelationRelationId)
	{
		Relation	relRelation;
		HeapTuple	reltup;
		Form_pg_class relform;

		relRelation = table_open(RelationRelationId, RowExclusiveLock);

		reltup = SearchSysCacheCopy1(RELOID, ObjectIdGetDatum(OIDOldHeap));
		if (!HeapTupleIsValid(reltup))
			elog(ERROR, "cache lookup failed for relation %u", OIDOldHeap);
		relform = (Form_pg_class) GETSTRUCT(reltup);

		relform->relfrozenxid = frozenXid;
		relform->relminmxid = cutoffMulti;

		CatalogTupleUpdate(relRelation, &reltup->t_self, reltup);

		table_close(relRelation, RowExclusiveLock);
	}

	/* Destroy new heap with old filenumber */
	object.classId = RelationRelationId;
	object.objectId = OIDNewHeap;
	object.objectSubId = 0;

	if (!reindex)
	{
		/*
		 * Make sure the changes in pg_class are visible. This is especially
		 * important if !swap_toast_by_content, so that the correct TOAST
		 * relation is dropped. (reindex_relation() above did not help in this
		 * case.)
		 */
		CommandCounterIncrement();
	}

	/*
	 * The new relation is local to our transaction and we know nothing
	 * depends on it, so DROP_RESTRICT should be OK.
	 */
	performDeletion(&object, DROP_RESTRICT, PERFORM_DELETION_INTERNAL);

	/* performDeletion does CommandCounterIncrement at end */

	/*
	 * Now we must remove any relation mapping entries that we set up for the
	 * transient table, as well as its toast table and toast index if any. If
	 * we fail to do this before commit, the relmapper will complain about new
	 * permanent map entries being added post-bootstrap.
	 */
	for (i = 0; OidIsValid(mapped_tables[i]); i++)
		RelationMapRemoveMapping(mapped_tables[i]);

	/*
	 * At this point, everything is kosher except that, if we did toast swap
	 * by links, the toast table's name corresponds to the transient table.
	 * The name is irrelevant to the backend because it's referenced by OID,
	 * but users looking at the catalogs could be confused.  Rename it to
	 * prevent this problem.
	 *
	 * Note no lock required on the relation, because we already hold an
	 * exclusive lock on it.
	 */
	if (!swap_toast_by_content)
	{
		Relation	newrel;

		newrel = table_open(OIDOldHeap, NoLock);
		if (OidIsValid(newrel->rd_rel->reltoastrelid))
		{
			Oid			toastidx;
			char		NewToastName[NAMEDATALEN];

			/* Get the associated valid index to be renamed */
			toastidx = toast_get_valid_index(newrel->rd_rel->reltoastrelid,
											 AccessExclusiveLock);

			/* rename the toast table ... */
			snprintf(NewToastName, NAMEDATALEN, "pg_toast_%u",
					 OIDOldHeap);
			RenameRelationInternal(newrel->rd_rel->reltoastrelid,
								   NewToastName, true, false);

			/* ... and its valid index too. */
			snprintf(NewToastName, NAMEDATALEN, "pg_toast_%u_index",
					 OIDOldHeap);

			RenameRelationInternal(toastidx,
								   NewToastName, true, true);

			/*
			 * Reset the relrewrite for the toast. The command-counter
			 * increment is required here as we are about to update the tuple
			 * that is updated as part of RenameRelationInternal.
			 */
			CommandCounterIncrement();
			ResetRelRewrite(newrel->rd_rel->reltoastrelid);
		}
		relation_close(newrel, NoLock);
	}

	/* if it's not a catalog table, clear any missing attribute settings */
	if (!is_system_catalog)
	{
		Relation	newrel;

		newrel = table_open(OIDOldHeap, NoLock);
		RelationClearMissing(newrel);
		relation_close(newrel, NoLock);
	}
}
2071 :
/*
 * Determine which relations to process, when REPACK/CLUSTER is called
 * without specifying a table name.  The exact process depends on whether
 * USING INDEX was given or not, and in any case we only return tables and
 * materialized views that the current user has privileges to repack/cluster.
 *
 * If USING INDEX was given, we scan pg_index to find those that have
 * indisclustered set; if it was not given, scan pg_class and return all
 * tables.
 *
 * Return it as a list of RelToCluster in the given memory context.
 * Relations in the returned list remain locked with AccessShareLock.
 */
static List *
get_tables_to_repack(RepackCommand cmd, bool usingindex, MemoryContext permcxt)
{
	Relation	catalog;
	TableScanDesc scan;
	HeapTuple	tuple;
	List	   *rtcs = NIL;

	if (usingindex)
	{
		ScanKeyData entry;

		/*
		 * For USING INDEX, scan pg_index to find those with indisclustered.
		 */
		catalog = table_open(IndexRelationId, AccessShareLock);
		ScanKeyInit(&entry,
					Anum_pg_index_indisclustered,
					BTEqualStrategyNumber, F_BOOLEQ,
					BoolGetDatum(true));
		scan = table_beginscan_catalog(catalog, 1, &entry);
		while ((tuple = heap_getnext(scan, ForwardScanDirection)) != NULL)
		{
			RelToCluster *rtc;
			Form_pg_index index;
			MemoryContext oldcxt;

			index = (Form_pg_index) GETSTRUCT(tuple);

			/*
			 * Try to obtain a light lock on the index's table, to ensure it
			 * doesn't go away while we collect the list.  If we cannot, just
			 * disregard it.  Be sure to release this if we ultimately decide
			 * not to process the table!
			 */
			if (!ConditionalLockRelationOid(index->indrelid, AccessShareLock))
				continue;

			/* Verify that the table still exists; skip if not */
			if (!SearchSysCacheExists1(RELOID, ObjectIdGetDatum(index->indrelid)))
			{
				UnlockRelationOid(index->indrelid, AccessShareLock);
				continue;
			}

			/* noisily skip rels which the user can't process */
			if (!repack_is_permitted_for_relation(cmd, index->indrelid,
												  GetUserId()))
			{
				UnlockRelationOid(index->indrelid, AccessShareLock);
				continue;
			}

			/* Use a permanent memory context for the result list */
			oldcxt = MemoryContextSwitchTo(permcxt);
			rtc = palloc_object(RelToCluster);
			rtc->tableOid = index->indrelid;
			rtc->indexOid = index->indexrelid;
			rtcs = lappend(rtcs, rtc);
			MemoryContextSwitchTo(oldcxt);
		}
	}
	else
	{
		/* No USING INDEX: consider every row of pg_class. */
		catalog = table_open(RelationRelationId, AccessShareLock);
		scan = table_beginscan_catalog(catalog, 0, NULL);

		while ((tuple = heap_getnext(scan, ForwardScanDirection)) != NULL)
		{
			RelToCluster *rtc;
			Form_pg_class class;
			MemoryContext oldcxt;

			class = (Form_pg_class) GETSTRUCT(tuple);

			/*
			 * Try to obtain a light lock on the table, to ensure it doesn't
			 * go away while we collect the list.  If we cannot, just
			 * disregard the table.  Be sure to release this if we ultimately
			 * decide not to process the table!
			 */
			if (!ConditionalLockRelationOid(class->oid, AccessShareLock))
				continue;

			/* Verify that the table still exists */
			if (!SearchSysCacheExists1(RELOID, ObjectIdGetDatum(class->oid)))
			{
				UnlockRelationOid(class->oid, AccessShareLock);
				continue;
			}

			/* Can only process plain tables and matviews */
			if (class->relkind != RELKIND_RELATION &&
				class->relkind != RELKIND_MATVIEW)
			{
				UnlockRelationOid(class->oid, AccessShareLock);
				continue;
			}

			/* noisily skip rels which the user can't process */
			if (!repack_is_permitted_for_relation(cmd, class->oid,
												  GetUserId()))
			{
				UnlockRelationOid(class->oid, AccessShareLock);
				continue;
			}

			/* Use a permanent memory context for the result list */
			oldcxt = MemoryContextSwitchTo(permcxt);
			rtc = palloc_object(RelToCluster);
			rtc->tableOid = class->oid;
			/* InvalidOid: no particular index requested for this table */
			rtc->indexOid = InvalidOid;
			rtcs = lappend(rtcs, rtc);
			MemoryContextSwitchTo(oldcxt);
		}
	}

	table_endscan(scan);
	relation_close(catalog, AccessShareLock);

	return rtcs;
}
2206 :
/*
 * Given a partitioned table or its index, return a list of RelToCluster for
 * all the leaf child tables/indexes.
 *
 * 'rel_is_index' tells whether 'relid' is that of an index (true) or of the
 * owning relation.
 *
 * Leaf partitions for which the user lacks the required privilege are
 * silently-ish skipped (repack_is_permitted_for_relation emits a WARNING).
 */
static List *
get_tables_to_repack_partitioned(RepackCommand cmd, Oid relid,
								 bool rel_is_index, MemoryContext permcxt)
{
	List	   *inhoids;
	List	   *rtcs = NIL;

	/*
	 * Do not lock the children until they're processed.  Note that we do
	 * hold a lock on the parent partitioned table.
	 */
	inhoids = find_all_inheritors(relid, NoLock, NULL);
	foreach_oid(child_oid, inhoids)
	{
		Oid			table_oid,
					index_oid;
		RelToCluster *rtc;
		MemoryContext oldcxt;

		if (rel_is_index)
		{
			/* consider only leaf indexes */
			if (get_rel_relkind(child_oid) != RELKIND_INDEX)
				continue;

			/* map the child index back to the partition it belongs to */
			table_oid = IndexGetRelation(child_oid, false);
			index_oid = child_oid;
		}
		else
		{
			/* consider only leaf relations */
			if (get_rel_relkind(child_oid) != RELKIND_RELATION)
				continue;

			table_oid = child_oid;
			index_oid = InvalidOid;
		}

		/*
		 * It's possible that the user does not have privileges to CLUSTER
		 * the leaf partition despite having them on the partitioned table.
		 * Skip if so.
		 */
		if (!repack_is_permitted_for_relation(cmd, table_oid, GetUserId()))
			continue;

		/* Use a permanent memory context for the result list */
		oldcxt = MemoryContextSwitchTo(permcxt);
		rtc = palloc_object(RelToCluster);
		rtc->tableOid = table_oid;
		rtc->indexOid = index_oid;
		rtcs = lappend(rtcs, rtc);
		MemoryContextSwitchTo(oldcxt);
	}

	return rtcs;
}
2271 :
2272 :
2273 : /*
2274 : * Return whether userid has privileges to REPACK relid. If not, this
2275 : * function emits a WARNING.
2276 : */
2277 : static bool
2278 3240 : repack_is_permitted_for_relation(RepackCommand cmd, Oid relid, Oid userid)
2279 : {
2280 : Assert(cmd == REPACK_COMMAND_CLUSTER || cmd == REPACK_COMMAND_REPACK);
2281 :
2282 3240 : if (pg_class_aclcheck(relid, userid, ACL_MAINTAIN) == ACLCHECK_OK)
2283 104 : return true;
2284 :
2285 3136 : ereport(WARNING,
2286 : errmsg("permission denied to execute %s on \"%s\", skipping it",
2287 : RepackCommandAsString(cmd),
2288 : get_rel_name(relid)));
2289 :
2290 3136 : return false;
2291 : }
2292 :
2293 :
/*
 * Given a RepackStmt with an indicated relation name, resolve the relation
 * name, obtain lock on it, then determine what to do based on the relation
 * type: if it's table and not partitioned, repack it as indicated (using an
 * existing clustered index, or following the given one), and return NULL.
 *
 * On the other hand, if the table is partitioned, do nothing further and
 * instead return the opened and locked relcache entry, so that caller can
 * process the partitions using the multiple-table handling code.  In this
 * case, if an index name is given, it's up to the caller to resolve it.
 *
 * Note: when ANALYZE is requested, this commits the current transaction and
 * starts a new one, so the caller must be at a safe point for that
 * (isTopLevel is passed down to cluster_rel for the same reason).
 */
static Relation
process_single_relation(RepackStmt *stmt, LOCKMODE lockmode, bool isTopLevel,
						ClusterParams *params)
{
	Relation	rel;
	Oid			tableOid;

	Assert(stmt->relation != NULL);
	Assert(stmt->command == REPACK_COMMAND_CLUSTER ||
		   stmt->command == REPACK_COMMAND_REPACK);

	/*
	 * Make sure ANALYZE is specified if a column list is present.
	 */
	if ((params->options & CLUOPT_ANALYZE) == 0 && stmt->relation->va_cols != NIL)
		ereport(ERROR,
				errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
				errmsg("ANALYZE option must be specified when a column list is provided"));

	/* Find, lock, and check permissions on the table. */
	tableOid = RangeVarGetRelidExtended(stmt->relation->relation,
										lockmode,
										0,
										RangeVarCallbackMaintainsTable,
										NULL);
	/* already locked above, so no additional lock here */
	rel = table_open(tableOid, NoLock);

	/*
	 * Reject clustering a remote temp table ... their local buffer manager is
	 * not going to cope.
	 */
	if (RELATION_IS_OTHER_TEMP(rel))
		ereport(ERROR,
				errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
		/*- translator: first %s is name of a SQL command, eg. REPACK */
				errmsg("cannot execute %s on temporary tables of other sessions",
					   RepackCommandAsString(stmt->command)));

	/*
	 * For partitioned tables, let caller handle this.  Otherwise, process it
	 * here and we're done.
	 */
	if (rel->rd_rel->relkind == RELKIND_PARTITIONED_TABLE)
		return rel;
	else
	{
		Oid			indexOid = InvalidOid;

		indexOid = determine_clustered_index(rel, stmt->usingindex,
											 stmt->indexname);
		if (OidIsValid(indexOid))
			check_index_is_clusterable(rel, indexOid, lockmode);

		/* cluster_rel closes 'rel' for us */
		cluster_rel(stmt->command, rel, indexOid, params, isTopLevel);

		/*
		 * Do an analyze, if requested.  We close the transaction and start a
		 * new one, so that we don't hold the stronger lock for longer than
		 * needed.
		 */
		if (params->options & CLUOPT_ANALYZE)
		{
			VacuumParams vac_params = {0};

			PopActiveSnapshot();
			CommitTransactionCommand();

			StartTransactionCommand();
			PushActiveSnapshot(GetTransactionSnapshot());

			vac_params.options |= VACOPT_ANALYZE;
			if (params->options & CLUOPT_VERBOSE)
				vac_params.options |= VACOPT_VERBOSE;
			analyze_rel(tableOid, NULL, &vac_params,
						stmt->relation->va_cols, true, NULL);
			PopActiveSnapshot();
			CommandCounterIncrement();
		}

		return NULL;
	}
}
2387 :
2388 : /*
2389 : * Given a relation and the usingindex/indexname options in a
2390 : * REPACK USING INDEX or CLUSTER command, return the OID of the
2391 : * index to use for clustering the table.
2392 : *
2393 : * Caller must hold lock on the relation so that the set of indexes
2394 : * doesn't change, and must call check_index_is_clusterable.
2395 : */
2396 : static Oid
2397 176 : determine_clustered_index(Relation rel, bool usingindex, const char *indexname)
2398 : {
2399 : Oid indexOid;
2400 :
2401 176 : if (indexname == NULL && usingindex)
2402 : {
2403 : /*
2404 : * If USING INDEX with no name is given, find a clustered index, or
2405 : * error out if none.
2406 : */
2407 21 : indexOid = InvalidOid;
2408 46 : foreach_oid(idxoid, RelationGetIndexList(rel))
2409 : {
2410 21 : if (get_index_isclustered(idxoid))
2411 : {
2412 17 : indexOid = idxoid;
2413 17 : break;
2414 : }
2415 : }
2416 :
2417 21 : if (!OidIsValid(indexOid))
2418 4 : ereport(ERROR,
2419 : errcode(ERRCODE_UNDEFINED_OBJECT),
2420 : errmsg("there is no previously clustered index for table \"%s\"",
2421 : RelationGetRelationName(rel)));
2422 : }
2423 155 : else if (indexname != NULL)
2424 : {
2425 : /* An index was specified; obtain its OID. */
2426 113 : indexOid = get_relname_relid(indexname, rel->rd_rel->relnamespace);
2427 113 : if (!OidIsValid(indexOid))
2428 0 : ereport(ERROR,
2429 : errcode(ERRCODE_UNDEFINED_OBJECT),
2430 : errmsg("index \"%s\" for table \"%s\" does not exist",
2431 : indexname, RelationGetRelationName(rel)));
2432 : }
2433 : else
2434 42 : indexOid = InvalidOid;
2435 :
2436 172 : return indexOid;
2437 : }
2438 :
2439 : static const char *
2440 3581 : RepackCommandAsString(RepackCommand cmd)
2441 : {
2442 3581 : switch (cmd)
2443 : {
2444 3183 : case REPACK_COMMAND_REPACK:
2445 3183 : return "REPACK";
2446 222 : case REPACK_COMMAND_VACUUMFULL:
2447 222 : return "VACUUM";
2448 176 : case REPACK_COMMAND_CLUSTER:
2449 176 : return "CLUSTER";
2450 : }
2451 0 : return "???"; /* keep compiler quiet */
2452 : }
2453 :
/*
 * Apply all the changes stored in 'file'.
 *
 * 'file' contains the concurrent data changes spilled during the initial
 * copy, each record being a one-byte ConcurrentChangeKind followed by a
 * serialized tuple.  An UPDATE may be spilled as an optional OLD tuple
 * (the key to locate the target row) followed by the NEW tuple.
 */
static void
apply_concurrent_changes(BufFile *file, ChangeContext *chgcxt)
{
	/* '\0' is not a valid change kind, so the first record never sees a
	 * "previous" UPDATE/DELETE and thus triggers no extra CCI. */
	ConcurrentChangeKind kind = '\0';
	Relation	rel = chgcxt->cc_rel;
	TupleTableSlot *spilled_tuple;
	TupleTableSlot *old_update_tuple;
	TupleTableSlot *ondisk_tuple;
	bool		have_old_tuple = false;
	MemoryContext oldcxt;

	/* slot for the change read from the spill file */
	spilled_tuple = MakeSingleTupleTableSlot(RelationGetDescr(rel),
											 &TTSOpsVirtual);
	/* slot for the matching row already present in the new heap */
	ondisk_tuple = MakeSingleTupleTableSlot(RelationGetDescr(rel),
											table_slot_callbacks(rel));
	/* slot holding the OLD tuple of a pending two-part UPDATE record */
	old_update_tuple = MakeSingleTupleTableSlot(RelationGetDescr(rel),
												&TTSOpsVirtual);

	oldcxt = MemoryContextSwitchTo(GetPerTupleMemoryContext(chgcxt->cc_estate));

	while (true)
	{
		size_t		nread;
		ConcurrentChangeKind prevkind = kind;

		CHECK_FOR_INTERRUPTS();

		nread = BufFileReadMaybeEOF(file, &kind, 1, true);
		if (nread == 0)			/* done with the file? */
			break;

		/*
		 * If this is the old tuple for an update, read it into the tuple slot
		 * and go to the next one. The update itself will be executed on the
		 * next iteration, when we receive the NEW tuple.
		 */
		if (kind == CHANGE_UPDATE_OLD)
		{
			restore_tuple(file, rel, old_update_tuple);
			have_old_tuple = true;
			continue;
		}

		/*
		 * Just before an UPDATE or DELETE, we must update the command
		 * counter, because the change could refer to a tuple that we have
		 * just inserted; and before an INSERT, we have to do this also if the
		 * previous command was either update or delete.
		 *
		 * With this approach we don't spend so many CCIs for long strings of
		 * only INSERTs, which can't affect one another.
		 */
		if (kind == CHANGE_UPDATE_NEW || kind == CHANGE_DELETE ||
			(kind == CHANGE_INSERT && (prevkind == CHANGE_UPDATE_NEW ||
									   prevkind == CHANGE_DELETE)))
		{
			CommandCounterIncrement();
			UpdateActiveSnapshotCommandId();
		}

		/*
		 * Now restore the tuple into the slot and execute the change.
		 */
		restore_tuple(file, rel, spilled_tuple);

		if (kind == CHANGE_INSERT)
		{
			apply_concurrent_insert(rel, spilled_tuple, chgcxt);
		}
		else if (kind == CHANGE_DELETE)
		{
			bool		found;

			/* Find the tuple to be deleted */
			found = find_target_tuple(rel, chgcxt, spilled_tuple, ondisk_tuple);
			if (!found)
				elog(ERROR, "failed to find target tuple");
			apply_concurrent_delete(rel, ondisk_tuple);
		}
		else if (kind == CHANGE_UPDATE_NEW)
		{
			TupleTableSlot *key;
			bool		found;

			/* prefer the spilled OLD tuple as lookup key, if we got one */
			if (have_old_tuple)
				key = old_update_tuple;
			else
				key = spilled_tuple;

			/* Find the tuple to be updated or deleted. */
			found = find_target_tuple(rel, chgcxt, key, ondisk_tuple);
			if (!found)
				elog(ERROR, "failed to find target tuple");

			/*
			 * If 'tup' contains TOAST pointers, they point to the old
			 * relation's toast. Copy the corresponding TOAST pointers for the
			 * new relation from the existing tuple. (The fact that we
			 * received a TOAST pointer here implies that the attribute hasn't
			 * changed.)
			 */
			adjust_toast_pointers(rel, spilled_tuple, ondisk_tuple);

			apply_concurrent_update(rel, spilled_tuple, ondisk_tuple, chgcxt);

			ExecClearTuple(old_update_tuple);
			have_old_tuple = false;
		}
		else
			elog(ERROR, "unrecognized kind of change: %d", kind);

		ResetPerTupleExprContext(chgcxt->cc_estate);
	}

	/* Cleanup. */
	ExecDropSingleTupleTableSlot(spilled_tuple);
	ExecDropSingleTupleTableSlot(ondisk_tuple);
	ExecDropSingleTupleTableSlot(old_update_tuple);

	MemoryContextSwitchTo(oldcxt);
}
2578 :
2579 : /*
2580 : * Apply an insert from the spill of concurrent changes to the new copy of the
2581 : * table.
2582 : */
static void
apply_concurrent_insert(Relation rel, TupleTableSlot *slot,
						ChangeContext *chgcxt)
{
	/*
	 * Put the tuple in the table, but make sure it won't be decoded: this
	 * change came from decoding in the first place, so decoding it again
	 * would make the worker re-emit it.
	 */
	table_tuple_insert(rel, slot, GetCurrentCommandId(true),
					   TABLE_INSERT_NO_LOGICAL, NULL);

	/* Update indexes with this new tuple. */
	ExecInsertIndexTuples(chgcxt->cc_rri,
						  chgcxt->cc_estate,
						  0,
						  slot,
						  NIL, NULL);
	/* Report the applied change for progress monitoring. */
	pgstat_progress_incr_param(PROGRESS_REPACK_HEAP_TUPLES_INSERTED, 1);
}
2599 :
2600 : /*
2601 : * Apply an update from the spill of concurrent changes to the new copy of the
2602 : * table.
2603 : */
static void
apply_concurrent_update(Relation rel, TupleTableSlot *spilled_tuple,
						TupleTableSlot *ondisk_tuple,
						ChangeContext *chgcxt)
{
	LockTupleMode lockmode;
	TM_FailureData tmfd;
	TU_UpdateIndexes update_indexes;
	TM_Result	res;

	/*
	 * Carry out the update, skipping logical decoding for it.
	 *
	 * 'ondisk_tuple' locates the existing row version (via its TID),
	 * 'spilled_tuple' carries the new contents. 'tmfd' is required by the
	 * API but not examined; any result other than TM_Ok is fatal below.
	 */
	res = table_tuple_update(rel, &(ondisk_tuple->tts_tid), spilled_tuple,
							 GetCurrentCommandId(true),
							 TABLE_UPDATE_NO_LOGICAL,
							 InvalidSnapshot,
							 InvalidSnapshot,
							 false,
							 &tmfd, &lockmode, &update_indexes);
	if (res != TM_Ok)
		ereport(ERROR,
				errmsg("failed to apply concurrent UPDATE"));

	/* TU_None means no index entries need to be inserted. */
	if (update_indexes != TU_None)
	{
		uint32		flags = EIIT_IS_UPDATE;

		/* Only summarizing indexes need new entries in this case. */
		if (update_indexes == TU_Summarizing)
			flags |= EIIT_ONLY_SUMMARIZING;
		ExecInsertIndexTuples(chgcxt->cc_rri,
							  chgcxt->cc_estate,
							  flags,
							  spilled_tuple,
							  NIL, NULL);
	}

	/* Report the applied change for progress monitoring. */
	pgstat_progress_incr_param(PROGRESS_REPACK_HEAP_TUPLES_UPDATED, 1);
}
2643 :
2644 : static void
2645 3 : apply_concurrent_delete(Relation rel, TupleTableSlot *slot)
2646 : {
2647 : TM_Result res;
2648 : TM_FailureData tmfd;
2649 :
2650 : /*
2651 : * Delete tuple from the new heap, skipping logical decoding for it.
2652 : */
2653 3 : res = table_tuple_delete(rel, &(slot->tts_tid),
2654 : GetCurrentCommandId(true),
2655 : TABLE_DELETE_NO_LOGICAL,
2656 : InvalidSnapshot, InvalidSnapshot,
2657 : false,
2658 : &tmfd);
2659 :
2660 3 : if (res != TM_Ok)
2661 0 : ereport(ERROR,
2662 : errmsg("failed to apply concurrent DELETE"));
2663 :
2664 3 : pgstat_progress_incr_param(PROGRESS_REPACK_HEAP_TUPLES_DELETED, 1);
2665 3 : }
2666 :
2667 : /*
2668 : * Read tuple from file and put it in the input slot. All memory is allocated
2669 : * in the current memory context; caller is responsible for freeing it as
2670 : * appropriate.
2671 : *
2672 : * External attributes are stored in separate memory chunks, in order to avoid
2673 : * exceeding MaxAllocSize - that could happen if the individual attributes are
2674 : * smaller than MaxAllocSize but the whole tuple is bigger.
2675 : */
2676 : static void
2677 38 : restore_tuple(BufFile *file, Relation relation, TupleTableSlot *slot)
2678 : {
2679 : uint32 t_len;
2680 : HeapTuple tup;
2681 : int natt_ext;
2682 :
2683 : /* Read the tuple. */
2684 38 : BufFileReadExact(file, &t_len, sizeof(t_len));
2685 38 : tup = (HeapTuple) palloc(HEAPTUPLESIZE + t_len);
2686 38 : tup->t_data = (HeapTupleHeader) ((char *) tup + HEAPTUPLESIZE);
2687 38 : BufFileReadExact(file, tup->t_data, t_len);
2688 38 : tup->t_len = t_len;
2689 38 : ItemPointerSetInvalid(&tup->t_self);
2690 38 : tup->t_tableOid = RelationGetRelid(relation);
2691 :
2692 : /*
2693 : * Put the tuple we read in a slot. This deforms it, so that we can hack
2694 : * the external attributes in place.
2695 : */
2696 38 : ExecForceStoreHeapTuple(tup, slot, false);
2697 :
2698 : /*
2699 : * Next, read any attributes we stored separately into the tts_values
2700 : * array elements expecting them, if any. This matches
2701 : * repack_store_change.
2702 : */
2703 38 : BufFileReadExact(file, &natt_ext, sizeof(natt_ext));
2704 38 : if (natt_ext > 0)
2705 : {
2706 11 : TupleDesc desc = slot->tts_tupleDescriptor;
2707 :
2708 66 : for (int i = 0; i < desc->natts; i++)
2709 : {
2710 55 : CompactAttribute *attr = TupleDescCompactAttr(desc, i);
2711 : varlena *varlen;
2712 : uint64 chunk_header;
2713 : void *value;
2714 : Size varlensz;
2715 :
2716 55 : if (attr->attisdropped || attr->attlen != -1)
2717 40 : continue;
2718 22 : if (slot_attisnull(slot, i + 1))
2719 0 : continue;
2720 22 : varlen = (varlena *) DatumGetPointer(slot->tts_values[i]);
2721 22 : if (!VARATT_IS_EXTERNAL_INDIRECT(varlen))
2722 7 : continue;
2723 15 : slot_getsomeattrs(slot, i + 1);
2724 :
2725 15 : BufFileReadExact(file, &chunk_header, VARHDRSZ);
2726 15 : varlensz = VARSIZE_ANY(&chunk_header);
2727 :
2728 15 : value = palloc(varlensz);
2729 15 : memcpy(value, &chunk_header, VARHDRSZ);
2730 15 : BufFileReadExact(file, (char *) value + VARHDRSZ, varlensz - VARHDRSZ);
2731 :
2732 15 : slot->tts_values[i] = PointerGetDatum(value);
2733 15 : natt_ext--;
2734 15 : if (natt_ext < 0)
2735 0 : ereport(ERROR,
2736 : errcode(ERRCODE_DATA_CORRUPTED),
2737 : errmsg("insufficient number of attributes stored separately"));
2738 : }
2739 : }
2740 38 : }
2741 :
2742 : /*
2743 : * Adjust 'dest' replacing any EXTERNAL_ONDISK toast pointers with the
2744 : * corresponding ones from 'src'.
2745 : */
2746 : static void
2747 20 : adjust_toast_pointers(Relation relation, TupleTableSlot *dest, TupleTableSlot *src)
2748 : {
2749 20 : TupleDesc desc = dest->tts_tupleDescriptor;
2750 :
2751 96 : for (int i = 0; i < desc->natts; i++)
2752 : {
2753 76 : CompactAttribute *attr = TupleDescCompactAttr(desc, i);
2754 : varlena *varlena_dst;
2755 :
2756 76 : if (attr->attisdropped)
2757 24 : continue;
2758 52 : if (attr->attlen != -1)
2759 28 : continue;
2760 24 : if (slot_attisnull(dest, i + 1))
2761 0 : continue;
2762 :
2763 24 : slot_getsomeattrs(dest, i + 1);
2764 :
2765 24 : varlena_dst = (varlena *) DatumGetPointer(dest->tts_values[i]);
2766 24 : if (!VARATT_IS_EXTERNAL_ONDISK(varlena_dst))
2767 22 : continue;
2768 2 : slot_getsomeattrs(src, i + 1);
2769 :
2770 2 : dest->tts_values[i] = src->tts_values[i];
2771 : }
2772 20 : }
2773 :
2774 : /*
2775 : * Find the tuple to be updated or deleted by the given data change, whose
2776 : * tuple has already been loaded into locator.
2777 : *
2778 : * If the tuple is found, put it in retrieved and return true. If the tuple is
2779 : * not found, return false.
2780 : */
static bool
find_target_tuple(Relation rel, ChangeContext *chgcxt, TupleTableSlot *locator,
				  TupleTableSlot *retrieved)
{
	Form_pg_index idx = chgcxt->cc_ident_index->rd_index;
	IndexScanDesc scan;
	bool		retval;

	/*
	 * Scan key is passed by caller, so it does not have to be constructed
	 * multiple times. Key entries have all fields initialized, except for
	 * sk_argument.
	 *
	 * Use the incoming tuple to finalize the scan key.
	 */
	for (int i = 0; i < chgcxt->cc_ident_key_nentries; i++)
	{
		ScanKey		entry = &chgcxt->cc_ident_key[i];
		AttrNumber	attno = idx->indkey.values[i];

		entry->sk_argument = locator->tts_values[attno - 1];
		/* Identity key columns are expected to be non-NULL. */
		Assert(!locator->tts_isnull[attno - 1]);
	}

	/* XXX no instrumentation for now */
	scan = index_beginscan(rel, chgcxt->cc_ident_index, GetActiveSnapshot(),
						   NULL, chgcxt->cc_ident_key_nentries, 0, 0);
	index_rescan(scan, chgcxt->cc_ident_key, chgcxt->cc_ident_key_nentries, NULL, 0);
	/* Only the first match is used; the identity index should be unique. */
	retval = index_getnext_slot(scan, ForwardScanDirection, retrieved);
	index_endscan(scan);

	return retval;
}
2814 :
2815 : /*
2816 : * Decode and apply concurrent changes, up to (and including) the record whose
2817 : * LSN is 'end_of_wal'.
2818 : *
2819 : * XXX the names "process_concurrent_changes" and "apply_concurrent_changes"
2820 : * are far too similar to each other.
2821 : */
static void
process_concurrent_changes(XLogRecPtr end_of_wal, ChangeContext *chgcxt, bool done)
{
	DecodingWorkerShared *shared;
	char		fname[MAXPGPATH];
	BufFile    *file;

	pgstat_progress_update_param(PROGRESS_REPACK_PHASE,
								 PROGRESS_REPACK_PHASE_CATCH_UP);

	/*
	 * Ask the worker for the file. 'lsn_upto' tells it how far to decode;
	 * 'done' indicates that no further batches will be requested.
	 */
	shared = (DecodingWorkerShared *) dsm_segment_address(decoding_worker->seg);
	SpinLockAcquire(&shared->mutex);
	shared->lsn_upto = end_of_wal;
	shared->done = done;
	SpinLockRelease(&shared->mutex);

	/*
	 * The worker needs to finish processing of the current WAL record. Even
	 * if it's idle, it'll need to close the output file. Thus we're likely to
	 * wait, so prepare for sleep.
	 */
	ConditionVariablePrepareToSleep(&shared->cv);
	for (;;)
	{
		int			last_exported;

		/* Only read 'last_exported' while holding the mutex. */
		SpinLockAcquire(&shared->mutex);
		last_exported = shared->last_exported;
		SpinLockRelease(&shared->mutex);

		/*
		 * Has the worker exported the file we are waiting for?
		 */
		if (last_exported == chgcxt->cc_file_seq)
			break;

		ConditionVariableSleep(&shared->cv, WAIT_EVENT_REPACK_WORKER_EXPORT);
	}
	ConditionVariableCancelSleep();

	/* Open the file. */
	DecodingWorkerFileName(fname, shared->relid, chgcxt->cc_file_seq);
	file = BufFileOpenFileSet(&shared->sfs.fs, fname, O_RDONLY, false);
	apply_concurrent_changes(file, chgcxt);

	BufFileClose(file);

	/* Get ready for the next file. */
	chgcxt->cc_file_seq++;
}
2873 :
2874 : /*
2875 : * Initialize the ChangeContext struct for the given relation, with
2876 : * the given index as identity index.
2877 : */
2878 : static void
2879 3 : initialize_change_context(ChangeContext *chgcxt,
2880 : Relation relation, Oid ident_index_id)
2881 : {
2882 3 : chgcxt->cc_rel = relation;
2883 :
2884 : /* Only initialize fields needed by ExecInsertIndexTuples(). */
2885 3 : chgcxt->cc_estate = CreateExecutorState();
2886 :
2887 3 : chgcxt->cc_rri = (ResultRelInfo *) palloc(sizeof(ResultRelInfo));
2888 3 : InitResultRelInfo(chgcxt->cc_rri, relation, 0, 0, 0);
2889 3 : ExecOpenIndices(chgcxt->cc_rri, false);
2890 :
2891 : /*
2892 : * The table's relcache entry already has the relcache entry for the
2893 : * identity index; find that.
2894 : */
2895 3 : chgcxt->cc_ident_index = NULL;
2896 3 : for (int i = 0; i < chgcxt->cc_rri->ri_NumIndices; i++)
2897 : {
2898 : Relation ind_rel;
2899 :
2900 3 : ind_rel = chgcxt->cc_rri->ri_IndexRelationDescs[i];
2901 3 : if (ind_rel->rd_id == ident_index_id)
2902 : {
2903 3 : chgcxt->cc_ident_index = ind_rel;
2904 3 : break;
2905 : }
2906 : }
2907 3 : if (chgcxt->cc_ident_index == NULL)
2908 0 : elog(ERROR, "failed to find identity index");
2909 :
2910 : /* Set up for scanning said identity index */
2911 : {
2912 : Form_pg_index indexForm;
2913 :
2914 3 : indexForm = chgcxt->cc_ident_index->rd_index;
2915 3 : chgcxt->cc_ident_key_nentries = indexForm->indnkeyatts;
2916 3 : chgcxt->cc_ident_key = (ScanKey) palloc_array(ScanKeyData, indexForm->indnkeyatts);
2917 6 : for (int i = 0; i < indexForm->indnkeyatts; i++)
2918 : {
2919 : ScanKey entry;
2920 : Oid opfamily,
2921 : opcintype,
2922 : opno,
2923 : opcode;
2924 :
2925 3 : entry = &chgcxt->cc_ident_key[i];
2926 :
2927 3 : opfamily = chgcxt->cc_ident_index->rd_opfamily[i];
2928 3 : opcintype = chgcxt->cc_ident_index->rd_opcintype[i];
2929 3 : opno = get_opfamily_member(opfamily, opcintype, opcintype,
2930 : BTEqualStrategyNumber);
2931 3 : if (!OidIsValid(opno))
2932 0 : elog(ERROR, "failed to find = operator for type %u", opcintype);
2933 3 : opcode = get_opcode(opno);
2934 3 : if (!OidIsValid(opcode))
2935 0 : elog(ERROR, "failed to find = operator for operator %u", opno);
2936 :
2937 : /* Initialize everything but argument. */
2938 3 : ScanKeyInit(entry,
2939 3 : i + 1,
2940 : BTEqualStrategyNumber, opcode,
2941 : (Datum) 0);
2942 3 : entry->sk_collation = chgcxt->cc_ident_index->rd_indcollation[i];
2943 : }
2944 : }
2945 :
2946 3 : chgcxt->cc_file_seq = WORKER_FILE_SNAPSHOT + 1;
2947 3 : }
2948 :
2949 : /*
2950 : * Free up resources taken by a ChangeContext.
2951 : */
static void
release_change_context(ChangeContext *chgcxt)
{
	/* This also closes the identity index opened by ExecOpenIndices(). */
	ExecCloseIndices(chgcxt->cc_rri);
	FreeExecutorState(chgcxt->cc_estate);
	/* XXX are these pfrees necessary? */
	pfree(chgcxt->cc_rri);
	pfree(chgcxt->cc_ident_key);
}
2961 :
2962 : /*
2963 : * The final steps of rebuild_relation() for concurrent processing.
2964 : *
2965 : * On entry, NewHeap is locked in AccessExclusiveLock mode. OldHeap and its
2966 : * clustering index (if one is passed) are still locked in a mode that allows
2967 : * concurrent data changes. On exit, both tables and their indexes are closed,
2968 : * but locked in AccessExclusiveLock mode.
2969 : */
2970 : static void
2971 3 : rebuild_relation_finish_concurrent(Relation NewHeap, Relation OldHeap,
2972 : Oid identIdx, TransactionId frozenXid,
2973 : MultiXactId cutoffMulti)
2974 : {
2975 : List *ind_oids_new;
2976 3 : Oid old_table_oid = RelationGetRelid(OldHeap);
2977 3 : Oid new_table_oid = RelationGetRelid(NewHeap);
2978 3 : List *ind_oids_old = RelationGetIndexList(OldHeap);
2979 : ListCell *lc,
2980 : *lc2;
2981 : char relpersistence;
2982 : bool is_system_catalog;
2983 : Oid ident_idx_new;
2984 : XLogRecPtr end_of_wal;
2985 : List *indexrels;
2986 : ChangeContext chgcxt;
2987 :
2988 : Assert(CheckRelationLockedByMe(OldHeap, ShareUpdateExclusiveLock, false));
2989 : Assert(CheckRelationLockedByMe(NewHeap, AccessExclusiveLock, false));
2990 :
2991 : /*
2992 : * Unlike the exclusive case, we build new indexes for the new relation
2993 : * rather than swapping the storage and reindexing the old relation. The
2994 : * point is that the index build can take some time, so we do it before we
2995 : * get AccessExclusiveLock on the old heap and therefore we cannot swap
2996 : * the heap storage yet.
2997 : *
2998 : * index_create() will lock the new indexes using AccessExclusiveLock - no
2999 : * need to change that. At the same time, we use ShareUpdateExclusiveLock
3000 : * to lock the existing indexes - that should be enough to prevent others
3001 : * from changing them while we're repacking the relation. The lock on
3002 : * table should prevent others from changing the index column list, but
3003 : * might not be enough for commands like ALTER INDEX ... SET ... (Those
3004 : * are not necessarily dangerous, but can make user confused if the
3005 : * changes they do get lost due to REPACK.)
3006 : */
3007 3 : ind_oids_new = build_new_indexes(NewHeap, OldHeap, ind_oids_old);
3008 :
3009 : /*
3010 : * The identity index in the new relation appears in the same relative
3011 : * position as the corresponding index in the old relation. Find it.
3012 : */
3013 3 : ident_idx_new = InvalidOid;
3014 6 : foreach_oid(ind_old, ind_oids_old)
3015 : {
3016 3 : if (identIdx == ind_old)
3017 : {
3018 3 : int pos = foreach_current_index(ind_old);
3019 :
3020 3 : if (unlikely(list_length(ind_oids_new) < pos))
3021 0 : elog(ERROR, "list of new indexes too short");
3022 3 : ident_idx_new = list_nth_oid(ind_oids_new, pos);
3023 3 : break;
3024 : }
3025 : }
3026 3 : if (!OidIsValid(ident_idx_new))
3027 0 : elog(ERROR, "could not find index matching \"%s\" at the new relation",
3028 : get_rel_name(identIdx));
3029 :
3030 : /* Gather information to apply concurrent changes. */
3031 3 : initialize_change_context(&chgcxt, NewHeap, ident_idx_new);
3032 :
3033 : /*
3034 : * During testing, wait for another backend to perform concurrent data
3035 : * changes which we will process below.
3036 : */
3037 3 : INJECTION_POINT("repack-concurrently-before-lock", NULL);
3038 :
3039 : /*
3040 : * Flush all WAL records inserted so far (possibly except for the last
3041 : * incomplete page; see GetInsertRecPtr), to minimize the amount of data
3042 : * we need to flush while holding exclusive lock on the source table.
3043 : */
3044 3 : XLogFlush(GetXLogInsertEndRecPtr());
3045 3 : end_of_wal = GetFlushRecPtr(NULL);
3046 :
3047 : /*
3048 : * Apply concurrent changes first time, to minimize the time we need to
3049 : * hold AccessExclusiveLock. (Quite some amount of WAL could have been
3050 : * written during the data copying and index creation.)
3051 : */
3052 3 : process_concurrent_changes(end_of_wal, &chgcxt, false);
3053 :
3054 : /*
3055 : * Acquire AccessExclusiveLock on the table, its TOAST relation (if there
3056 : * is one), all its indexes, so that we can swap the files.
3057 : */
3058 3 : LockRelationOid(old_table_oid, AccessExclusiveLock);
3059 :
3060 : /*
3061 : * Lock all indexes now, not only the clustering one: all indexes need to
3062 : * have their files swapped. While doing that, store their relation
3063 : * references in a zero-terminated array, to handle predicate locks below.
3064 : */
3065 3 : indexrels = NIL;
3066 10 : foreach_oid(ind_oid, ind_oids_old)
3067 : {
3068 : Relation index;
3069 :
3070 4 : index = index_open(ind_oid, AccessExclusiveLock);
3071 :
3072 : /*
3073 : * Some things about the index may have changed before we locked the
3074 : * index, such as ALTER INDEX RENAME. We don't need to do anything
3075 : * here to absorb those changes in the new index.
3076 : */
3077 4 : indexrels = lappend(indexrels, index);
3078 : }
3079 :
3080 : /*
3081 : * Lock the OldHeap's TOAST relation exclusively - again, the lock is
3082 : * needed to swap the files.
3083 : */
3084 3 : if (OidIsValid(OldHeap->rd_rel->reltoastrelid))
3085 1 : LockRelationOid(OldHeap->rd_rel->reltoastrelid, AccessExclusiveLock);
3086 :
3087 : /*
3088 : * Tuples and pages of the old heap will be gone, but the heap will stay.
3089 : */
3090 3 : TransferPredicateLocksToHeapRelation(OldHeap);
3091 10 : foreach_ptr(RelationData, index, indexrels)
3092 : {
3093 4 : TransferPredicateLocksToHeapRelation(index);
3094 4 : index_close(index, NoLock);
3095 : }
3096 3 : list_free(indexrels);
3097 :
3098 : /*
3099 : * Flush WAL again, to make sure that all changes committed while we were
3100 : * waiting for the exclusive lock are available for decoding.
3101 : */
3102 3 : XLogFlush(GetXLogInsertEndRecPtr());
3103 3 : end_of_wal = GetFlushRecPtr(NULL);
3104 :
3105 : /*
3106 : * Apply the concurrent changes again. Indicate that the decoding worker
3107 : * won't be needed anymore.
3108 : */
3109 3 : process_concurrent_changes(end_of_wal, &chgcxt, true);
3110 :
3111 : /* Remember info about rel before closing OldHeap */
3112 3 : relpersistence = OldHeap->rd_rel->relpersistence;
3113 3 : is_system_catalog = IsSystemRelation(OldHeap);
3114 :
3115 3 : pgstat_progress_update_param(PROGRESS_REPACK_PHASE,
3116 : PROGRESS_REPACK_PHASE_SWAP_REL_FILES);
3117 :
3118 : /*
3119 : * Even ShareUpdateExclusiveLock should have prevented others from
3120 : * creating / dropping indexes (even using the CONCURRENTLY option), so we
3121 : * do not need to check whether the lists match.
3122 : */
3123 7 : forboth(lc, ind_oids_old, lc2, ind_oids_new)
3124 : {
3125 4 : Oid ind_old = lfirst_oid(lc);
3126 4 : Oid ind_new = lfirst_oid(lc2);
3127 4 : Oid mapped_tables[4] = {0};
3128 :
3129 4 : swap_relation_files(ind_old, ind_new,
3130 : (old_table_oid == RelationRelationId),
3131 : false, /* swap_toast_by_content */
3132 : true,
3133 : InvalidTransactionId,
3134 : InvalidMultiXactId,
3135 : mapped_tables);
3136 :
3137 : #ifdef USE_ASSERT_CHECKING
3138 :
3139 : /*
3140 : * Concurrent processing is not supported for system relations, so
3141 : * there should be no mapped tables.
3142 : */
3143 : for (int i = 0; i < 4; i++)
3144 : Assert(!OidIsValid(mapped_tables[i]));
3145 : #endif
3146 : }
3147 :
3148 : /* The new indexes must be visible for deletion. */
3149 3 : CommandCounterIncrement();
3150 :
3151 : /* Close the old heap but keep lock until transaction commit. */
3152 3 : table_close(OldHeap, NoLock);
3153 : /* Close the new heap. (We didn't have to open its indexes). */
3154 3 : table_close(NewHeap, NoLock);
3155 :
3156 : /* Cleanup what we don't need anymore. (And close the identity index.) */
3157 3 : release_change_context(&chgcxt);
3158 :
3159 : /*
3160 : * Swap the relations and their TOAST relations and TOAST indexes. This
3161 : * also drops the new relation and its indexes.
3162 : *
3163 : * (System catalogs are currently not supported.)
3164 : */
3165 : Assert(!is_system_catalog);
3166 3 : finish_heap_swap(old_table_oid, new_table_oid,
3167 : is_system_catalog,
3168 : false, /* swap_toast_by_content */
3169 : false,
3170 : true,
3171 : false, /* reindex */
3172 : frozenXid, cutoffMulti,
3173 : relpersistence);
3174 3 : }
3175 :
3176 : /*
3177 : * Build indexes on NewHeap according to those on OldHeap.
3178 : *
3179 : * OldIndexes is the list of index OIDs on OldHeap. The contained indexes end
3180 : * up locked using ShareUpdateExclusiveLock.
3181 : *
3182 : * A list of OIDs of the corresponding indexes created on NewHeap is
3183 : * returned. The order of items does match, so we can use these arrays to swap
3184 : * index storage.
3185 : */
3186 : static List *
3187 3 : build_new_indexes(Relation NewHeap, Relation OldHeap, List *OldIndexes)
3188 : {
3189 3 : List *result = NIL;
3190 :
3191 3 : pgstat_progress_update_param(PROGRESS_REPACK_PHASE,
3192 : PROGRESS_REPACK_PHASE_REBUILD_INDEX);
3193 :
3194 10 : foreach_oid(oldindex, OldIndexes)
3195 : {
3196 : Oid newindex;
3197 : char *newName;
3198 : Relation ind;
3199 :
3200 4 : ind = index_open(oldindex, ShareUpdateExclusiveLock);
3201 :
3202 4 : newName = ChooseRelationName(get_rel_name(oldindex),
3203 : NULL,
3204 : "repacknew",
3205 4 : get_rel_namespace(ind->rd_index->indrelid),
3206 : false);
3207 4 : newindex = index_create_copy(NewHeap, INDEX_CREATE_SUPPRESS_PROGRESS,
3208 4 : oldindex, ind->rd_rel->reltablespace,
3209 : newName);
3210 4 : copy_index_constraints(ind, newindex, RelationGetRelid(NewHeap));
3211 4 : result = lappend_oid(result, newindex);
3212 :
3213 4 : index_close(ind, NoLock);
3214 : }
3215 :
3216 3 : return result;
3217 : }
3218 :
3219 : /*
3220 : * Create a transient copy of a constraint -- supported by a transient
3221 : * copy of the index that supports the original constraint.
3222 : *
3223 : * When repacking a table that contains exclusion constraints, the executor
3224 : * relies on these constraints being properly catalogued. These copies are
3225 : * to support that.
3226 : *
3227 : * We don't need the constraints for anything else (the original constraints
3228 : * will be there once repack completes), so we add pg_depend entries so that
3229 : * the are dropped when the transient table is dropped.
3230 : */
static void
copy_index_constraints(Relation old_index, Oid new_index_id, Oid new_heap_id)
{
	ScanKeyData skey;
	Relation	rel;
	TupleDesc	desc;
	SysScanDesc scan;
	HeapTuple	tup;
	ObjectAddress objrel;

	rel = table_open(ConstraintRelationId, RowExclusiveLock);
	ObjectAddressSet(objrel, RelationRelationId, new_heap_id);

	/*
	 * Retrieve the constraints supported by the old index and create an
	 * identical one that points to the new index.
	 *
	 * Scan pg_constraint by conrelid (the table the old index belongs to);
	 * rows not backed by the old index are filtered out inside the loop.
	 */
	ScanKeyInit(&skey,
				Anum_pg_constraint_conrelid,
				BTEqualStrategyNumber, F_OIDEQ,
				ObjectIdGetDatum(old_index->rd_index->indrelid));
	scan = systable_beginscan(rel, ConstraintRelidTypidNameIndexId, true,
							  NULL, 1, &skey);
	desc = RelationGetDescr(rel);
	while (HeapTupleIsValid(tup = systable_getnext(scan)))
	{
		Form_pg_constraint conform = (Form_pg_constraint) GETSTRUCT(tup);
		Oid			oid;
		Datum		values[Natts_pg_constraint] = {0};
		bool		nulls[Natts_pg_constraint] = {0};
		bool		replaces[Natts_pg_constraint] = {0};
		HeapTuple	new_tup;
		ObjectAddress objcon;

		/* Only constraints supported by the old index are of interest. */
		if (conform->conindid != RelationGetRelid(old_index))
			continue;

		/*
		 * Copy the constraint row, replacing its OID and re-pointing it at
		 * the new heap and the new index.
		 */
		oid = GetNewOidWithIndex(rel, ConstraintOidIndexId,
								 Anum_pg_constraint_oid);
		values[Anum_pg_constraint_oid - 1] = ObjectIdGetDatum(oid);
		replaces[Anum_pg_constraint_oid - 1] = true;
		values[Anum_pg_constraint_conrelid - 1] = ObjectIdGetDatum(new_heap_id);
		replaces[Anum_pg_constraint_conrelid - 1] = true;
		values[Anum_pg_constraint_conindid - 1] = ObjectIdGetDatum(new_index_id);
		replaces[Anum_pg_constraint_conindid - 1] = true;

		new_tup = heap_modify_tuple(tup, desc, values, nulls, replaces);

		/* Insert it into the catalog. */
		CatalogTupleInsert(rel, new_tup);

		/* Create a dependency so it's removed when we drop the new heap. */
		ObjectAddressSet(objcon, ConstraintRelationId, oid);
		recordDependencyOn(&objcon, &objrel, DEPENDENCY_AUTO);
	}
	systable_endscan(scan);

	table_close(rel, RowExclusiveLock);

	/* Make the new constraints visible to subsequent catalog lookups. */
	CommandCounterIncrement();
}
3292 :
3293 : /*
3294 : * Try to start a background worker to perform logical decoding of data
3295 : * changes applied to relation while REPACK CONCURRENTLY is copying its
3296 : * contents to a new table.
3297 : */
static void
start_repack_decoding_worker(Oid relid)
{
	Size		size;
	dsm_segment *seg;
	DecodingWorkerShared *shared;
	shm_mq	   *mq;
	shm_mq_handle *mqh;
	BackgroundWorker bgw;

	/* Setup shared memory. */
	size = BUFFERALIGN(offsetof(DecodingWorkerShared, error_queue)) +
		BUFFERALIGN(REPACK_ERROR_QUEUE_SIZE);
	seg = dsm_create(size, 0);
	shared = (DecodingWorkerShared *) dsm_segment_address(seg);
	shared->initialized = false;
	shared->lsn_upto = InvalidXLogRecPtr;
	shared->done = false;
	SharedFileSetInit(&shared->sfs, seg);
	shared->last_exported = -1;
	SpinLockInit(&shared->mutex);
	shared->dbid = MyDatabaseId;

	/*
	 * This is the UserId set in cluster_rel(). Security context shouldn't be
	 * needed for decoding worker.
	 */
	shared->roleid = GetUserId();
	shared->relid = relid;
	ConditionVariableInit(&shared->cv);
	shared->backend_proc = MyProc;
	shared->backend_pid = MyProcPid;
	shared->backend_proc_number = MyProcNumber;

	/* Set up the queue through which the worker reports errors to us. */
	mq = shm_mq_create((char *) BUFFERALIGN(shared->error_queue),
					   REPACK_ERROR_QUEUE_SIZE);
	shm_mq_set_receiver(mq, MyProc);
	mqh = shm_mq_attach(mq, seg, NULL);

	/* Fill in the registration info and launch the worker. */
	memset(&bgw, 0, sizeof(bgw));
	snprintf(bgw.bgw_name, BGW_MAXLEN,
			 "REPACK decoding worker for relation \"%s\"",
			 get_rel_name(relid));
	snprintf(bgw.bgw_type, BGW_MAXLEN, "REPACK decoding worker");
	bgw.bgw_flags = BGWORKER_SHMEM_ACCESS |
		BGWORKER_BACKEND_DATABASE_CONNECTION;
	bgw.bgw_start_time = BgWorkerStart_RecoveryFinished;
	bgw.bgw_restart_time = BGW_NEVER_RESTART;
	snprintf(bgw.bgw_library_name, MAXPGPATH, "postgres");
	snprintf(bgw.bgw_function_name, BGW_MAXLEN, "RepackWorkerMain");
	bgw.bgw_main_arg = UInt32GetDatum(dsm_segment_handle(seg));
	bgw.bgw_notify_pid = MyProcPid;

	decoding_worker = palloc0_object(DecodingWorker);
	if (!RegisterDynamicBackgroundWorker(&bgw, &decoding_worker->handle))
		ereport(ERROR,
				errcode(ERRCODE_CONFIGURATION_LIMIT_EXCEEDED),
				errmsg("out of background worker slots"),
				errhint("You might need to increase \"%s\".", "max_worker_processes"));

	decoding_worker->seg = seg;
	decoding_worker->error_mqh = mqh;

	/*
	 * The decoding setup must be done before the caller can have XID assigned
	 * for any reason, otherwise the worker might end up in a deadlock,
	 * waiting for the caller's transaction to end. Therefore wait here until
	 * the worker indicates that it has the logical decoding initialized.
	 */
	ConditionVariablePrepareToSleep(&shared->cv);
	for (;;)
	{
		bool		initialized;

		/* Only read the flag while holding the mutex. */
		SpinLockAcquire(&shared->mutex);
		initialized = shared->initialized;
		SpinLockRelease(&shared->mutex);

		if (initialized)
			break;

		ConditionVariableSleep(&shared->cv, WAIT_EVENT_REPACK_WORKER_EXPORT);
	}
	ConditionVariableCancelSleep();
}
3383 :
3384 : /*
3385 : * Stop the decoding worker and cleanup the related resources.
3386 : *
3387 : * The worker stops on its own when it knows there is no more work to do, but
3388 : * we need to stop it explicitly at least on ERROR in the launching backend.
3389 : */
static void
stop_repack_decoding_worker(void)
{
	BgwHandleStatus status;

	/* Haven't reached the worker startup? */
	if (decoding_worker == NULL)
		return;

	/* Could not register the worker? */
	if (decoding_worker->handle == NULL)
		return;

	TerminateBackgroundWorker(decoding_worker->handle);
	/* The worker should really exit before the REPACK command does. */
	HOLD_INTERRUPTS();
	status = WaitForBackgroundWorkerShutdown(decoding_worker->handle);
	RESUME_INTERRUPTS();

	if (status == BGWH_POSTMASTER_DIED)
		ereport(FATAL,
				errcode(ERRCODE_ADMIN_SHUTDOWN),
				errmsg("postmaster exited during REPACK command"));

	/* Detach from the error queue before dropping the DSM segment. */
	shm_mq_detach(decoding_worker->error_mqh);

	/*
	 * If we could not cancel the current sleep due to ERROR, do that before
	 * we detach from the shared memory the condition variable is located in.
	 * If we did not, the bgworker ERROR handling code would try and fail
	 * badly.
	 */
	ConditionVariableCancelSleep();

	/* Release the shared memory and forget the worker. */
	dsm_detach(decoding_worker->seg);
	pfree(decoding_worker);
	decoding_worker = NULL;
}
3428 :
3429 : /*
3430 : * Get the initial snapshot from the decoding worker.
3431 : */
static Snapshot
get_initial_snapshot(DecodingWorker *worker)
{
	DecodingWorkerShared *shared;
	char		fname[MAXPGPATH];
	BufFile    *file;
	Size		snap_size;
	char	   *snap_space;
	Snapshot	snapshot;

	shared = (DecodingWorkerShared *) dsm_segment_address(worker->seg);

	/*
	 * The worker needs to initialize the logical decoding, which usually
	 * takes some time. Therefore it makes sense to prepare for the sleep
	 * first.
	 */
	ConditionVariablePrepareToSleep(&shared->cv);
	for (;;)
	{
		int			last_exported;

		/* Only read 'last_exported' while holding the mutex. */
		SpinLockAcquire(&shared->mutex);
		last_exported = shared->last_exported;
		SpinLockRelease(&shared->mutex);

		/*
		 * Has the worker exported the file we are waiting for?
		 */
		if (last_exported == WORKER_FILE_SNAPSHOT)
			break;

		ConditionVariableSleep(&shared->cv, WAIT_EVENT_REPACK_WORKER_EXPORT);
	}
	ConditionVariableCancelSleep();

	/* Read the snapshot from a file: its size first, then the payload. */
	DecodingWorkerFileName(fname, shared->relid, WORKER_FILE_SNAPSHOT);
	file = BufFileOpenFileSet(&shared->sfs.fs, fname, O_RDONLY, false);
	BufFileReadExact(file, &snap_size, sizeof(snap_size));
	snap_space = (char *) palloc(snap_size);
	BufFileReadExact(file, snap_space, snap_size);
	BufFileClose(file);

	/* Restore it. */
	snapshot = RestoreSnapshot(snap_space);
	pfree(snap_space);

	return snapshot;
}
3482 :
/*
 * Generate worker's file name into 'fname', which must be of size MAXPGPATH.
 * If relations of the same 'relid' happen to be processed at the same time,
 * they must be from different databases and therefore different backends must
 * be involved.
 *
 * 'seq' distinguishes the individual files the worker exports for a given
 * relation (e.g. WORKER_FILE_SNAPSHOT).
 */
void
DecodingWorkerFileName(char *fname, Oid relid, uint32 seq)
{
	/* The PID is already present in the fileset name, so we needn't add it */
	snprintf(fname, MAXPGPATH, "%u-%u", relid, seq);
}
3495 :
/*
 * Handle receipt of an interrupt indicating a repack worker message.
 *
 * Note: this is called within a signal handler! All we can do is set
 * a flag that will cause the next CHECK_FOR_INTERRUPTS() to invoke
 * ProcessRepackMessages().
 */
void
HandleRepackMessageInterrupt(void)
{
	InterruptPending = true;
	RepackMessagePending = true;
	/* Wake any latch wait so the flags are noticed promptly. */
	SetLatch(MyLatch);
}
3510 :
/*
 * Process any queued protocol messages received from the repack worker.
 *
 * Called from CHECK_FOR_INTERRUPTS() processing once
 * HandleRepackMessageInterrupt() has set RepackMessagePending.  Drains the
 * worker's error/notice message queue without blocking.
 */
void
ProcessRepackMessages(void)
{
	MemoryContext oldcontext;
	static MemoryContext hpm_context = NULL;

	/*
	 * Nothing to do if we haven't launched the worker yet or have already
	 * terminated it.
	 */
	if (decoding_worker == NULL)
		return;

	/*
	 * This is invoked from ProcessInterrupts(), and since some of the
	 * functions it calls contain CHECK_FOR_INTERRUPTS(), there is a potential
	 * for recursive calls if more signals are received while this runs. It's
	 * unclear that recursive entry would be safe, and it doesn't seem useful
	 * even if it is safe, so let's block interrupts until done.
	 */
	HOLD_INTERRUPTS();

	/*
	 * Moreover, CurrentMemoryContext might be pointing almost anywhere. We
	 * don't want to risk leaking data into long-lived contexts, so let's do
	 * our work here in a private context that we can reset on each use.
	 */
	if (hpm_context == NULL)	/* first time through? */
		hpm_context = AllocSetContextCreate(TopMemoryContext,
											"ProcessRepackMessages",
											ALLOCSET_DEFAULT_SIZES);
	else
		MemoryContextReset(hpm_context);

	oldcontext = MemoryContextSwitchTo(hpm_context);

	/* OK to process messages. Reset the flag saying there are more to do. */
	RepackMessagePending = false;

	/*
	 * Read as many messages as we can from the worker, but stop when no more
	 * messages can be read from the worker without blocking.
	 */
	while (true)
	{
		shm_mq_result res;
		Size		nbytes;
		void	   *data;

		/* nowait = true: never block waiting for the worker */
		res = shm_mq_receive(decoding_worker->error_mqh, &nbytes,
							 &data, true);
		if (res == SHM_MQ_WOULD_BLOCK)
			break;
		else if (res == SHM_MQ_SUCCESS)
		{
			StringInfoData msg;

			/* Copy the raw message out of the queue before dispatching it. */
			initStringInfo(&msg);
			appendBinaryStringInfo(&msg, data, nbytes);
			ProcessRepackMessage(&msg);
			pfree(msg.data);
		}
		else
		{
			/*
			 * The decoding worker is special in that it exits as soon as it
			 * has its work done. Thus the DETACHED result code is fine.
			 */
			Assert(res == SHM_MQ_DETACHED);

			break;
		}
	}

	MemoryContextSwitchTo(oldcontext);

	/* Might as well clear the context on our way out */
	MemoryContextReset(hpm_context);

	RESUME_INTERRUPTS();
}
3595 :
/*
 * Process a single protocol message received from the repack decoding
 * worker.
 *
 * Only error and notice messages are expected; anything else indicates a
 * protocol violation and raises an ERROR.
 */
static void
ProcessRepackMessage(StringInfo msg)
{
	char		msgtype;

	msgtype = pq_getmsgbyte(msg);

	switch (msgtype)
	{
		case PqMsg_ErrorResponse:
		case PqMsg_NoticeResponse:
			{
				ErrorData	edata;

				/* Parse ErrorResponse or NoticeResponse. */
				pq_parse_errornotice(msg, &edata);

				/* Death of a worker isn't enough justification for suicide. */
				edata.elevel = Min(edata.elevel, ERROR);

				/*
				 * Add a context line to show that this is a message
				 * propagated from the worker. Otherwise, it can sometimes be
				 * confusing to understand what actually happened.
				 */
				if (edata.context)
					edata.context = psprintf("%s\n%s", edata.context,
											 _("REPACK decoding worker"));
				else
					edata.context = pstrdup(_("REPACK decoding worker"));

				/* Rethrow error or print notice. */
				ThrowErrorData(&edata);

				break;
			}

		default:
			{
				elog(ERROR, "unrecognized message type received from decoding worker: %c (message length %d bytes)",
					 msgtype, msg->len);
			}
	}
}
|