Branch data Line data Source code
1 : : /*-------------------------------------------------------------------------
2 : : *
3 : : * repack.c
4 : : * REPACK a table; formerly known as CLUSTER. VACUUM FULL also uses
5 : : * parts of this code.
6 : : *
7 : : * There are two somewhat different ways to rewrite a table. In non-
8 : : * concurrent mode, it's easy: take AccessExclusiveLock, create a new
9 : : * transient relation, copy the tuples over to the relfilenode of the new
10 : : * relation, swap the relfilenodes, then drop the old relation.
11 : : *
12 : : * In concurrent mode, we lock the table with only ShareUpdateExclusiveLock,
13 : : * then do an initial copy as above. However, while the tuples are being
14 : : * copied, concurrent transactions could modify the table. To cope with those
15 : : * changes, we rely on logical decoding to obtain them from WAL. A bgworker
16 : : * consumes WAL while the initial copy is ongoing (to prevent excessive WAL
17 : : * from being reserved), and accumulates the changes in a file. Once the
18 : : * initial copy is complete, we read the changes from the file and re-apply
19 : : * them on the new heap. Then we upgrade our ShareUpdateExclusiveLock to
20 : : * AccessExclusiveLock and swap the relfilenodes. This way, the time we hold
21 : : * a strong lock on the table is much reduced, and the bloat is eliminated.
22 : : *
23 : : *
24 : : * Portions Copyright (c) 1996-2026, PostgreSQL Global Development Group
25 : : * Portions Copyright (c) 1994-5, Regents of the University of California
26 : : *
27 : : *
28 : : * IDENTIFICATION
29 : : * src/backend/commands/repack.c
30 : : *
31 : : *-------------------------------------------------------------------------
32 : : */
33 : : #include "postgres.h"
34 : :
35 : : #include "access/amapi.h"
36 : : #include "access/heapam.h"
37 : : #include "access/multixact.h"
38 : : #include "access/relscan.h"
39 : : #include "access/tableam.h"
40 : : #include "access/toast_internals.h"
41 : : #include "access/transam.h"
42 : : #include "access/xact.h"
43 : : #include "access/xlog.h"
44 : : #include "catalog/catalog.h"
45 : : #include "catalog/dependency.h"
46 : : #include "catalog/heap.h"
47 : : #include "catalog/index.h"
48 : : #include "catalog/namespace.h"
49 : : #include "catalog/objectaccess.h"
50 : : #include "catalog/pg_am.h"
51 : : #include "catalog/pg_attrdef.h"
52 : : #include "catalog/pg_constraint.h"
53 : : #include "catalog/pg_inherits.h"
54 : : #include "catalog/toasting.h"
55 : : #include "commands/defrem.h"
56 : : #include "commands/progress.h"
57 : : #include "commands/repack.h"
58 : : #include "commands/repack_internal.h"
59 : : #include "commands/tablecmds.h"
60 : : #include "commands/vacuum.h"
61 : : #include "executor/executor.h"
62 : : #include "libpq/pqformat.h"
63 : : #include "libpq/pqmq.h"
64 : : #include "miscadmin.h"
65 : : #include "optimizer/optimizer.h"
66 : : #include "parser/parse_relation.h"
67 : : #include "pgstat.h"
68 : : #include "replication/logicalrelation.h"
69 : : #include "storage/bufmgr.h"
70 : : #include "storage/ipc.h"
71 : : #include "storage/lmgr.h"
72 : : #include "storage/predicate.h"
73 : : #include "storage/proc.h"
74 : : #include "utils/acl.h"
75 : : #include "utils/fmgroids.h"
76 : : #include "utils/guc.h"
77 : : #include "utils/injection_point.h"
78 : : #include "utils/inval.h"
79 : : #include "utils/lsyscache.h"
80 : : #include "utils/memutils.h"
81 : : #include "utils/pg_rusage.h"
82 : : #include "utils/relmapper.h"
83 : : #include "utils/snapmgr.h"
84 : : #include "utils/syscache.h"
85 : : #include "utils/wait_event_types.h"
86 : :
87 : : /*
88 : : * This struct is used to pass around the information on tables to be
89 : : * clustered. We need this so we can make a list of them when invoked without
90 : : * a specific table/index pair.
91 : : */
92 : : typedef struct
93 : : {
94 : : Oid tableOid; /* table to process; ExecRepack opens it with try_table_open() */
95 : : Oid indexOid; /* handed to cluster_rel() as its indexOid argument for this table */
96 : : } RelToCluster;
97 : :
98 : : /*
99 : : * The first file exported by the decoding worker must contain a snapshot, the
100 : : * following ones contain the data changes.
101 : : */
102 : : #define WORKER_FILE_SNAPSHOT 0
103 : :
104 : : /*
105 : : * Information needed to apply concurrent data changes.
106 : : */
107 : : typedef struct ChangeContext
108 : : {
109 : : /* The relation the changes are applied to. */
110 : : Relation cc_rel;
111 : :
112 : : /* Needed to update indexes of cc_rel. */
113 : : ResultRelInfo *cc_rri;
114 : : EState *cc_estate;
115 : :
116 : : /*
117 : : * Existing tuples to UPDATE and DELETE are located via this index. We
118 : : * keep the scankey in partially initialized state to avoid repeated work.
119 : : * sk_argument is completed on the fly.
120 : : */
121 : : Relation cc_ident_index;
122 : : ScanKey cc_ident_key;
123 : : int cc_ident_key_nentries; /* presumably the number of entries in cc_ident_key -- confirm */
124 : :
125 : : /* The last column we need to deform to have the tuple identity */
126 : : AttrNumber cc_last_key_attno;
127 : :
128 : : /* Sequential number of the file containing the changes. */
129 : : int cc_file_seq;
130 : : } ChangeContext;
131 : :
132 : : /*
133 : : * Backend-local information to control the decoding worker.
134 : : */
135 : : typedef struct DecodingWorker
136 : : {
137 : : /* Background worker handle of the decoding worker. */
138 : : BackgroundWorkerHandle *handle;
139 : :
140 : : /* DecodingWorkerShared is in this segment. */
141 : : dsm_segment *seg;
142 : :
143 : : /* Handle of the error queue. */
144 : : shm_mq_handle *error_mqh;
145 : : } DecodingWorker;
146 : :
147 : : /* Pointer to currently running decoding worker. */
148 : : static DecodingWorker *decoding_worker = NULL;
149 : :
150 : : /*
151 : : * Is there a message sent by a repack worker that the backend needs to
152 : : * receive?
153 : : */
154 : : volatile sig_atomic_t RepackMessagePending = false;
155 : :
156 : : static LOCKMODE RepackLockLevel(bool concurrent);
157 : : static bool cluster_rel_recheck(RepackCommand cmd, Relation OldHeap,
158 : : Oid indexOid, Oid userid, LOCKMODE lmode,
159 : : int options);
160 : : static void check_concurrent_repack_requirements(Relation rel,
161 : : Oid *ident_idx_p);
162 : : static void rebuild_relation(Relation OldHeap, Relation index, bool verbose,
163 : : Oid ident_idx);
164 : : static void copy_table_data(Relation NewHeap, Relation OldHeap, Relation OldIndex,
165 : : Snapshot snapshot,
166 : : bool verbose,
167 : : bool *pSwapToastByContent,
168 : : TransactionId *pFreezeXid,
169 : : MultiXactId *pCutoffMulti);
170 : : static List *get_tables_to_repack(RepackCommand cmd, bool usingindex,
171 : : MemoryContext permcxt);
172 : : static List *get_tables_to_repack_partitioned(RepackCommand cmd,
173 : : Oid relid, bool rel_is_index,
174 : : MemoryContext permcxt);
175 : : static bool repack_is_permitted_for_relation(RepackCommand cmd,
176 : : Oid relid, Oid userid);
177 : :
178 : : static void apply_concurrent_changes(BufFile *file, ChangeContext *chgcxt);
179 : : static void apply_concurrent_insert(Relation rel, TupleTableSlot *slot,
180 : : ChangeContext *chgcxt);
181 : : static void apply_concurrent_update(Relation rel, TupleTableSlot *spilled_tuple,
182 : : TupleTableSlot *ondisk_tuple,
183 : : ChangeContext *chgcxt);
184 : : static void apply_concurrent_delete(Relation rel, TupleTableSlot *slot);
185 : : static void restore_tuple(BufFile *file, Relation relation,
186 : : TupleTableSlot *slot);
187 : : static void adjust_toast_pointers(Relation relation, TupleTableSlot *dest,
188 : : TupleTableSlot *src);
189 : : static bool find_target_tuple(Relation rel, ChangeContext *chgcxt,
190 : : TupleTableSlot *locator,
191 : : TupleTableSlot *retrieved);
192 : : static bool identity_key_equal(ChangeContext *chgcxt,
193 : : TupleTableSlot *locator,
194 : : TupleTableSlot *candidate);
195 : : static void process_concurrent_changes(XLogRecPtr end_of_wal,
196 : : ChangeContext *chgcxt,
197 : : bool done);
198 : : static void initialize_change_context(ChangeContext *chgcxt,
199 : : Relation relation,
200 : : Oid ident_index_id);
201 : : static void release_change_context(ChangeContext *chgcxt);
202 : : static void rebuild_relation_finish_concurrent(Relation NewHeap, Relation OldHeap,
203 : : Oid identIdx,
204 : : TransactionId frozenXid,
205 : : MultiXactId cutoffMulti);
206 : : static List *build_new_indexes(Relation NewHeap, Relation OldHeap, List *OldIndexes);
207 : : static void copy_index_constraints(Relation old_index, Oid new_index_id,
208 : : Oid new_heap_id);
209 : : static void copy_attribute_defaults(Oid old_heap_oid, Oid new_heap_oid);
210 : : static Relation process_single_relation(RepackStmt *stmt,
211 : : LOCKMODE lockmode,
212 : : bool isTopLevel,
213 : : ClusterParams *params);
214 : : static Oid determine_clustered_index(Relation rel, bool usingindex,
215 : : const char *indexname);
216 : :
217 : : static void start_repack_decoding_worker(Oid relid);
218 : : static void stop_repack_decoding_worker(void);
219 : : static void stop_repack_decoding_worker_cb(int code, Datum arg);
220 : : static Snapshot get_initial_snapshot(DecodingWorker *worker);
221 : :
222 : : static void ProcessRepackMessage(StringInfo msg);
223 : : static const char *RepackCommandAsString(RepackCommand cmd);
224 : :
225 : :
226 : : /*
227 : : * The repack code allows for processing multiple tables at once. Because
228 : : * of this, we cannot just run everything on a single transaction, or we
229 : : * would be forced to acquire exclusive locks on all the tables being
230 : : * clustered, simultaneously --- very likely leading to deadlock.
231 : : *
232 : : * To solve this we follow a similar strategy to VACUUM code, processing each
233 : : * relation in a separate transaction. For this to work, we need to:
234 : : *
235 : : * - provide a separate memory context so that we can pass information in
236 : : * a way that survives across transactions
237 : : * - start a new transaction every time a new relation is clustered
238 : : * - check for validity of the information on to-be-clustered relations,
239 : : * as someone might have deleted a relation behind our back, or
240 : : * clustered one on a different index
241 : : * - end the transaction
242 : : *
243 : : * The single-relation case does not have any such overhead.
244 : : *
245 : : * We also allow a relation to be repacked following an index, but without
246 : : * naming a specific one. In that case, the indisclustered bit will be
247 : : * looked up, and an ERROR will be thrown if no so-marked index is found.
248 : : */
249 : : void
250 : 230 : ExecRepack(ParseState *pstate, RepackStmt *stmt, bool isTopLevel)
251 : : {
252 : 230 : ClusterParams params = {0};
253 : 230 : Relation rel = NULL;
254 : : MemoryContext repack_context;
255 : : LOCKMODE lockmode;
256 : : List *rtcs;
257 : 230 : bool verbose = false;
258 : 230 : bool analyze = false;
259 : 230 : bool concurrently = false;
260 : :
261 : : /* Parse option list */
262 [ + + + + : 490 : foreach_node(DefElem, opt, stmt->params)
+ + ]
263 : : {
264 [ + + ]: 30 : if (strcmp(opt->defname, "verbose") == 0)
265 : 7 : verbose = defGetBoolean(opt);
266 [ + + ]: 23 : else if (strcmp(opt->defname, "analyze") == 0 ||
267 [ - + ]: 15 : strcmp(opt->defname, "analyse") == 0)
268 : 8 : analyze = defGetBoolean(opt);
269 [ + - ]: 15 : else if (strcmp(opt->defname, "concurrently") == 0)
270 : : {
271 [ - + ]: 15 : if (stmt->command != REPACK_COMMAND_REPACK)
272 [ # # ]: 0 : ereport(ERROR,
273 : : errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
274 : : errmsg("CONCURRENTLY option not supported for %s",
275 : : RepackCommandAsString(stmt->command)));
276 : 15 : concurrently = defGetBoolean(opt);
277 : : }
278 : : else
279 [ # # ]: 0 : ereport(ERROR,
280 : : errcode(ERRCODE_SYNTAX_ERROR),
281 : : errmsg("unrecognized %s option \"%s\"",
282 : : RepackCommandAsString(stmt->command),
283 : : opt->defname),
284 : : parser_errposition(pstate, opt->location));
285 : : }
286 : :
287 : 460 : params.options |=
288 : 460 : (verbose ? CLUOPT_VERBOSE : 0) |
289 [ + + ]: 230 : (analyze ? CLUOPT_ANALYZE : 0) |
290 [ + + ]: 230 : (concurrently ? CLUOPT_CONCURRENT : 0);
291 : :
292 : : /* Determine the lock mode to use. */
293 : 230 : lockmode = RepackLockLevel((params.options & CLUOPT_CONCURRENT) != 0);
294 : :
295 [ + + ]: 230 : if ((params.options & CLUOPT_CONCURRENT) != 0)
296 : : {
297 : : /*
298 : : * Make sure we're not in a transaction block.
299 : : *
300 : : * The reason is that repack_setup_logical_decoding() could wait
301 : : * indefinitely for our XID to complete. (The deadlock detector would
302 : : * not recognize it because we'd be waiting for ourselves, i.e. no
303 : : * real lock conflict.) It would be possible to run in a transaction
304 : : * block if we had no XID, but this restriction is simpler for users
305 : : * to understand and we don't lose any functionality.
306 : : */
307 : 15 : PreventInTransactionBlock(isTopLevel, "REPACK (CONCURRENTLY)");
308 : : }
309 : :
310 : : /*
311 : : * If a single relation is specified, process it and we're done ... unless
312 : : * the relation is a partitioned table, in which case we fall through.
313 : : */
314 [ + + ]: 230 : if (stmt->relation != NULL)
315 : : {
316 : 214 : rel = process_single_relation(stmt, lockmode, isTopLevel, ¶ms);
317 [ + + ]: 170 : if (rel == NULL)
318 : 137 : return; /* all done */
319 : : }
320 : :
321 : : /*
322 : : * Don't allow ANALYZE in the multiple-relation case for now. Maybe we
323 : : * can add support for this later.
324 : : */
325 [ - + ]: 49 : if (params.options & CLUOPT_ANALYZE)
326 [ # # ]: 0 : ereport(ERROR,
327 : : errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
328 : : errmsg("cannot execute %s on multiple tables",
329 : : "REPACK (ANALYZE)"));
330 : :
331 : : /*
332 : : * By here, we know we are in a multi-table situation.
333 : : *
334 : : * Concurrent processing is currently considered rather special (e.g. in
335 : : * terms of resources consumed) so it is not performed in bulk.
336 : : */
337 [ + + ]: 49 : if (params.options & CLUOPT_CONCURRENT)
338 : : {
339 [ + - ]: 1 : if (rel != NULL)
340 : : {
341 : : Assert(rel->rd_rel->relkind == RELKIND_PARTITIONED_TABLE);
342 [ + - ]: 1 : ereport(ERROR,
343 : : errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
344 : : errmsg("%s is not supported for partitioned tables",
345 : : "REPACK (CONCURRENTLY)"),
346 : : errhint("Consider running the command on individual partitions."));
347 : : }
348 : : else
349 [ # # ]: 0 : ereport(ERROR,
350 : : errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
351 : : errmsg("%s requires an explicit table name",
352 : : "REPACK (CONCURRENTLY)"));
353 : : }
354 : :
355 : : /*
356 : : * In order to avoid holding locks for too long, we want to process each
357 : : * table in its own transaction. This forces us to disallow running
358 : : * inside a user transaction block.
359 : : */
360 : 48 : PreventInTransactionBlock(isTopLevel, RepackCommandAsString(stmt->command));
361 : :
362 : : /* Also, we need a memory context to hold our list of relations */
363 : 48 : repack_context = AllocSetContextCreate(PortalContext,
364 : : "Repack",
365 : : ALLOCSET_DEFAULT_SIZES);
366 : :
367 : : /*
368 : : * Since we open a new transaction for each relation, we have to check
369 : : * that the relation still is what we think it is.
370 : : *
371 : : * In single-transaction CLUSTER, we don't need the overhead.
372 : : */
373 : 48 : params.options |= CLUOPT_RECHECK;
374 : :
375 : : /*
376 : : * If we don't have a relation yet, determine a relation list. If we do,
377 : : * then it must be a partitioned table, and we want to process its
378 : : * partitions.
379 : : */
380 [ + + ]: 48 : if (rel == NULL)
381 : : {
382 : : Assert(stmt->indexname == NULL);
383 : 16 : rtcs = get_tables_to_repack(stmt->command, stmt->usingindex,
384 : : repack_context);
385 : 16 : params.options |= CLUOPT_RECHECK_ISCLUSTERED;
386 : : }
387 : : else
388 : : {
389 : : Oid relid;
390 : : bool rel_is_index;
391 : :
392 : : Assert(rel->rd_rel->relkind == RELKIND_PARTITIONED_TABLE);
393 : :
394 : : /*
395 : : * If USING INDEX was specified, resolve the index name now and pass
396 : : * it down.
397 : : */
398 [ + + ]: 32 : if (stmt->usingindex)
399 : : {
400 : : /*
401 : : * If no index name was specified when repacking a partitioned
402 : : * table, punt for now. Maybe we can improve this later.
403 : : */
404 [ + + ]: 28 : if (!stmt->indexname)
405 : : {
406 [ + + ]: 8 : if (stmt->command == REPACK_COMMAND_CLUSTER)
407 [ + - ]: 4 : ereport(ERROR,
408 : : errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
409 : : errmsg("there is no previously clustered index for table \"%s\"",
410 : : RelationGetRelationName(rel)));
411 : : else
412 [ + - ]: 4 : ereport(ERROR,
413 : : errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
414 : : /*- translator: first %s is name of a SQL command, eg. REPACK */
415 : : errmsg("cannot execute %s on partitioned table \"%s\" USING INDEX with no index name",
416 : : RepackCommandAsString(stmt->command),
417 : : RelationGetRelationName(rel)));
418 : : }
419 : :
420 : 20 : relid = determine_clustered_index(rel, stmt->usingindex,
421 : 20 : stmt->indexname);
422 [ - + ]: 20 : if (!OidIsValid(relid))
423 [ # # ]: 0 : elog(ERROR, "unable to determine index to cluster on");
424 : 20 : check_index_is_clusterable(rel, relid, AccessExclusiveLock);
425 : :
426 : 16 : rel_is_index = true;
427 : : }
428 : : else
429 : : {
430 : 4 : relid = RelationGetRelid(rel);
431 : 4 : rel_is_index = false;
432 : : }
433 : :
434 : 20 : rtcs = get_tables_to_repack_partitioned(stmt->command,
435 : : relid, rel_is_index,
436 : : repack_context);
437 : :
438 : : /* close parent relation, releasing lock on it */
439 : 20 : table_close(rel, AccessExclusiveLock);
440 : 20 : rel = NULL;
441 : : }
442 : :
443 : : /* Commit to get out of starting transaction */
444 : 36 : PopActiveSnapshot();
445 : 36 : CommitTransactionCommand();
446 : :
447 : : /* Cluster the tables, each in a separate transaction */
448 : : Assert(rel == NULL);
449 [ + + + + : 124 : foreach_ptr(RelToCluster, rtc, rtcs)
+ + ]
450 : : {
451 : : /* Start a new transaction for each relation. */
452 : 52 : StartTransactionCommand();
453 : :
454 : : /*
455 : : * Open the target table, coping with the case where it has been
456 : : * dropped.
457 : : */
458 : 52 : rel = try_table_open(rtc->tableOid, lockmode);
459 [ - + ]: 52 : if (rel == NULL)
460 : : {
461 : 0 : CommitTransactionCommand();
462 : 0 : continue;
463 : : }
464 : :
465 : : /* functions in indexes may want a snapshot set */
466 : 52 : PushActiveSnapshot(GetTransactionSnapshot());
467 : :
468 : : /* Process this table */
469 : 52 : cluster_rel(stmt->command, rel, rtc->indexOid, ¶ms, isTopLevel);
470 : : /* cluster_rel closes the relation, but keeps lock */
471 : :
472 : 52 : PopActiveSnapshot();
473 : 52 : CommitTransactionCommand();
474 : : }
475 : :
476 : : /* Start a new transaction for the cleanup work. */
477 : 36 : StartTransactionCommand();
478 : :
479 : : /* Clean up working storage */
480 : 36 : MemoryContextDelete(repack_context);
481 : : }
482 : :
483 : : /*
484 : : * In the non-concurrent case, we obtain AccessExclusiveLock throughout the
485 : : * operation to avoid any lock-upgrade hazards. In the concurrent case, we
486 : : * grab ShareUpdateExclusiveLock (just like VACUUM) for most of the
487 : : * processing and only acquire AccessExclusiveLock at the end, to swap the
488 : : * relation -- supposedly for a short time.
489 : : */
490 : : static LOCKMODE
491 : 1063 : RepackLockLevel(bool concurrent)
492 : : {
493 [ + + ]: 1063 : if (concurrent)
494 : 36 : return ShareUpdateExclusiveLock;
495 : : else
496 : 1027 : return AccessExclusiveLock;
497 : : }
498 : :
499 : : /*
500 : : * cluster_rel
501 : : *
502 : : * This clusters the table by creating a new, clustered table and
503 : : * swapping the relfilenumbers of the new table and the old table, so
504 : : * the OID of the original table is preserved. Thus we do not lose
505 : : * GRANT, inheritance nor references to this table.
506 : : *
507 : : * Indexes are rebuilt too, via REINDEX. Since we are effectively bulk-loading
508 : : * the new table, it's better to create the indexes afterwards than to fill
509 : : * them incrementally while we load the table.
510 : : *
511 : : * If indexOid is InvalidOid, the table will be rewritten in physical order
512 : : * instead of index order.
513 : : *
514 : : * Note that, in the concurrent case, the function releases the lock at some
515 : : * point, in order to get AccessExclusiveLock for the final steps (i.e. to
516 : : * swap the relation files). To make things simpler, the caller should expect
517 : : * OldHeap to be closed on return, regardless CLUOPT_CONCURRENT. (The
518 : : * AccessExclusiveLock is kept till the end of the transaction.)
519 : : *
520 : : * 'cmd' indicates which command is being executed, to be used for error
521 : : * messages.
522 : : */
523 : : void
524 : 426 : cluster_rel(RepackCommand cmd, Relation OldHeap, Oid indexOid,
525 : : ClusterParams *params, bool isTopLevel)
526 : : {
527 : 426 : Oid tableOid = RelationGetRelid(OldHeap);
528 : : Relation index;
529 : : LOCKMODE lmode;
530 : : Oid save_userid;
531 : : int save_sec_context;
532 : : int save_nestlevel;
533 : 426 : bool verbose = ((params->options & CLUOPT_VERBOSE) != 0);
534 : 426 : bool recheck = ((params->options & CLUOPT_RECHECK) != 0);
535 : 426 : bool concurrent = ((params->options & CLUOPT_CONCURRENT) != 0);
536 : 426 : Oid ident_idx = InvalidOid;
537 : :
538 : : /* Determine the lock mode to use. */
539 : 426 : lmode = RepackLockLevel(concurrent);
540 : :
541 : : /*
542 : : * Check some preconditions in the concurrent case. This also obtains the
543 : : * replica index OID.
544 : : */
545 [ + + ]: 426 : if (concurrent)
546 : 14 : check_concurrent_repack_requirements(OldHeap, &ident_idx);
547 : :
548 : : /* Check for user-requested abort. */
549 [ - + ]: 419 : CHECK_FOR_INTERRUPTS();
550 : :
551 : 419 : pgstat_progress_start_command(PROGRESS_COMMAND_REPACK, tableOid);
552 : 419 : pgstat_progress_update_param(PROGRESS_REPACK_COMMAND, cmd);
553 : :
554 : : /*
555 : : * Switch to the table owner's userid, so that any index functions are run
556 : : * as that user. Also lock down security-restricted operations and
557 : : * arrange to make GUC variable changes local to this command.
558 : : */
559 : 419 : GetUserIdAndSecContext(&save_userid, &save_sec_context);
560 : 419 : SetUserIdAndSecContext(OldHeap->rd_rel->relowner,
561 : : save_sec_context | SECURITY_RESTRICTED_OPERATION);
562 : 419 : save_nestlevel = NewGUCNestLevel();
563 : 419 : RestrictSearchPath();
564 : :
565 : : /*
566 : : * Recheck that the relation is still what it was when we started.
567 : : *
568 : : * Note that it's critical to skip this in single-relation CLUSTER;
569 : : * otherwise, we would reject an attempt to cluster using a
570 : : * not-previously-clustered index.
571 : : */
572 [ + + ]: 419 : if (recheck &&
573 [ - + ]: 52 : !cluster_rel_recheck(cmd, OldHeap, indexOid, save_userid,
574 : 52 : lmode, params->options))
575 : 0 : goto out;
576 : :
577 : : /*
578 : : * We allow repacking shared catalogs only when not using an index. It
579 : : * would work to use an index in most respects, but the index would only
580 : : * get marked as indisclustered in the current database, leading to
581 : : * unexpected behavior if CLUSTER were later invoked in another database.
582 : : */
583 [ + + - + ]: 419 : if (OidIsValid(indexOid) && OldHeap->rd_rel->relisshared)
584 [ # # ]: 0 : ereport(ERROR,
585 : : errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
586 : : /*- translator: first %s is name of a SQL command, eg. REPACK */
587 : : errmsg("cannot execute %s on a shared catalog",
588 : : RepackCommandAsString(cmd)));
589 : :
590 : : /*
591 : : * The CONCURRENTLY case should have been rejected earlier because it does
592 : : * not support system catalogs.
593 : : */
594 : : Assert(!(OldHeap->rd_rel->relisshared && concurrent));
595 : :
596 : : /*
597 : : * Don't process temp tables of other backends ... their local buffer
598 : : * manager is not going to cope.
599 : : */
600 [ + + - + ]: 419 : if (RELATION_IS_OTHER_TEMP(OldHeap))
601 [ # # ]: 0 : ereport(ERROR,
602 : : errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
603 : : /*- translator: first %s is name of a SQL command, eg. REPACK */
604 : : errmsg("cannot execute %s on temporary tables of other sessions",
605 : : RepackCommandAsString(cmd)));
606 : :
607 : : /*
608 : : * Also check for active uses of the relation in the current transaction,
609 : : * including open scans and pending AFTER trigger events.
610 : : */
611 : 419 : CheckTableNotInUse(OldHeap, RepackCommandAsString(cmd));
612 : :
613 : : /* Check heap and index are valid to cluster on */
614 [ + + ]: 419 : if (OidIsValid(indexOid))
615 : : {
616 : : /* verify the index is good and lock it */
617 : 157 : check_index_is_clusterable(OldHeap, indexOid, lmode);
618 : : /* also open it */
619 : 157 : index = index_open(indexOid, NoLock);
620 : : }
621 : : else
622 : 262 : index = NULL;
623 : :
624 : : /*
625 : : * When allow_system_table_mods is turned off, we disallow repacking a
626 : : * catalog on a particular index unless that's already the clustered index
627 : : * for that catalog.
628 : : *
629 : : * XXX We don't check for this in CLUSTER, because it's historically been
630 : : * allowed.
631 : : */
632 [ + + ]: 419 : if (cmd != REPACK_COMMAND_CLUSTER &&
633 [ + - + + ]: 289 : !allowSystemTableMods && OidIsValid(indexOid) &&
634 [ + + + - ]: 27 : IsCatalogRelation(OldHeap) && !index->rd_index->indisclustered)
635 [ + - ]: 4 : ereport(ERROR,
636 : : errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
637 : : errmsg("permission denied: \"%s\" is a system catalog",
638 : : RelationGetRelationName(OldHeap)),
639 : : errdetail("System catalogs can only be clustered by the index they're already clustered on, if any, unless \"%s\" is enabled.",
640 : : "allow_system_table_mods"));
641 : :
642 : : /*
643 : : * Quietly ignore the request if this is a materialized view which has not
644 : : * been populated from its query. No harm is done because there is no data
645 : : * to deal with, and we don't want to throw an error if this is part of a
646 : : * multi-relation request -- for example, CLUSTER was run on the entire
647 : : * database.
648 : : */
649 [ + + ]: 415 : if (OldHeap->rd_rel->relkind == RELKIND_MATVIEW &&
650 [ + - ]: 8 : !RelationIsPopulated(OldHeap))
651 : : {
652 [ + - ]: 8 : if (index)
653 : 8 : index_close(index, lmode);
654 : 8 : relation_close(OldHeap, lmode);
655 : 8 : goto out;
656 : : }
657 : :
658 : : Assert(OldHeap->rd_rel->relkind == RELKIND_RELATION ||
659 : : OldHeap->rd_rel->relkind == RELKIND_MATVIEW ||
660 : : OldHeap->rd_rel->relkind == RELKIND_TOASTVALUE);
661 : :
662 : : /*
663 : : * All predicate locks on the tuples or pages are about to be made
664 : : * invalid, because we move tuples around. Promote them to relation
665 : : * locks. Predicate locks on indexes will be promoted when they are
666 : : * reindexed.
667 : : *
668 : : * During concurrent processing, the heap as well as its indexes stay in
669 : : * operation, so we postpone this step until they are locked using
670 : : * AccessExclusiveLock near the end of the processing.
671 : : */
672 [ + + ]: 407 : if (!concurrent)
673 : 400 : TransferPredicateLocksToHeapRelation(OldHeap);
674 : :
675 : : /*
676 : : * rebuild_relation does all the dirty work, and closes OldHeap and index,
677 : : * if valid.
678 : : *
679 : : * In concurrent mode, make sure the worker terminates; normally it does
680 : : * so by itself, but a PG_ENSURE_ERROR_CLEANUP callback ensures that this
681 : : * happens even in case this backend dies early on a FATAL exit. Normal
682 : : * mode doesn't need that overhead.
683 : : */
684 [ + + ]: 407 : if (concurrent)
685 : : {
686 [ + - ]: 7 : PG_ENSURE_ERROR_CLEANUP(stop_repack_decoding_worker_cb, 0);
687 : : {
688 : 7 : rebuild_relation(OldHeap, index, verbose, ident_idx);
689 : : }
690 [ - + ]: 7 : PG_END_ENSURE_ERROR_CLEANUP(stop_repack_decoding_worker_cb, 0);
691 : 7 : stop_repack_decoding_worker();
692 : : }
693 : : else
694 : 400 : rebuild_relation(OldHeap, index, verbose, ident_idx);
695 : :
696 : 411 : out:
697 : : /* Roll back any GUC changes executed by index functions */
698 : 411 : AtEOXact_GUC(false, save_nestlevel);
699 : :
700 : : /* Restore userid and security context */
701 : 411 : SetUserIdAndSecContext(save_userid, save_sec_context);
702 : :
703 : 411 : pgstat_progress_end_command();
704 : 411 : }
705 : :
706 : : /*
707 : : * Check if the table (and its index) still meets the requirements of
708 : : * cluster_rel().
709 : : */
710 : : static bool
711 : 52 : cluster_rel_recheck(RepackCommand cmd, Relation OldHeap, Oid indexOid,
712 : : Oid userid, LOCKMODE lmode, int options)
713 : : {
714 : 52 : Oid tableOid = RelationGetRelid(OldHeap);
715 : :
716 : : /* Check that the user still has privileges for the relation */
717 [ - + ]: 52 : if (!repack_is_permitted_for_relation(cmd, tableOid, userid))
718 : : {
719 : 0 : relation_close(OldHeap, lmode);
720 : 0 : return false;
721 : : }
722 : :
723 : : /*
724 : : * Silently skip a temp table for a remote session. Only doing this check
725 : : * in the "recheck" case is appropriate (which currently means somebody is
726 : : * executing a database-wide CLUSTER or on a partitioned table), because
727 : : * there is another check in cluster() which will stop any attempt to
728 : : * cluster remote temp tables by name. There is another check in
729 : : * cluster_rel which is redundant, but we leave it for extra safety.
730 : : */
731 [ - + - - ]: 52 : if (RELATION_IS_OTHER_TEMP(OldHeap))
732 : : {
733 : 0 : relation_close(OldHeap, lmode);
734 : 0 : return false;
735 : : }
736 : :
737 [ + + ]: 52 : if (OidIsValid(indexOid))
738 : : {
739 : : /*
740 : : * Check that the index still exists
741 : : */
742 [ - + ]: 32 : if (!SearchSysCacheExists1(RELOID, ObjectIdGetDatum(indexOid)))
743 : : {
744 : 0 : relation_close(OldHeap, lmode);
745 : 0 : return false;
746 : : }
747 : :
748 : : /*
749 : : * Check that the index is still the one with indisclustered set, if
750 : : * needed.
751 : : */
752 [ + + ]: 32 : if ((options & CLUOPT_RECHECK_ISCLUSTERED) != 0 &&
753 [ - + ]: 4 : !get_index_isclustered(indexOid))
754 : : {
755 : 0 : relation_close(OldHeap, lmode);
756 : 0 : return false;
757 : : }
758 : : }
759 : :
760 : 52 : return true;
761 : : }
762 : :
763 : : /*
764 : : * Verify that the specified heap and index are valid to cluster on
765 : : *
: : * OldHeap: the already-open table that is to be clustered.
: : * indexOid: OID of the candidate index.
: : * lockmode: lock level passed to index_open() for the index.
: : *
: : * Raises ERROR if the index does not belong to OldHeap, if its access
: : * method is not clusterable, if it is a partial index, or if it is not
: : * marked valid. Returns normally otherwise.
: : *
766 : : * Side effect: obtains lock on the index. The caller may
767 : : * in some cases already have a lock of the same strength on the table, but
768 : : * not in all cases so we can't rely on the table-level lock for
769 : : * protection here.
: : * The index lock is kept on return; only the relcache reference is dropped.
770 : : */
771 : : void
772 : 353 : check_index_is_clusterable(Relation OldHeap, Oid indexOid, LOCKMODE lockmode)
773 : : {
774 : : Relation OldIndex;
775 : :
776 : 353 : OldIndex = index_open(indexOid, lockmode);
777 : :
778 : : /*
779 : : * Check that index is in fact an index on the given relation
780 : : */
781 [ + - ]: 353 : if (OldIndex->rd_index == NULL ||
782 [ + + ]: 353 : OldIndex->rd_index->indrelid != RelationGetRelid(OldHeap))
783 [ + - ]: 4 : ereport(ERROR,
784 : : (errcode(ERRCODE_WRONG_OBJECT_TYPE),
785 : : errmsg("\"%s\" is not an index for table \"%s\"",
786 : : RelationGetRelationName(OldIndex),
787 : : RelationGetRelationName(OldHeap))));
788 : :
789 : : /* Index AM must allow clustering */
790 [ + + ]: 349 : if (!OldIndex->rd_indam->amclusterable)
791 [ + - ]: 4 : ereport(ERROR,
792 : : (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
793 : : errmsg("cannot cluster on index \"%s\" because access method does not support clustering",
794 : : RelationGetRelationName(OldIndex))));
795 : :
796 : : /*
797 : : * Disallow clustering on incomplete indexes (those that might not index
798 : : * every row of the relation). We could relax this by making a separate
799 : : * seqscan pass over the table to copy the missing rows, but that seems
800 : : * expensive and tedious.
: : *
: : * A non-null indpred attribute in the index's pg_index tuple is what
: : * identifies a partial index here.
801 : : */
802 [ + + ]: 345 : if (!heap_attisnull(OldIndex->rd_indextuple, Anum_pg_index_indpred, NULL))
803 [ + - ]: 4 : ereport(ERROR,
804 : : (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
805 : : errmsg("cannot cluster on partial index \"%s\"",
806 : : RelationGetRelationName(OldIndex))));
807 : :
808 : : /*
809 : : * Disallow if index is left over from a failed CREATE INDEX CONCURRENTLY;
810 : : * it might well not contain entries for every heap row, or might not even
811 : : * be internally consistent. (But note that we don't check indcheckxmin;
812 : : * the worst consequence of following broken HOT chains would be that we
813 : : * might put recently-dead tuples out-of-order in the new table, and there
814 : : * is little harm in that.)
815 : : */
816 [ + + ]: 341 : if (!OldIndex->rd_index->indisvalid)
817 [ + - ]: 4 : ereport(ERROR,
818 : : (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
819 : : errmsg("cannot cluster on invalid index \"%s\"",
820 : : RelationGetRelationName(OldIndex))));
821 : :
822 : : /* Drop relcache refcnt on OldIndex, but keep lock */
823 : 337 : index_close(OldIndex, NoLock);
824 : 337 : }
825 : :
826 : : /*
827 : : * mark_index_clustered: mark the specified index as the one clustered on
828 : : *
829 : : * With indexOid == InvalidOid, will mark all indexes of rel not-clustered.
: : *
: : * Returns without touching pg_index if indexOid is valid and is already
: : * marked clustered. Otherwise every index in rel's index list is visited:
: : * a set indisclustered flag is cleared, the flag is set on indexOid, and
: : * the post-alter hook is invoked for each visited index (whether or not
: : * its tuple was updated), with is_internal passed through.
: : *
: : * rel must not be a partitioned table (asserted).
830 : : */
831 : : void
832 : 192 : mark_index_clustered(Relation rel, Oid indexOid, bool is_internal)
833 : : {
834 : : HeapTuple indexTuple;
835 : : Form_pg_index indexForm;
836 : : Relation pg_index;
837 : : ListCell *index;
838 : :
839 : : Assert(rel->rd_rel->relkind != RELKIND_PARTITIONED_TABLE);
840 : :
841 : : /*
842 : : * If the index is already marked clustered, no need to do anything.
843 : : */
844 [ + + ]: 192 : if (OidIsValid(indexOid))
845 : : {
846 [ + + ]: 184 : if (get_index_isclustered(indexOid))
847 : 37 : return;
848 : : }
849 : :
850 : : /*
851 : : * Check each index of the relation and set/clear the bit as needed.
852 : : */
853 : 155 : pg_index = table_open(IndexRelationId, RowExclusiveLock);
854 : :
855 [ + - + + : 464 : foreach(index, RelationGetIndexList(rel))
+ + ]
856 : : {
857 : 309 : Oid thisIndexOid = lfirst_oid(index);
858 : :
: : /* Work on a modifiable copy of the index's pg_index tuple */
859 : 309 : indexTuple = SearchSysCacheCopy1(INDEXRELID,
860 : : ObjectIdGetDatum(thisIndexOid));
861 [ - + ]: 309 : if (!HeapTupleIsValid(indexTuple))
862 [ # # ]: 0 : elog(ERROR, "cache lookup failed for index %u", thisIndexOid);
863 : 309 : indexForm = (Form_pg_index) GETSTRUCT(indexTuple);
864 : :
865 : : /*
866 : : * Unset the bit if set. We know it's wrong because we checked this
867 : : * earlier.
868 : : */
869 [ + + ]: 309 : if (indexForm->indisclustered)
870 : : {
871 : 20 : indexForm->indisclustered = false;
872 : 20 : CatalogTupleUpdate(pg_index, &indexTuple->t_self, indexTuple);
873 : : }
874 [ + + ]: 289 : else if (thisIndexOid == indexOid)
875 : : {
876 : : /* this was checked earlier, but let's be real sure */
877 [ - + ]: 147 : if (!indexForm->indisvalid)
878 [ # # ]: 0 : elog(ERROR, "cannot cluster on invalid index %u", indexOid);
879 : 147 : indexForm->indisclustered = true;
880 : 147 : CatalogTupleUpdate(pg_index, &indexTuple->t_self, indexTuple);
881 : : }
882 : :
883 [ - + ]: 309 : InvokeObjectPostAlterHookArg(IndexRelationId, thisIndexOid, 0,
884 : : InvalidOid, is_internal);
885 : :
886 : 309 : heap_freetuple(indexTuple);
887 : : }
888 : :
889 : 155 : table_close(pg_index, RowExclusiveLock);
890 : : }
891 : :
892 : : /*
893 : : * Check if the CONCURRENTLY option is legal for the relation.
894 : : *
: : * Raises ERROR if wal_level is below "replica", if rel is a catalog or
: : * TOAST relation, if it is not a permanent relation, if its replica
: : * identity is NOTHING or FULL, or if no usable identity index is found
: : * (a deferrable primary key gets its own message).
: : *
895 : : * *ident_idx_p receives OID of the identity index.
: : *
: : * NOTE(review): several errhint() texts below state a fact rather than
: : * suggest a remedy; consider whether errdetail() would match the error
: : * message style guide better.
896 : : */
897 : : static void
898 : 14 : check_concurrent_repack_requirements(Relation rel, Oid *ident_idx_p)
899 : : {
900 : : char relpersistence,
901 : : replident;
902 : : Oid ident_idx;
903 : :
904 [ - + ]: 14 : if (wal_level < WAL_LEVEL_REPLICA)
905 [ # # ]: 0 : ereport(ERROR,
906 : : errcode(ERRCODE_INVALID_PARAMETER_VALUE),
907 : : errmsg("cannot execute %s in this configuration",
908 : : "REPACK (CONCURRENTLY)"),
909 : : errdetail("%s requires \"wal_level\" to be set to \"replica\" or higher.",
910 : : "REPACK (CONCURRENTLY)"));
911 : :
912 : : /* Data changes in system relations are not logically decoded. */
913 [ + + ]: 14 : if (IsCatalogRelation(rel))
914 [ + - ]: 1 : ereport(ERROR,
915 : : errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
916 : : errmsg("cannot execute %s on relation \"%s\"",
917 : : "REPACK (CONCURRENTLY)", RelationGetRelationName(rel)),
918 : : errhint("%s is not supported for catalog relations.",
919 : : "REPACK (CONCURRENTLY)"));
920 : :
921 : : /*
922 : : * reorderbuffer.c does not seem to handle processing of TOAST relation
923 : : * alone.
924 : : */
925 [ + + ]: 13 : if (IsToastRelation(rel))
926 [ + - ]: 1 : ereport(ERROR,
927 : : errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
928 : : errmsg("cannot execute %s on relation \"%s\"",
929 : : "REPACK (CONCURRENTLY)", RelationGetRelationName(rel)),
930 : : errhint("%s is not supported for TOAST relations.",
931 : : "REPACK (CONCURRENTLY)"));
932 : :
: : /* Anything other than a permanent relation is rejected. */
933 : 12 : relpersistence = rel->rd_rel->relpersistence;
934 [ + + ]: 12 : if (relpersistence != RELPERSISTENCE_PERMANENT)
935 [ + - ]: 2 : ereport(ERROR,
936 : : errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
937 : : errmsg("cannot execute %s on relation \"%s\"",
938 : : "REPACK (CONCURRENTLY)", RelationGetRelationName(rel)),
939 : : errhint("%s is only allowed for permanent relations.",
940 : : "REPACK (CONCURRENTLY)"));
941 : :
942 : : /*
943 : : * With NOTHING, WAL does not contain the old tuple; FULL is not yet
944 : : * supported.
945 : : */
946 : 10 : replident = rel->rd_rel->relreplident;
947 [ + + - + ]: 10 : if (replident == REPLICA_IDENTITY_NOTHING ||
948 : : replident == REPLICA_IDENTITY_FULL)
949 [ + - + - ]: 1 : ereport(ERROR,
950 : : errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
951 : : errmsg("cannot execute %s on relation \"%s\"",
952 : : "REPACK (CONCURRENTLY)", RelationGetRelationName(rel)),
953 : : errdetail("%s does not support tables with %s.",
954 : : "REPACK (CONCURRENTLY)",
955 : : replident == REPLICA_IDENTITY_NOTHING ?
956 : : "REPLICA IDENTITY NOTHING" : "REPLICA IDENTITY FULL"));
957 : :
958 : : /*
959 : : * Obtain the replica identity index -- either one that has been set
960 : : * explicitly, or a non-deferrable primary key. If none of these cases
961 : : * apply, the table cannot be repacked concurrently. It might be possible
962 : : * to have repack work with a FULL replica identity; however that requires
963 : : * more work and is not implemented yet.
964 : : */
965 : 9 : ident_idx = GetRelationIdentityOrPK(rel);
966 [ + + ]: 9 : if (!OidIsValid(ident_idx))
967 : : {
968 : : /* This special case warrants its own error message */
969 [ + + + - ]: 2 : if (OidIsValid(rel->rd_pkindex) && rel->rd_ispkdeferrable)
970 [ + - ]: 1 : ereport(ERROR,
971 : : errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
972 : : errmsg("cannot execute %s on relation \"%s\"",
973 : : "REPACK (CONCURRENTLY)",
974 : : RelationGetRelationName(rel)),
975 : : errdetail("%s does not support deferrable primary keys.",
976 : : "REPACK (CONCURRENTLY)"),
977 : : errhint("Use ALTER TABLE ... REPLICA IDENTITY USING INDEX to designate another index as replica identity."));
978 : :
979 [ + - ]: 1 : ereport(ERROR,
980 : : errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
981 : : errmsg("cannot execute %s on relation \"%s\"",
982 : : "REPACK (CONCURRENTLY)", RelationGetRelationName(rel)),
983 : : errhint("Relation \"%s\" has no identity index.",
984 : : RelationGetRelationName(rel)));
985 : : }
986 : :
: : /* All checks passed: hand the identity index back to the caller. */
987 : 7 : *ident_idx_p = ident_idx;
988 : 7 : }
989 : :
990 : :
991 : : /*
992 : : * rebuild_relation: rebuild an existing relation in index or physical order
993 : : *
994 : : * OldHeap: table to rebuild. See cluster_rel() for comments on the required
995 : : * lock strength.
996 : : *
997 : : * index: index to cluster by, or NULL to rewrite in physical order.
998 : : *
: : * verbose: passed on to copy_table_data(), which uses it to pick the level
: : * of its log messages.
: : *
999 : : * ident_idx: identity index, to handle replaying of concurrent data changes
1000 : : * to the new heap. InvalidOid if there's no CONCURRENTLY option.
1001 : : *
1002 : : * On entry, heap and index (if one is given) must be open, and the
1003 : : * appropriate lock held on them -- AccessExclusiveLock for exclusive
1004 : : * processing and ShareUpdateExclusiveLock for concurrent processing.
1005 : : *
1006 : : * On exit, they are closed, but still locked with AccessExclusiveLock.
1007 : : * (The function handles the lock upgrade in concurrent mode, i.e. when
: : * ident_idx is valid.)
1008 : : */
1009 : : static void
1010 : 407 : rebuild_relation(Relation OldHeap, Relation index, bool verbose,
1011 : : Oid ident_idx)
1012 : : {
1013 : 407 : Oid tableOid = RelationGetRelid(OldHeap);
1014 : 407 : Oid accessMethod = OldHeap->rd_rel->relam;
1015 : 407 : Oid tableSpace = OldHeap->rd_rel->reltablespace;
1016 : : Oid OIDNewHeap;
1017 : : Relation NewHeap;
1018 : : char relpersistence;
1019 : : bool swap_toast_by_content;
1020 : : TransactionId frozenXid;
1021 : : MultiXactId cutoffMulti;
1022 : 407 : bool concurrent = OidIsValid(ident_idx);
1023 : 407 : Snapshot snapshot = NULL;
: : /* lmode is only needed by the assertions, so only declare it for them */
1024 : : #ifdef USE_ASSERT_CHECKING
1025 : : LOCKMODE lmode;
1026 : :
1027 : : lmode = RepackLockLevel(concurrent);
1028 : :
1029 : : Assert(CheckRelationLockedByMe(OldHeap, lmode, false));
1030 : : Assert(index == NULL || CheckRelationLockedByMe(index, lmode, false));
1031 : : #endif
1032 : :
1033 [ + + ]: 407 : if (concurrent)
1034 : : {
1035 : : /*
1036 : : * The worker needs to be member of the locking group we're the leader
1037 : : * of. We ought to become the leader before the worker starts. The
1038 : : * worker will join the group as soon as it starts.
1039 : : *
1040 : : * This is to make sure that the deadlock described below is
1041 : : * detectable by deadlock.c: if the worker waits for a transaction to
1042 : : * complete and we are waiting for the worker output, then effectively
1043 : : * we (i.e. this backend) are waiting for that transaction.
1044 : : */
1045 : 7 : BecomeLockGroupLeader();
1046 : :
1047 : : /*
1048 : : * Start the worker that decodes data changes applied while we're
1049 : : * copying the table contents.
1050 : : *
1051 : : * Note that the worker has to wait for all transactions with XID
1052 : : * already assigned to finish. If some of those transactions is
1053 : : * waiting for a lock conflicting with ShareUpdateExclusiveLock on our
1054 : : * table (e.g. it runs CREATE INDEX), we can end up in a deadlock.
1055 : : * Not sure this risk is worth unlocking/locking the table (and its
1056 : : * clustering index) and checking again if it's still eligible for
1057 : : * REPACK CONCURRENTLY.
1058 : : */
1059 : 7 : start_repack_decoding_worker(tableOid);
1060 : :
1061 : : /*
1062 : : * Wait until the worker has the initial snapshot and retrieve it.
1063 : : */
1064 : 7 : snapshot = get_initial_snapshot(decoding_worker);
1065 : :
1066 : 7 : PushActiveSnapshot(snapshot);
1067 : : }
1068 : :
1069 : : /* for CLUSTER or REPACK USING INDEX, mark the index as the one to use */
1070 [ + + ]: 407 : if (index != NULL)
1071 : 145 : mark_index_clustered(OldHeap, RelationGetRelid(index), true);
1072 : :
1073 : : /* Remember info about rel before closing OldHeap */
1074 : 407 : relpersistence = OldHeap->rd_rel->relpersistence;
1075 : :
1076 : : /*
1077 : : * Create the transient table that will receive the re-ordered data.
1078 : : *
1079 : : * OldHeap is already locked, so no need to lock it again. make_new_heap
1080 : : * obtains AccessExclusiveLock on the new heap and its toast table.
1081 : : */
1082 : 407 : OIDNewHeap = make_new_heap(tableOid, tableSpace,
1083 : : accessMethod,
1084 : : relpersistence,
1085 : : NoLock);
1086 : : Assert(CheckRelationOidLockedByMe(OIDNewHeap, AccessExclusiveLock, false));
1087 : 407 : NewHeap = table_open(OIDNewHeap, NoLock);
1088 : :
1089 : : /*
1090 : : * In concurrent mode, create a copy of the attribute defaults on the temp
1091 : : * table, which the executor needs when replaying concurrent data changes.
1092 : : */
1093 [ + + ]: 407 : if (concurrent)
1094 : 7 : copy_attribute_defaults(tableOid, OIDNewHeap);
1095 : :
1096 : : /* Copy the heap data into the new table in the desired order */
1097 : 407 : copy_table_data(NewHeap, OldHeap, index, snapshot, verbose,
1098 : : &swap_toast_by_content, &frozenXid, &cutoffMulti);
1099 : :
1100 : : /* The historic snapshot won't be needed anymore. */
1101 [ + + ]: 407 : if (snapshot)
1102 : : {
1103 : 7 : PopActiveSnapshot();
1104 : 7 : UpdateActiveSnapshotCommandId();
1105 : : }
1106 : :
1107 [ + + ]: 407 : if (concurrent)
1108 : : {
1109 : : Assert(!swap_toast_by_content);
1110 : :
1111 : : /*
1112 : : * Close the index, but keep the lock. Both heaps will be closed by
1113 : : * the following call.
1114 : : */
1115 [ + + ]: 7 : if (index)
1116 : 3 : index_close(index, NoLock);
1117 : :
1118 : 7 : rebuild_relation_finish_concurrent(NewHeap, OldHeap, ident_idx,
1119 : : frozenXid, cutoffMulti);
1120 : :
1121 : 7 : pgstat_progress_update_param(PROGRESS_REPACK_PHASE,
1122 : : PROGRESS_REPACK_PHASE_FINAL_CLEANUP);
1123 : : }
1124 : : else
1125 : : {
: : /* Must be looked up before OldHeap is closed below */
1126 : 400 : bool is_system_catalog = IsSystemRelation(OldHeap);
1127 : :
1128 : : /* Close relcache entries, but keep lock until transaction commit */
1129 : 400 : table_close(OldHeap, NoLock);
1130 [ + + ]: 400 : if (index)
1131 : 142 : index_close(index, NoLock);
1132 : :
1133 : : /*
1134 : : * Close the new relation so it can be dropped as soon as the storage
1135 : : * is swapped. The relation is not visible to others, so no need to
1136 : : * unlock it explicitly.
1137 : : */
1138 : 400 : table_close(NewHeap, NoLock);
1139 : :
1140 : : /*
1141 : : * Swap the physical files of the target and transient tables, then
1142 : : * rebuild the target's indexes and throw away the transient table.
1143 : : */
1144 : 400 : finish_heap_swap(tableOid, OIDNewHeap, is_system_catalog,
1145 : : swap_toast_by_content, false, true,
1146 : : true, /* reindex */
1147 : : frozenXid, cutoffMulti,
1148 : : relpersistence);
1149 : : }
1150 : 403 : }
1151 : :
1152 : :
1153 : : /*
1154 : : * Create the transient table that will be filled with new data during
1155 : : * CLUSTER, ALTER TABLE, and similar operations. The transient table
1156 : : * duplicates the logical structure of the OldHeap; but will have the
1157 : : * specified physical storage properties NewTableSpace, NewAccessMethod, and
1158 : : * relpersistence.
1159 : : *
: : * OIDOldHeap is opened with 'lockmode' (rebuild_relation passes NoLock
: : * because it already holds a lock); the same lockmode is handed to
: : * NewHeapCreateToastTable when the old heap has a TOAST table. The old heap
: : * is closed again before returning, without releasing any lock.
: : *
: : * Returns the OID of the newly created heap.
: : *
1160 : : * After this, the caller should load the new heap with transferred/modified
1161 : : * data, then call finish_heap_swap to complete the operation.
1162 : : */
1163 : : Oid
1164 : 1556 : make_new_heap(Oid OIDOldHeap, Oid NewTableSpace, Oid NewAccessMethod,
1165 : : char relpersistence, LOCKMODE lockmode)
1166 : : {
1167 : : TupleDesc OldHeapDesc;
1168 : : char NewHeapName[NAMEDATALEN];
1169 : : Oid OIDNewHeap;
1170 : : Oid toastid;
1171 : : Relation OldHeap;
1172 : : HeapTuple tuple;
1173 : : Datum reloptions;
1174 : : bool isNull;
1175 : : Oid namespaceid;
1176 : :
1177 : 1556 : OldHeap = table_open(OIDOldHeap, lockmode);
1178 : 1556 : OldHeapDesc = RelationGetDescr(OldHeap);
1179 : :
1180 : : /*
1181 : : * Note that the NewHeap will not receive any of the defaults or
1182 : : * constraints associated with the OldHeap; we don't need 'em, and there's
1183 : : * no reason to spend cycles inserting them into the catalogs only to
1184 : : * delete them.
1185 : : */
1186 : :
1187 : : /*
1188 : : * But we do want to use reloptions of the old heap for new heap.
: : * The syscache tuple stays pinned until after the new heap is created,
: : * since 'reloptions' is fetched from it.
1189 : : */
1190 : 1556 : tuple = SearchSysCache1(RELOID, ObjectIdGetDatum(OIDOldHeap));
1191 [ - + ]: 1556 : if (!HeapTupleIsValid(tuple))
1192 [ # # ]: 0 : elog(ERROR, "cache lookup failed for relation %u", OIDOldHeap);
1193 : 1556 : reloptions = SysCacheGetAttr(RELOID, tuple, Anum_pg_class_reloptions,
1194 : : &isNull);
1195 [ + + ]: 1556 : if (isNull)
1196 : 1463 : reloptions = (Datum) 0;
1197 : :
: : /*
: : * A temp new heap goes into the namespace that "pg_temp" resolves to;
: : * otherwise the old heap's namespace is used.
: : */
1198 [ + + ]: 1556 : if (relpersistence == RELPERSISTENCE_TEMP)
1199 : 98 : namespaceid = LookupCreationNamespace("pg_temp");
1200 : : else
1201 : 1458 : namespaceid = RelationGetNamespace(OldHeap);
1202 : :
1203 : : /*
1204 : : * Create the new heap, using a temporary name in the same namespace as
1205 : : * the existing table. NOTE: there is some risk of collision with user
1206 : : * relnames. Working around this seems more trouble than it's worth; in
1207 : : * particular, we can't create the new heap in a different namespace from
1208 : : * the old, or we will have problems with the TEMP status of temp tables.
1209 : : *
1210 : : * Note: the new heap is not a shared relation, even if we are rebuilding
1211 : : * a shared rel. However, we do make the new heap mapped if the source is
1212 : : * mapped. This simplifies swap_relation_files, and is absolutely
1213 : : * necessary for rebuilding pg_class, for reasons explained there.
1214 : : */
1215 : 1556 : snprintf(NewHeapName, sizeof(NewHeapName), "pg_temp_%u", OIDOldHeap);
1216 : :
1217 : 1556 : OIDNewHeap = heap_create_with_catalog(NewHeapName,
1218 : : namespaceid,
1219 : : NewTableSpace,
1220 : : InvalidOid,
1221 : : InvalidOid,
1222 : : InvalidOid,
1223 : 1556 : OldHeap->rd_rel->relowner,
1224 : : NewAccessMethod,
1225 : : OldHeapDesc,
1226 : : NIL,
1227 : : RELKIND_RELATION,
1228 : : relpersistence,
1229 : : false,
1230 [ + + + - : 1556 : RelationIsMapped(OldHeap),
+ - + + +
- + + ]
1231 : : ONCOMMIT_NOOP,
1232 : : reloptions,
1233 : : false,
1234 : : true,
1235 : : true,
1236 : : OIDOldHeap,
1237 : 1556 : NULL);
1238 : : Assert(OIDNewHeap != InvalidOid);
1239 : :
1240 : 1556 : ReleaseSysCache(tuple);
1241 : :
1242 : : /*
1243 : : * Advance command counter so that the newly-created relation's catalog
1244 : : * tuples will be visible to table_open.
1245 : : */
1246 : 1556 : CommandCounterIncrement();
1247 : :
1248 : : /*
1249 : : * If necessary, create a TOAST table for the new relation.
1250 : : *
1251 : : * If the relation doesn't have a TOAST table already, we can't need one
1252 : : * for the new relation. The other way around is possible though: if some
1253 : : * wide columns have been dropped, NewHeapCreateToastTable can decide that
1254 : : * no TOAST table is needed for the new table.
1255 : : *
1256 : : * Note that NewHeapCreateToastTable ends with CommandCounterIncrement, so
1257 : : * that the TOAST table will be visible for insertion.
1258 : : */
1259 : 1556 : toastid = OldHeap->rd_rel->reltoastrelid;
1260 [ + + ]: 1556 : if (OidIsValid(toastid))
1261 : : {
1262 : : /* keep the existing toast table's reloptions, if any */
1263 : 571 : tuple = SearchSysCache1(RELOID, ObjectIdGetDatum(toastid));
1264 [ - + ]: 571 : if (!HeapTupleIsValid(tuple))
1265 [ # # ]: 0 : elog(ERROR, "cache lookup failed for relation %u", toastid);
1266 : 571 : reloptions = SysCacheGetAttr(RELOID, tuple, Anum_pg_class_reloptions,
1267 : : &isNull);
1268 [ + - ]: 571 : if (isNull)
1269 : 571 : reloptions = (Datum) 0;
1270 : :
1271 : 571 : NewHeapCreateToastTable(OIDNewHeap, reloptions, lockmode, toastid);
1272 : :
1273 : 571 : ReleaseSysCache(tuple);
1274 : : }
1275 : :
: : /* Drop the relcache reference only; any lock taken above is retained */
1276 : 1556 : table_close(OldHeap, NoLock);
1277 : :
1278 : 1556 : return OIDNewHeap;
1279 : : }
1280 : :
1281 : : /*
1282 : : * Do the physical copying of table data.
1283 : : *
1284 : : * 'snapshot': see table_relation_copy_for_cluster(). Pass a non-NULL snapshot
1285 : : * iff concurrent processing is required.
: : *
: : * 'OldIndex' is the index to order by, or NULL to copy in physical order.
: : *
: : * 'verbose' selects INFO rather than DEBUG2 as the level of the log messages.
1286 : : *
1287 : : * There are three output parameters:
1288 : : * *pSwapToastByContent is set true if toast tables must be swapped by content.
1289 : : * *pFreezeXid receives the TransactionId used as freeze cutoff point.
1290 : : * *pCutoffMulti receives the MultiXactId used as a cutoff point.
: : *
: : * Besides copying, this updates relpages/reltuples in the new heap's pg_class
: : * entry and ends with a CommandCounterIncrement.
1291 : : */
1292 : : static void
1293 : 407 : copy_table_data(Relation NewHeap, Relation OldHeap, Relation OldIndex,
1294 : : Snapshot snapshot, bool verbose, bool *pSwapToastByContent,
1295 : : TransactionId *pFreezeXid, MultiXactId *pCutoffMulti)
1296 : : {
1297 : : Relation relRelation;
1298 : : HeapTuple reltup;
1299 : : Form_pg_class relform;
1300 : : TupleDesc oldTupDesc PG_USED_FOR_ASSERTS_ONLY;
1301 : : TupleDesc newTupDesc PG_USED_FOR_ASSERTS_ONLY;
1302 : : VacuumParams params;
1303 : : struct VacuumCutoffs cutoffs;
1304 : : bool use_sort;
1305 : 407 : double num_tuples = 0,
1306 : 407 : tups_vacuumed = 0,
1307 : 407 : tups_recently_dead = 0;
1308 : : BlockNumber num_pages;
1309 [ + + ]: 407 : int elevel = verbose ? INFO : DEBUG2;
1310 : : PGRUsage ru0;
1311 : : char *nspname;
1312 : 407 : bool concurrent = snapshot != NULL;
1313 : : LOCKMODE lmode;
1314 : :
1315 : 407 : lmode = RepackLockLevel(concurrent);
1316 : :
1317 : 407 : pg_rusage_init(&ru0);
1318 : :
1319 : : /* Store a copy of the namespace name for logging purposes */
1320 : 407 : nspname = get_namespace_name(RelationGetNamespace(OldHeap));
1321 : :
1322 : : /*
1323 : : * Their tuple descriptors should be exactly alike, but here we only need
1324 : : * assume that they have the same number of columns.
1325 : : */
1326 : 407 : oldTupDesc = RelationGetDescr(OldHeap);
1327 : 407 : newTupDesc = RelationGetDescr(NewHeap);
1328 : : Assert(newTupDesc->natts == oldTupDesc->natts);
1329 : :
1330 : : /*
1331 : : * If the OldHeap has a toast table, get lock on the toast table to keep
1332 : : * it from being vacuumed. This is needed because autovacuum processes
1333 : : * toast tables independently of their main tables, with no lock on the
1334 : : * latter. If an autovacuum were to start on the toast table after we
1335 : : * compute our OldestXmin below, it would use a later OldestXmin, and then
1336 : : * possibly remove as DEAD toast tuples belonging to main tuples we think
1337 : : * are only RECENTLY_DEAD. Then we'd fail while trying to copy those
1338 : : * tuples.
1339 : : *
1340 : : * We don't need to open the toast relation here, just lock it. The lock
1341 : : * will be held till end of transaction.
1342 : : */
1343 [ + + ]: 407 : if (OldHeap->rd_rel->reltoastrelid)
1344 : 135 : LockRelationOid(OldHeap->rd_rel->reltoastrelid, lmode);
1345 : :
1346 : : /*
1347 : : * If both tables have TOAST tables, perform toast swap by content. It is
1348 : : * possible that the old table has a toast table but the new one doesn't,
1349 : : * if toastable columns have been dropped. In that case we have to do
1350 : : * swap by links. This is okay because swap by content is only essential
1351 : : * for system catalogs, and we don't support schema changes for them.
1352 : : */
1353 [ + + + - ]: 407 : if (OldHeap->rd_rel->reltoastrelid && NewHeap->rd_rel->reltoastrelid &&
1354 [ + + ]: 135 : !concurrent)
1355 : : {
1356 : 132 : *pSwapToastByContent = true;
1357 : :
1358 : : /*
1359 : : * When doing swap by content, any toast pointers written into NewHeap
1360 : : * must use the old toast table's OID, because that's where the toast
1361 : : * data will eventually be found. Set this up by setting rd_toastoid.
1362 : : * This also tells toast_save_datum() to preserve the toast value
1363 : : * OIDs, which we want so as not to invalidate toast pointers in
1364 : : * system catalog caches, and to avoid making multiple copies of a
1365 : : * single toast value.
1366 : : *
1367 : : * Note that we must hold NewHeap open until we are done writing data,
1368 : : * since the relcache will not guarantee to remember this setting once
1369 : : * the relation is closed. Also, this technique depends on the fact
1370 : : * that no one will try to read from the NewHeap until after we've
1371 : : * finished writing it and swapping the rels --- otherwise they could
1372 : : * follow the toast pointers to the wrong place. (It would actually
1373 : : * work for values copied over from the old toast table, but not for
1374 : : * any values that we toast which were previously not toasted.)
1375 : : *
1376 : : * This would not work with CONCURRENTLY because we may need to delete
1377 : : * TOASTed tuples from the new heap. With this hack, we'd delete them
1378 : : * from the old heap.
1379 : : */
1380 : 132 : NewHeap->rd_toastoid = OldHeap->rd_rel->reltoastrelid;
1381 : : }
1382 : : else
1383 : 275 : *pSwapToastByContent = false;
1384 : :
1385 : : /*
1386 : : * Compute xids used to freeze and weed out dead tuples and multixacts.
1387 : : * Since we're going to rewrite the whole table anyway, there's no reason
1388 : : * not to be aggressive about this.
1389 : : */
1390 : 407 : memset(&params, 0, sizeof(VacuumParams));
1391 : 407 : vacuum_get_cutoffs(OldHeap, &params, &cutoffs);
1392 : :
1393 : : /*
1394 : : * FreezeXid will become the table's new relfrozenxid, and that mustn't go
1395 : : * backwards, so take the max.
1396 : : */
1397 : : {
1398 : 407 : TransactionId relfrozenxid = OldHeap->rd_rel->relfrozenxid;
1399 : :
1400 [ + - + + ]: 814 : if (TransactionIdIsValid(relfrozenxid) &&
1401 : 407 : TransactionIdPrecedes(cutoffs.FreezeLimit, relfrozenxid))
1402 : 14 : cutoffs.FreezeLimit = relfrozenxid;
1403 : : }
1404 : :
1405 : : /*
1406 : : * MultiXactCutoff, similarly, shouldn't go backwards either.
1407 : : */
1408 : : {
1409 : 407 : MultiXactId relminmxid = OldHeap->rd_rel->relminmxid;
1410 : :
1411 [ + - - + ]: 814 : if (MultiXactIdIsValid(relminmxid) &&
1412 : 407 : MultiXactIdPrecedes(cutoffs.MultiXactCutoff, relminmxid))
1413 : 0 : cutoffs.MultiXactCutoff = relminmxid;
1414 : : }
1415 : :
1416 : : /*
1417 : : * Decide whether to use an indexscan or seqscan-and-optional-sort to scan
1418 : : * the OldHeap. We know how to use a sort to duplicate the ordering of a
1419 : : * btree index, and will use seqscan-and-sort for that case if the planner
1420 : : * tells us it's cheaper. Otherwise, always indexscan if an index is
1421 : : * provided, else plain seqscan.
1422 : : */
1423 [ + + + + ]: 407 : if (OldIndex != NULL && OldIndex->rd_rel->relam == BTREE_AM_OID)
1424 : 143 : use_sort = plan_cluster_use_sort(RelationGetRelid(OldHeap),
1425 : : RelationGetRelid(OldIndex));
1426 : : else
1427 : 264 : use_sort = false;
1428 : :
1429 : : /* Log what we're doing */
1430 [ + + + + ]: 407 : if (OldIndex != NULL && !use_sort)
1431 [ - + ]: 62 : ereport(elevel,
1432 : : errmsg("repacking \"%s.%s\" using index scan on \"%s\"",
1433 : : nspname,
1434 : : RelationGetRelationName(OldHeap),
1435 : : RelationGetRelationName(OldIndex)));
1436 [ + + ]: 345 : else if (use_sort)
1437 [ - + ]: 83 : ereport(elevel,
1438 : : errmsg("repacking \"%s.%s\" using sequential scan and sort",
1439 : : nspname,
1440 : : RelationGetRelationName(OldHeap)));
1441 : : else
1442 [ + + ]: 262 : ereport(elevel,
1443 : : errmsg("repacking \"%s.%s\" in physical order",
1444 : : nspname,
1445 : : RelationGetRelationName(OldHeap)));
1446 : :
1447 : : /*
1448 : : * Hand off the actual copying to AM specific function, the generic code
1449 : : * cannot know how to deal with visibility across AMs. Note that this
1450 : : * routine is allowed to set FreezeXid / MultiXactCutoff to different
1451 : : * values (e.g. because the AM doesn't use freezing).
1452 : : */
1453 : 407 : table_relation_copy_for_cluster(OldHeap, NewHeap, OldIndex, use_sort,
1454 : : cutoffs.OldestXmin, snapshot,
1455 : : &cutoffs.FreezeLimit,
1456 : : &cutoffs.MultiXactCutoff,
1457 : : &num_tuples, &tups_vacuumed,
1458 : : &tups_recently_dead);
1459 : :
1460 : : /* return selected values to caller, get set as relfrozenxid/minmxid */
1461 : 407 : *pFreezeXid = cutoffs.FreezeLimit;
1462 : 407 : *pCutoffMulti = cutoffs.MultiXactCutoff;
1463 : :
1464 : : /*
1465 : : * Reset rd_toastoid just to be tidy --- it shouldn't be looked at again.
1466 : : * In the CONCURRENTLY case, we need to set it again before applying the
1467 : : * concurrent changes.
1468 : : */
1469 : 407 : NewHeap->rd_toastoid = InvalidOid;
1470 : :
1471 : 407 : num_pages = RelationGetNumberOfBlocks(NewHeap);
1472 : :
1473 : : /* Log what we did */
1474 [ + + ]: 407 : ereport(elevel,
1475 : : (errmsg("\"%s.%s\": found %.0f removable, %.0f nonremovable row versions in %u pages",
1476 : : nspname,
1477 : : RelationGetRelationName(OldHeap),
1478 : : tups_vacuumed, num_tuples,
1479 : : RelationGetNumberOfBlocks(OldHeap)),
1480 : : errdetail("%.0f dead row versions cannot be removed yet.\n"
1481 : : "%s.",
1482 : : tups_recently_dead,
1483 : : pg_rusage_show(&ru0))));
1484 : :
1485 : : /* Update pg_class to reflect the correct values of pages and tuples. */
1486 : 407 : relRelation = table_open(RelationRelationId, RowExclusiveLock);
1487 : :
1488 : 407 : reltup = SearchSysCacheCopy1(RELOID,
1489 : : ObjectIdGetDatum(RelationGetRelid(NewHeap)));
1490 [ - + ]: 407 : if (!HeapTupleIsValid(reltup))
1491 [ # # ]: 0 : elog(ERROR, "cache lookup failed for relation %u",
1492 : : RelationGetRelid(NewHeap));
1493 : 407 : relform = (Form_pg_class) GETSTRUCT(reltup);
1494 : :
1495 : 407 : relform->relpages = num_pages;
1496 : 407 : relform->reltuples = num_tuples;
1497 : :
1498 : : /* Don't update the stats for pg_class. See swap_relation_files. */
1499 [ + + ]: 407 : if (RelationGetRelid(OldHeap) != RelationRelationId)
1500 : 384 : CatalogTupleUpdate(relRelation, &reltup->t_self, reltup);
1501 : : else
1502 : 23 : CacheInvalidateRelcacheByTuple(reltup);
1503 : :
1504 : : /* Clean up. */
1505 : 407 : heap_freetuple(reltup);
1506 : 407 : table_close(relRelation, RowExclusiveLock);
1507 : :
1508 : : /* Make the update visible */
1509 : 407 : CommandCounterIncrement();
1510 : 407 : }
1511 : :
1512 : : /*
1513 : : * Swap the physical files of two given relations.
1514 : : *
1515 : : * We swap the physical identity (reltablespace, relfilenumber) while keeping
1516 : : * the same logical identities of the two relations. relpersistence is also
1517 : : * swapped, which is critical since it determines where buffers live for each
1518 : : * relation.
1519 : : *
1520 : : * We can swap associated TOAST data in either of two ways: recursively swap
1521 : : * the physical content of the toast tables (and their indexes), or swap the
1522 : : * TOAST links in the given relations' pg_class entries. The former is needed
1523 : : * to manage rewrites of shared catalogs (where we cannot change the pg_class
1524 : : * links) while the latter is the only way to handle cases in which a toast
1525 : : * table is added or removed altogether.
1526 : : *
1527 : : * Additionally, the first relation is marked with relfrozenxid set to
1528 : : * frozenXid. It seems a bit ugly to have this here, but the caller would
1529 : : * have to do it anyway, so having it here saves a heap_update. Note: in
1530 : : * the swap-toast-links case, we assume we don't need to change the toast
1531 : : * table's relfrozenxid: the new version of the toast table should already
1532 : : * have relfrozenxid set to RecentXmin, which is good enough.
1533 : : *
1534 : : * Lastly, if r2 and its toast table and toast index (if any) are mapped,
1535 : : * their OIDs are emitted into mapped_tables[]. This is hacky but beats
1536 : : * having to look the information up again later in finish_heap_swap.
1537 : : */
1538 : : static void
1539 : 1707 : swap_relation_files(Oid r1, Oid r2, bool target_is_pg_class,
1540 : : bool swap_toast_by_content,
1541 : : bool is_internal,
1542 : : TransactionId frozenXid,
1543 : : MultiXactId cutoffMulti,
1544 : : Oid *mapped_tables)
1545 : : {
1546 : : Relation relRelation;
1547 : : HeapTuple reltup1,
1548 : : reltup2;
1549 : : Form_pg_class relform1,
1550 : : relform2;
1551 : : RelFileNumber relfilenumber1,
1552 : : relfilenumber2;
1553 : : RelFileNumber swaptemp;
1554 : : char swptmpchr;
1555 : : Oid relam1,
1556 : : relam2;
1557 : :
1558 : : /* We need writable copies of both pg_class tuples. */
1559 : 1707 : relRelation = table_open(RelationRelationId, RowExclusiveLock);
1560 : :
1561 : 1707 : reltup1 = SearchSysCacheCopy1(RELOID, ObjectIdGetDatum(r1));
1562 [ - + ]: 1707 : if (!HeapTupleIsValid(reltup1))
1563 [ # # ]: 0 : elog(ERROR, "cache lookup failed for relation %u", r1);
1564 : 1707 : relform1 = (Form_pg_class) GETSTRUCT(reltup1);
1565 : :
1566 : 1707 : reltup2 = SearchSysCacheCopy1(RELOID, ObjectIdGetDatum(r2));
1567 [ - + ]: 1707 : if (!HeapTupleIsValid(reltup2))
1568 [ # # ]: 0 : elog(ERROR, "cache lookup failed for relation %u", r2);
1569 : 1707 : relform2 = (Form_pg_class) GETSTRUCT(reltup2);
1570 : :
1571 : 1707 : relfilenumber1 = relform1->relfilenode;
1572 : 1707 : relfilenumber2 = relform2->relfilenode;
1573 : 1707 : relam1 = relform1->relam;
1574 : 1707 : relam2 = relform2->relam;
1575 : :
1576 [ + + + - ]: 1707 : if (RelFileNumberIsValid(relfilenumber1) &&
1577 : : RelFileNumberIsValid(relfilenumber2))
1578 : : {
1579 : : /*
1580 : : * Normal non-mapped relations: swap relfilenumbers, reltablespaces,
1581 : : * relpersistence
1582 : : */
1583 : : Assert(!target_is_pg_class);
1584 : :
1585 : 1618 : swaptemp = relform1->relfilenode;
1586 : 1618 : relform1->relfilenode = relform2->relfilenode;
1587 : 1618 : relform2->relfilenode = swaptemp;
1588 : :
1589 : 1618 : swaptemp = relform1->reltablespace;
1590 : 1618 : relform1->reltablespace = relform2->reltablespace;
1591 : 1618 : relform2->reltablespace = swaptemp;
1592 : :
1593 : 1618 : swaptemp = relform1->relam;
1594 : 1618 : relform1->relam = relform2->relam;
1595 : 1618 : relform2->relam = swaptemp;
1596 : :
1597 : 1618 : swptmpchr = relform1->relpersistence;
1598 : 1618 : relform1->relpersistence = relform2->relpersistence;
1599 : 1618 : relform2->relpersistence = swptmpchr;
1600 : :
1601 : : /* Also swap toast links, if we're swapping by links */
1602 [ + + ]: 1618 : if (!swap_toast_by_content)
1603 : : {
1604 : 1282 : swaptemp = relform1->reltoastrelid;
1605 : 1282 : relform1->reltoastrelid = relform2->reltoastrelid;
1606 : 1282 : relform2->reltoastrelid = swaptemp;
1607 : : }
1608 : : }
1609 : : else
1610 : : {
1611 : : /*
1612 : : * Mapped-relation case. Here we have to swap the relation mappings
1613 : : * instead of modifying the pg_class columns. Both must be mapped.
1614 : : */
1615 [ + - - + ]: 89 : if (RelFileNumberIsValid(relfilenumber1) ||
1616 : : RelFileNumberIsValid(relfilenumber2))
1617 [ # # ]: 0 : elog(ERROR, "cannot swap mapped relation \"%s\" with non-mapped relation",
1618 : : NameStr(relform1->relname));
1619 : :
1620 : : /*
1621 : : * We can't change the tablespace nor persistence of a mapped rel, and
1622 : : * we can't handle toast link swapping for one either, because we must
1623 : : * not apply any critical changes to its pg_class row. These cases
1624 : : * should be prevented by upstream permissions tests, so these checks
1625 : : * are non-user-facing emergency backstop.
1626 : : */
1627 [ - + ]: 89 : if (relform1->reltablespace != relform2->reltablespace)
1628 [ # # ]: 0 : elog(ERROR, "cannot change tablespace of mapped relation \"%s\"",
1629 : : NameStr(relform1->relname));
1630 [ - + ]: 89 : if (relform1->relpersistence != relform2->relpersistence)
1631 [ # # ]: 0 : elog(ERROR, "cannot change persistence of mapped relation \"%s\"",
1632 : : NameStr(relform1->relname));
1633 [ - + ]: 89 : if (relform1->relam != relform2->relam)
1634 [ # # ]: 0 : elog(ERROR, "cannot change access method of mapped relation \"%s\"",
1635 : : NameStr(relform1->relname));
1636 [ + + ]: 89 : if (!swap_toast_by_content &&
1637 [ + - - + ]: 29 : (relform1->reltoastrelid || relform2->reltoastrelid))
1638 [ # # ]: 0 : elog(ERROR, "cannot swap toast by links for mapped relation \"%s\"",
1639 : : NameStr(relform1->relname));
1640 : :
1641 : : /*
1642 : : * Fetch the mappings --- shouldn't fail, but be paranoid
1643 : : */
1644 : 89 : relfilenumber1 = RelationMapOidToFilenumber(r1, relform1->relisshared);
1645 [ - + ]: 89 : if (!RelFileNumberIsValid(relfilenumber1))
1646 [ # # ]: 0 : elog(ERROR, "could not find relation mapping for relation \"%s\", OID %u",
1647 : : NameStr(relform1->relname), r1);
1648 : 89 : relfilenumber2 = RelationMapOidToFilenumber(r2, relform2->relisshared);
1649 [ - + ]: 89 : if (!RelFileNumberIsValid(relfilenumber2))
1650 [ # # ]: 0 : elog(ERROR, "could not find relation mapping for relation \"%s\", OID %u",
1651 : : NameStr(relform2->relname), r2);
1652 : :
1653 : : /*
1654 : : * Send replacement mappings to relmapper. Note these won't actually
1655 : : * take effect until CommandCounterIncrement.
1656 : : */
1657 : 89 : RelationMapUpdateMap(r1, relfilenumber2, relform1->relisshared, false);
1658 : 89 : RelationMapUpdateMap(r2, relfilenumber1, relform2->relisshared, false);
1659 : :
1660 : : /* Pass OIDs of mapped r2 tables back to caller */
1661 : 89 : *mapped_tables++ = r2;
1662 : : }
1663 : :
1664 : : /*
1665 : : * Recognize that rel1's relfilenumber (swapped from rel2) is new in this
1666 : : * subtransaction. The rel2 storage (swapped from rel1) may or may not be
1667 : : * new.
1668 : : */
1669 : : {
1670 : : Relation rel1,
1671 : : rel2;
1672 : :
1673 : 1707 : rel1 = relation_open(r1, NoLock);
1674 : 1707 : rel2 = relation_open(r2, NoLock);
1675 : 1707 : rel2->rd_createSubid = rel1->rd_createSubid;
1676 : 1707 : rel2->rd_newRelfilelocatorSubid = rel1->rd_newRelfilelocatorSubid;
1677 : 1707 : rel2->rd_firstRelfilelocatorSubid = rel1->rd_firstRelfilelocatorSubid;
1678 : 1707 : RelationAssumeNewRelfilelocator(rel1);
1679 : 1707 : relation_close(rel1, NoLock);
1680 : 1707 : relation_close(rel2, NoLock);
1681 : : }
1682 : :
1683 : : /*
1684 : : * In the case of a shared catalog, these next few steps will only affect
1685 : : * our own database's pg_class row; but that's okay, because they are all
1686 : : * noncritical updates. That's also an important fact for the case of a
1687 : : * mapped catalog, because it's possible that we'll commit the map change
1688 : : * and then fail to commit the pg_class update.
1689 : : */
1690 : :
1691 : : /* set rel1's frozen Xid and minimum MultiXid */
1692 [ + + ]: 1707 : if (relform1->relkind != RELKIND_INDEX)
1693 : : {
1694 : : Assert(!TransactionIdIsValid(frozenXid) ||
1695 : : TransactionIdIsNormal(frozenXid));
1696 : 1567 : relform1->relfrozenxid = frozenXid;
1697 : 1567 : relform1->relminmxid = cutoffMulti;
1698 : : }
1699 : :
1700 : : /* swap size statistics too, since new rel has freshly-updated stats */
1701 : : {
1702 : : int32 swap_pages;
1703 : : float4 swap_tuples;
1704 : : int32 swap_allvisible;
1705 : : int32 swap_allfrozen;
1706 : :
1707 : 1707 : swap_pages = relform1->relpages;
1708 : 1707 : relform1->relpages = relform2->relpages;
1709 : 1707 : relform2->relpages = swap_pages;
1710 : :
1711 : 1707 : swap_tuples = relform1->reltuples;
1712 : 1707 : relform1->reltuples = relform2->reltuples;
1713 : 1707 : relform2->reltuples = swap_tuples;
1714 : :
1715 : 1707 : swap_allvisible = relform1->relallvisible;
1716 : 1707 : relform1->relallvisible = relform2->relallvisible;
1717 : 1707 : relform2->relallvisible = swap_allvisible;
1718 : :
1719 : 1707 : swap_allfrozen = relform1->relallfrozen;
1720 : 1707 : relform1->relallfrozen = relform2->relallfrozen;
1721 : 1707 : relform2->relallfrozen = swap_allfrozen;
1722 : : }
1723 : :
1724 : : /*
1725 : : * Update the tuples in pg_class --- unless the target relation of the
1726 : : * swap is pg_class itself. In that case, there is zero point in making
1727 : : * changes because we'd be updating the old data that we're about to throw
1728 : : * away. Because the real work being done here for a mapped relation is
1729 : : * just to change the relation map settings, it's all right to not update
1730 : : * the pg_class rows in this case. The most important changes will instead
1731 : : * performed later, in finish_heap_swap() itself.
1732 : : */
1733 [ + + ]: 1707 : if (!target_is_pg_class)
1734 : : {
1735 : : CatalogIndexState indstate;
1736 : :
1737 : 1684 : indstate = CatalogOpenIndexes(relRelation);
1738 : 1684 : CatalogTupleUpdateWithInfo(relRelation, &reltup1->t_self, reltup1,
1739 : : indstate);
1740 : 1684 : CatalogTupleUpdateWithInfo(relRelation, &reltup2->t_self, reltup2,
1741 : : indstate);
1742 : 1684 : CatalogCloseIndexes(indstate);
1743 : : }
1744 : : else
1745 : : {
1746 : : /* no update ... but we do still need relcache inval */
1747 : 23 : CacheInvalidateRelcacheByTuple(reltup1);
1748 : 23 : CacheInvalidateRelcacheByTuple(reltup2);
1749 : : }
1750 : :
1751 : : /*
1752 : : * Now that pg_class has been updated with its relevant information for
1753 : : * the swap, update the dependency of the relations to point to their new
1754 : : * table AM, if it has changed.
1755 : : */
1756 [ + + ]: 1707 : if (relam1 != relam2)
1757 : : {
1758 [ - + ]: 24 : if (changeDependencyFor(RelationRelationId,
1759 : : r1,
1760 : : AccessMethodRelationId,
1761 : : relam1,
1762 : : relam2) != 1)
1763 [ # # ]: 0 : elog(ERROR, "could not change access method dependency for relation \"%s.%s\"",
1764 : : get_namespace_name(get_rel_namespace(r1)),
1765 : : get_rel_name(r1));
1766 [ - + ]: 24 : if (changeDependencyFor(RelationRelationId,
1767 : : r2,
1768 : : AccessMethodRelationId,
1769 : : relam2,
1770 : : relam1) != 1)
1771 [ # # ]: 0 : elog(ERROR, "could not change access method dependency for relation \"%s.%s\"",
1772 : : get_namespace_name(get_rel_namespace(r2)),
1773 : : get_rel_name(r2));
1774 : : }
1775 : :
1776 : : /*
1777 : : * Post alter hook for modified relations. The change to r2 is always
1778 : : * internal, but r1 depends on the invocation context.
1779 : : */
1780 [ - + ]: 1707 : InvokeObjectPostAlterHookArg(RelationRelationId, r1, 0,
1781 : : InvalidOid, is_internal);
1782 [ - + ]: 1707 : InvokeObjectPostAlterHookArg(RelationRelationId, r2, 0,
1783 : : InvalidOid, true);
1784 : :
1785 : : /*
1786 : : * If we have toast tables associated with the relations being swapped,
1787 : : * deal with them too.
1788 : : */
1789 [ + + + + ]: 1707 : if (relform1->reltoastrelid || relform2->reltoastrelid)
1790 : : {
1791 [ + + ]: 542 : if (swap_toast_by_content)
1792 : : {
1793 [ + - + - ]: 132 : if (relform1->reltoastrelid && relform2->reltoastrelid)
1794 : : {
1795 : : /* Recursively swap the contents of the toast tables */
1796 : 132 : swap_relation_files(relform1->reltoastrelid,
1797 : : relform2->reltoastrelid,
1798 : : target_is_pg_class,
1799 : : swap_toast_by_content,
1800 : : is_internal,
1801 : : frozenXid,
1802 : : cutoffMulti,
1803 : : mapped_tables);
1804 : : }
1805 : : else
1806 : : {
1807 : : /* caller messed up */
1808 [ # # ]: 0 : elog(ERROR, "cannot swap toast files by content when there's only one");
1809 : : }
1810 : : }
1811 : : else
1812 : : {
1813 : : /*
1814 : : * We swapped the ownership links, so we need to change dependency
1815 : : * data to match.
1816 : : *
1817 : : * NOTE: it is possible that only one table has a toast table.
1818 : : *
1819 : : * NOTE: at present, a TOAST table's only dependency is the one on
1820 : : * its owning table. If more are ever created, we'd need to use
1821 : : * something more selective than deleteDependencyRecordsFor() to
1822 : : * get rid of just the link we want.
1823 : : */
1824 : : ObjectAddress baseobject,
1825 : : toastobject;
1826 : : long count;
1827 : :
1828 : : /*
1829 : : * We disallow this case for system catalogs, to avoid the
1830 : : * possibility that the catalog we're rebuilding is one of the
1831 : : * ones the dependency changes would change. It's too late to be
1832 : : * making any data changes to the target catalog.
1833 : : */
1834 [ - + ]: 410 : if (IsSystemClass(r1, relform1))
1835 [ # # ]: 0 : elog(ERROR, "cannot swap toast files by links for system catalogs");
1836 : :
1837 : : /* Delete old dependencies */
1838 [ + + ]: 410 : if (relform1->reltoastrelid)
1839 : : {
1840 : 389 : count = deleteDependencyRecordsFor(RelationRelationId,
1841 : : relform1->reltoastrelid,
1842 : : false);
1843 [ - + ]: 389 : if (count != 1)
1844 [ # # ]: 0 : elog(ERROR, "expected one dependency record for TOAST table, found %ld",
1845 : : count);
1846 : : }
1847 [ + - ]: 410 : if (relform2->reltoastrelid)
1848 : : {
1849 : 410 : count = deleteDependencyRecordsFor(RelationRelationId,
1850 : : relform2->reltoastrelid,
1851 : : false);
1852 [ - + ]: 410 : if (count != 1)
1853 [ # # ]: 0 : elog(ERROR, "expected one dependency record for TOAST table, found %ld",
1854 : : count);
1855 : : }
1856 : :
1857 : : /* Register new dependencies */
1858 : 410 : baseobject.classId = RelationRelationId;
1859 : 410 : baseobject.objectSubId = 0;
1860 : 410 : toastobject.classId = RelationRelationId;
1861 : 410 : toastobject.objectSubId = 0;
1862 : :
1863 [ + + ]: 410 : if (relform1->reltoastrelid)
1864 : : {
1865 : 389 : baseobject.objectId = r1;
1866 : 389 : toastobject.objectId = relform1->reltoastrelid;
1867 : 389 : recordDependencyOn(&toastobject, &baseobject,
1868 : : DEPENDENCY_INTERNAL);
1869 : : }
1870 : :
1871 [ + - ]: 410 : if (relform2->reltoastrelid)
1872 : : {
1873 : 410 : baseobject.objectId = r2;
1874 : 410 : toastobject.objectId = relform2->reltoastrelid;
1875 : 410 : recordDependencyOn(&toastobject, &baseobject,
1876 : : DEPENDENCY_INTERNAL);
1877 : : }
1878 : : }
1879 : : }
1880 : :
1881 : : /*
1882 : : * If we're swapping two toast tables by content, do the same for their
1883 : : * valid index. The swap can actually be safely done only if the relations
1884 : : * have indexes.
1885 : : */
1886 [ + + ]: 1707 : if (swap_toast_by_content &&
1887 [ + + ]: 396 : relform1->relkind == RELKIND_TOASTVALUE &&
1888 [ + - ]: 132 : relform2->relkind == RELKIND_TOASTVALUE)
1889 : : {
1890 : : Oid toastIndex1,
1891 : : toastIndex2;
1892 : :
1893 : : /* Get valid index for each relation */
1894 : 132 : toastIndex1 = toast_get_valid_index(r1,
1895 : : AccessExclusiveLock);
1896 : 132 : toastIndex2 = toast_get_valid_index(r2,
1897 : : AccessExclusiveLock);
1898 : :
1899 : 132 : swap_relation_files(toastIndex1,
1900 : : toastIndex2,
1901 : : target_is_pg_class,
1902 : : swap_toast_by_content,
1903 : : is_internal,
1904 : : InvalidTransactionId,
1905 : : InvalidMultiXactId,
1906 : : mapped_tables);
1907 : : }
1908 : :
1909 : : /* Clean up. */
1910 : 1707 : heap_freetuple(reltup1);
1911 : 1707 : heap_freetuple(reltup2);
1912 : :
1913 : 1707 : table_close(relRelation, RowExclusiveLock);
1914 : 1707 : }
1915 : :
1916 : : /*
1917 : : * Remove the transient table that was built by make_new_heap, and finish
1918 : : * cleaning up (including rebuilding all indexes on the old heap).
1919 : : */
1920 : : void
1921 : 1435 : finish_heap_swap(Oid OIDOldHeap, Oid OIDNewHeap,
1922 : : bool is_system_catalog,
1923 : : bool swap_toast_by_content,
1924 : : bool check_constraints,
1925 : : bool is_internal,
1926 : : bool reindex,
1927 : : TransactionId frozenXid,
1928 : : MultiXactId cutoffMulti,
1929 : : char newrelpersistence)
1930 : : {
1931 : : ObjectAddress object;
1932 : : Oid mapped_tables[4];
1933 : : int i;
1934 : :
1935 : : /* Report that we are now swapping relation files */
1936 : 1435 : pgstat_progress_update_param(PROGRESS_REPACK_PHASE,
1937 : : PROGRESS_REPACK_PHASE_SWAP_REL_FILES);
1938 : :
1939 : : /* Zero out possible results from swapped_relation_files */
1940 : 1435 : memset(mapped_tables, 0, sizeof(mapped_tables));
1941 : :
1942 : : /*
1943 : : * Swap the contents of the heap relations (including any toast tables).
1944 : : * Also set old heap's relfrozenxid to frozenXid.
1945 : : */
1946 : 1435 : swap_relation_files(OIDOldHeap, OIDNewHeap,
1947 : : (OIDOldHeap == RelationRelationId),
1948 : : swap_toast_by_content, is_internal,
1949 : : frozenXid, cutoffMulti, mapped_tables);
1950 : :
1951 : : /*
1952 : : * If it's a system catalog, queue a sinval message to flush all catcaches
1953 : : * on the catalog when we reach CommandCounterIncrement.
1954 : : */
1955 [ + + ]: 1435 : if (is_system_catalog)
1956 : 121 : CacheInvalidateCatalog(OIDOldHeap);
1957 : :
1958 [ + + ]: 1435 : if (reindex)
1959 : : {
1960 : : int reindex_flags;
1961 : 1428 : ReindexParams reindex_params = {0};
1962 : :
1963 : : /*
1964 : : * Rebuild each index on the relation (but not the toast table, which
1965 : : * is all-new at this point). It is important to do this before the
1966 : : * DROP step because if we are processing a system catalog that will
1967 : : * be used during DROP, we want to have its indexes available. There
1968 : : * is no advantage to the other order anyway because this is all
1969 : : * transactional, so no chance to reclaim disk space before commit. We
1970 : : * do not need a final CommandCounterIncrement() because
1971 : : * reindex_relation does it.
1972 : : *
1973 : : * Note: because index_build is called via reindex_relation, it will
1974 : : * never set indcheckxmin true for the indexes. This is OK even
1975 : : * though in some sense we are building new indexes rather than
1976 : : * rebuilding existing ones, because the new heap won't contain any
1977 : : * HOT chains at all, let alone broken ones, so it can't be necessary
1978 : : * to set indcheckxmin.
1979 : : */
1980 : 1428 : reindex_flags = REINDEX_REL_SUPPRESS_INDEX_USE;
1981 [ + + ]: 1428 : if (check_constraints)
1982 : 1028 : reindex_flags |= REINDEX_REL_CHECK_CONSTRAINTS;
1983 : :
1984 : : /*
1985 : : * Ensure that the indexes have the same persistence as the parent
1986 : : * relation.
1987 : : */
1988 [ + + ]: 1428 : if (newrelpersistence == RELPERSISTENCE_UNLOGGED)
1989 : 25 : reindex_flags |= REINDEX_REL_FORCE_INDEXES_UNLOGGED;
1990 [ + + ]: 1403 : else if (newrelpersistence == RELPERSISTENCE_PERMANENT)
1991 : 1350 : reindex_flags |= REINDEX_REL_FORCE_INDEXES_PERMANENT;
1992 : :
1993 : : /* Report that we are now reindexing relations */
1994 : 1428 : pgstat_progress_update_param(PROGRESS_REPACK_PHASE,
1995 : : PROGRESS_REPACK_PHASE_REBUILD_INDEX);
1996 : :
1997 : 1428 : reindex_relation(NULL, OIDOldHeap, reindex_flags, &reindex_params);
1998 : : }
1999 : :
2000 : : /* Report that we are now doing clean up */
2001 : 1423 : pgstat_progress_update_param(PROGRESS_REPACK_PHASE,
2002 : : PROGRESS_REPACK_PHASE_FINAL_CLEANUP);
2003 : :
2004 : : /*
2005 : : * If the relation being rebuilt is pg_class, swap_relation_files()
2006 : : * couldn't update pg_class's own pg_class entry (check comments in
2007 : : * swap_relation_files()), thus relfrozenxid was not updated. That's
2008 : : * annoying because a potential reason for doing a VACUUM FULL is a
2009 : : * imminent or actual anti-wraparound shutdown. So, now that we can
2010 : : * access the new relation using its indices, update relfrozenxid.
2011 : : * pg_class doesn't have a toast relation, so we don't need to update the
2012 : : * corresponding toast relation. Not that there's little point moving all
2013 : : * relfrozenxid updates here since swap_relation_files() needs to write to
2014 : : * pg_class for non-mapped relations anyway.
2015 : : */
2016 [ + + ]: 1423 : if (OIDOldHeap == RelationRelationId)
2017 : : {
2018 : : Relation relRelation;
2019 : : HeapTuple reltup;
2020 : : Form_pg_class relform;
2021 : :
2022 : 23 : relRelation = table_open(RelationRelationId, RowExclusiveLock);
2023 : :
2024 : 23 : reltup = SearchSysCacheCopy1(RELOID, ObjectIdGetDatum(OIDOldHeap));
2025 [ - + ]: 23 : if (!HeapTupleIsValid(reltup))
2026 [ # # ]: 0 : elog(ERROR, "cache lookup failed for relation %u", OIDOldHeap);
2027 : 23 : relform = (Form_pg_class) GETSTRUCT(reltup);
2028 : :
2029 : 23 : relform->relfrozenxid = frozenXid;
2030 : 23 : relform->relminmxid = cutoffMulti;
2031 : :
2032 : 23 : CatalogTupleUpdate(relRelation, &reltup->t_self, reltup);
2033 : :
2034 : 23 : table_close(relRelation, RowExclusiveLock);
2035 : : }
2036 : :
2037 : : /* Destroy new heap with old filenumber */
2038 : 1423 : object.classId = RelationRelationId;
2039 : 1423 : object.objectId = OIDNewHeap;
2040 : 1423 : object.objectSubId = 0;
2041 : :
2042 [ + + ]: 1423 : if (!reindex)
2043 : : {
2044 : : /*
2045 : : * Make sure the changes in pg_class are visible. This is especially
2046 : : * important if !swap_toast_by_content, so that the correct TOAST
2047 : : * relation is dropped. (reindex_relation() above did not help in this
2048 : : * case))
2049 : : */
2050 : 7 : CommandCounterIncrement();
2051 : : }
2052 : :
2053 : : /*
2054 : : * The new relation is local to our transaction and we know nothing
2055 : : * depends on it, so DROP_RESTRICT should be OK.
2056 : : */
2057 : 1423 : performDeletion(&object, DROP_RESTRICT, PERFORM_DELETION_INTERNAL);
2058 : :
2059 : : /* performDeletion does CommandCounterIncrement at end */
2060 : :
2061 : : /*
2062 : : * Now we must remove any relation mapping entries that we set up for the
2063 : : * transient table, as well as its toast table and toast index if any. If
2064 : : * we fail to do this before commit, the relmapper will complain about new
2065 : : * permanent map entries being added post-bootstrap.
2066 : : */
2067 [ + + ]: 1512 : for (i = 0; OidIsValid(mapped_tables[i]); i++)
2068 : 89 : RelationMapRemoveMapping(mapped_tables[i]);
2069 : :
2070 : : /*
2071 : : * At this point, everything is kosher except that, if we did toast swap
2072 : : * by links, the toast table's name corresponds to the transient table.
2073 : : * The name is irrelevant to the backend because it's referenced by OID,
2074 : : * but users looking at the catalogs could be confused. Rename it to
2075 : : * prevent this problem.
2076 : : *
2077 : : * Note no lock required on the relation, because we already hold an
2078 : : * exclusive lock on it.
2079 : : */
2080 [ + + ]: 1423 : if (!swap_toast_by_content)
2081 : : {
2082 : : Relation newrel;
2083 : :
2084 : 1291 : newrel = table_open(OIDOldHeap, NoLock);
2085 [ + + ]: 1291 : if (OidIsValid(newrel->rd_rel->reltoastrelid))
2086 : : {
2087 : : Oid toastidx;
2088 : : char NewToastName[NAMEDATALEN];
2089 : :
2090 : : /* Get the associated valid index to be renamed */
2091 : 389 : toastidx = toast_get_valid_index(newrel->rd_rel->reltoastrelid,
2092 : : AccessExclusiveLock);
2093 : :
2094 : : /* rename the toast table ... */
2095 : 389 : snprintf(NewToastName, NAMEDATALEN, "pg_toast_%u",
2096 : : OIDOldHeap);
2097 : 389 : RenameRelationInternal(newrel->rd_rel->reltoastrelid,
2098 : : NewToastName, true, false);
2099 : :
2100 : : /* ... and its valid index too. */
2101 : 389 : snprintf(NewToastName, NAMEDATALEN, "pg_toast_%u_index",
2102 : : OIDOldHeap);
2103 : :
2104 : 389 : RenameRelationInternal(toastidx,
2105 : : NewToastName, true, true);
2106 : :
2107 : : /*
2108 : : * Reset the relrewrite for the toast. The command-counter
2109 : : * increment is required here as we are about to update the tuple
2110 : : * that is updated as part of RenameRelationInternal.
2111 : : */
2112 : 389 : CommandCounterIncrement();
2113 : 389 : ResetRelRewrite(newrel->rd_rel->reltoastrelid);
2114 : : }
2115 : 1291 : relation_close(newrel, NoLock);
2116 : : }
2117 : :
2118 : : /* if it's not a catalog table, clear any missing attribute settings */
2119 [ + + ]: 1423 : if (!is_system_catalog)
2120 : : {
2121 : : Relation newrel;
2122 : :
2123 : 1302 : newrel = table_open(OIDOldHeap, NoLock);
2124 : 1302 : RelationClearMissing(newrel);
2125 : 1302 : relation_close(newrel, NoLock);
2126 : : }
2127 : 1423 : }
2128 : :
2129 : : /*
2130 : : * Determine which relations to process, when REPACK/CLUSTER is called
2131 : : * without specifying a table name. The exact process depends on whether
2132 : : * USING INDEX was given or not, and in any case we only return tables and
2133 : : * materialized views that the current user has privileges to repack/cluster.
2134 : : *
2135 : : * If USING INDEX was given, we scan pg_index to find those that have
2136 : : * indisclustered set; if it was not given, scan pg_class and return all
2137 : : * tables.
2138 : : *
2139 : : * Return it as a list of RelToCluster in the given memory context.
2140 : : */
2141 : : static List *
2142 : 16 : get_tables_to_repack(RepackCommand cmd, bool usingindex, MemoryContext permcxt)
2143 : : {
2144 : : Relation catalog;
2145 : : TableScanDesc scan;
2146 : : HeapTuple tuple;
2147 : 16 : List *rtcs = NIL;
2148 : :
2149 [ + + ]: 16 : if (usingindex)
2150 : : {
2151 : : ScanKeyData entry;
2152 : :
2153 : : /*
2154 : : * For USING INDEX, scan pg_index to find those with indisclustered.
2155 : : */
2156 : 12 : catalog = table_open(IndexRelationId, AccessShareLock);
2157 : 12 : ScanKeyInit(&entry,
2158 : : Anum_pg_index_indisclustered,
2159 : : BTEqualStrategyNumber, F_BOOLEQ,
2160 : : BoolGetDatum(true));
2161 : 12 : scan = table_beginscan_catalog(catalog, 1, &entry);
2162 [ + + ]: 24 : while ((tuple = heap_getnext(scan, ForwardScanDirection)) != NULL)
2163 : : {
2164 : : RelToCluster *rtc;
2165 : : Form_pg_index index;
2166 : : HeapTuple classtup;
2167 : : Form_pg_class classForm;
2168 : : MemoryContext oldcxt;
2169 : :
2170 : 12 : index = (Form_pg_index) GETSTRUCT(tuple);
2171 : :
2172 : : /*
2173 : : * Try to obtain a light lock on the index's table, to ensure it
2174 : : * doesn't go away while we collect the list. If we cannot, just
2175 : : * disregard it. Be sure to release this if we ultimately decide
2176 : : * not to process the table!
2177 : : */
2178 [ - + ]: 12 : if (!ConditionalLockRelationOid(index->indrelid, AccessShareLock))
2179 : 0 : continue;
2180 : :
2181 : : /* Verify that the table still exists; skip if not */
2182 : 12 : classtup = SearchSysCache1(RELOID, ObjectIdGetDatum(index->indrelid));
2183 [ - + ]: 12 : if (!HeapTupleIsValid(classtup))
2184 : : {
2185 : 0 : UnlockRelationOid(index->indrelid, AccessShareLock);
2186 : 0 : continue;
2187 : : }
2188 : 12 : classForm = (Form_pg_class) GETSTRUCT(classtup);
2189 : :
2190 : : /* Skip temp relations belonging to other sessions */
2191 [ - + ]: 12 : if (classForm->relpersistence == RELPERSISTENCE_TEMP &&
2192 [ # # ]: 0 : !isTempOrTempToastNamespace(classForm->relnamespace))
2193 : : {
2194 : 0 : ReleaseSysCache(classtup);
2195 : 0 : UnlockRelationOid(index->indrelid, AccessShareLock);
2196 : 0 : continue;
2197 : : }
2198 : :
2199 : 12 : ReleaseSysCache(classtup);
2200 : :
2201 : : /* noisily skip rels which the user can't process */
2202 [ + + ]: 12 : if (!repack_is_permitted_for_relation(cmd, index->indrelid,
2203 : : GetUserId()))
2204 : : {
2205 : 8 : UnlockRelationOid(index->indrelid, AccessShareLock);
2206 : 8 : continue;
2207 : : }
2208 : :
2209 : : /* Use a permanent memory context for the result list */
2210 : 4 : oldcxt = MemoryContextSwitchTo(permcxt);
2211 : 4 : rtc = palloc_object(RelToCluster);
2212 : 4 : rtc->tableOid = index->indrelid;
2213 : 4 : rtc->indexOid = index->indexrelid;
2214 : 4 : rtcs = lappend(rtcs, rtc);
2215 : 4 : MemoryContextSwitchTo(oldcxt);
2216 : : }
2217 : : }
2218 : : else
2219 : : {
2220 : 4 : catalog = table_open(RelationRelationId, AccessShareLock);
2221 : 4 : scan = table_beginscan_catalog(catalog, 0, NULL);
2222 : :
2223 [ + + ]: 9168 : while ((tuple = heap_getnext(scan, ForwardScanDirection)) != NULL)
2224 : : {
2225 : : RelToCluster *rtc;
2226 : : Form_pg_class class;
2227 : : MemoryContext oldcxt;
2228 : :
2229 : 9164 : class = (Form_pg_class) GETSTRUCT(tuple);
2230 : :
2231 : : /*
2232 : : * Try to obtain a light lock on the table, to ensure it doesn't
2233 : : * go away while we collect the list. If we cannot, just
2234 : : * disregard the table. Be sure to release this if we ultimately
2235 : : * decide not to process the table!
2236 : : */
2237 [ - + ]: 9164 : if (!ConditionalLockRelationOid(class->oid, AccessShareLock))
2238 : 0 : continue;
2239 : :
2240 : : /* Verify that the table still exists */
2241 [ - + ]: 9164 : if (!SearchSysCacheExists1(RELOID, ObjectIdGetDatum(class->oid)))
2242 : : {
2243 : 0 : UnlockRelationOid(class->oid, AccessShareLock);
2244 : 0 : continue;
2245 : : }
2246 : :
2247 : : /* Can only process plain tables and matviews */
2248 [ + + ]: 9164 : if (class->relkind != RELKIND_RELATION &&
2249 [ + + ]: 6072 : class->relkind != RELKIND_MATVIEW)
2250 : : {
2251 : 6040 : UnlockRelationOid(class->oid, AccessShareLock);
2252 : 6040 : continue;
2253 : : }
2254 : :
2255 : : /* Skip temp relations belonging to other sessions */
2256 [ + + ]: 3124 : if (class->relpersistence == RELPERSISTENCE_TEMP &&
2257 [ - + ]: 16 : !isTempOrTempToastNamespace(class->relnamespace))
2258 : : {
2259 : 0 : UnlockRelationOid(class->oid, AccessShareLock);
2260 : 0 : continue;
2261 : : }
2262 : :
2263 : : /* noisily skip rels which the user can't process */
2264 [ + + ]: 3124 : if (!repack_is_permitted_for_relation(cmd, class->oid,
2265 : : GetUserId()))
2266 : : {
2267 : 3116 : UnlockRelationOid(class->oid, AccessShareLock);
2268 : 3116 : continue;
2269 : : }
2270 : :
2271 : : /* Use a permanent memory context for the result list */
2272 : 8 : oldcxt = MemoryContextSwitchTo(permcxt);
2273 : 8 : rtc = palloc_object(RelToCluster);
2274 : 8 : rtc->tableOid = class->oid;
2275 : 8 : rtc->indexOid = InvalidOid;
2276 : 8 : rtcs = lappend(rtcs, rtc);
2277 : 8 : MemoryContextSwitchTo(oldcxt);
2278 : : }
2279 : : }
2280 : :
2281 : 16 : table_endscan(scan);
2282 : 16 : relation_close(catalog, AccessShareLock);
2283 : :
2284 : 16 : return rtcs;
2285 : : }
2286 : :
2287 : : /*
2288 : : * Given a partitioned table or its index, return a list of RelToCluster for
2289 : : * all the leaf child tables/indexes.
2290 : : *
2291 : : * 'rel_is_index' tells whether 'relid' is that of an index (true) or of the
2292 : : * owning relation.
2293 : : */
2294 : : static List *
2295 : 20 : get_tables_to_repack_partitioned(RepackCommand cmd, Oid relid,
2296 : : bool rel_is_index, MemoryContext permcxt)
2297 : : {
2298 : : List *inhoids;
2299 : 20 : List *rtcs = NIL;
2300 : :
2301 : : /*
2302 : : * Do not lock the children until they're processed. Note that we do hold
2303 : : * a lock on the parent partitioned table.
2304 : : */
2305 : 20 : inhoids = find_all_inheritors(relid, NoLock, NULL);
2306 [ + - + + : 148 : foreach_oid(child_oid, inhoids)
+ + ]
2307 : : {
2308 : : Oid table_oid,
2309 : : index_oid;
2310 : : RelToCluster *rtc;
2311 : : MemoryContext oldcxt;
2312 : :
2313 [ + + ]: 108 : if (rel_is_index)
2314 : : {
2315 : : /* consider only leaf indexes */
2316 [ + + ]: 80 : if (get_rel_relkind(child_oid) != RELKIND_INDEX)
2317 : 40 : continue;
2318 : :
2319 : 40 : table_oid = IndexGetRelation(child_oid, false);
2320 : 40 : index_oid = child_oid;
2321 : : }
2322 : : else
2323 : : {
2324 : : /* consider only leaf relations */
2325 [ + + ]: 28 : if (get_rel_relkind(child_oid) != RELKIND_RELATION)
2326 : 16 : continue;
2327 : :
2328 : 12 : table_oid = child_oid;
2329 : 12 : index_oid = InvalidOid;
2330 : : }
2331 : :
2332 : : /*
2333 : : * It's possible that the user does not have privileges to CLUSTER the
2334 : : * leaf partition despite having them on the partitioned table. Skip
2335 : : * if so.
2336 : : */
2337 [ + + ]: 52 : if (!repack_is_permitted_for_relation(cmd, table_oid, GetUserId()))
2338 : 12 : continue;
2339 : :
2340 : : /* Use a permanent memory context for the result list */
2341 : 40 : oldcxt = MemoryContextSwitchTo(permcxt);
2342 : 40 : rtc = palloc_object(RelToCluster);
2343 : 40 : rtc->tableOid = table_oid;
2344 : 40 : rtc->indexOid = index_oid;
2345 : 40 : rtcs = lappend(rtcs, rtc);
2346 : 40 : MemoryContextSwitchTo(oldcxt);
2347 : : }
2348 : :
2349 : 20 : return rtcs;
2350 : : }
2351 : :
2352 : :
2353 : : /*
2354 : : * Return whether userid has privileges to REPACK relid. If not, this
2355 : : * function emits a WARNING.
2356 : : */
2357 : : static bool
2358 : 3240 : repack_is_permitted_for_relation(RepackCommand cmd, Oid relid, Oid userid)
2359 : : {
2360 : : Assert(cmd == REPACK_COMMAND_CLUSTER || cmd == REPACK_COMMAND_REPACK);
2361 : :
2362 [ + + ]: 3240 : if (pg_class_aclcheck(relid, userid, ACL_MAINTAIN) == ACLCHECK_OK)
2363 : 104 : return true;
2364 : :
2365 [ + - ]: 3136 : ereport(WARNING,
2366 : : errmsg("permission denied to execute %s on \"%s\", skipping it",
2367 : : RepackCommandAsString(cmd),
2368 : : get_rel_name(relid)));
2369 : :
2370 : 3136 : return false;
2371 : : }
2372 : :
2373 : :
2374 : : /*
2375 : : * Given a RepackStmt with an indicated relation name, resolve the relation
2376 : : * name, obtain lock on it, then determine what to do based on the relation
2377 : : * type: if it's table and not partitioned, repack it as indicated (using an
2378 : : * existing clustered index, or following the given one), and return NULL.
2379 : : *
2380 : : * On the other hand, if the table is partitioned, do nothing further and
2381 : : * instead return the opened and locked relcache entry, so that caller can
2382 : : * process the partitions using the multiple-table handling code. In this
2383 : : * case, if an index name is given, it's up to the caller to resolve it.
2384 : : */
2385 : : static Relation
2386 : 214 : process_single_relation(RepackStmt *stmt, LOCKMODE lockmode, bool isTopLevel,
2387 : : ClusterParams *params)
2388 : : {
2389 : : Relation rel;
2390 : : Oid tableOid;
2391 : :
2392 : : Assert(stmt->relation != NULL);
2393 : : Assert(stmt->command == REPACK_COMMAND_CLUSTER ||
2394 : : stmt->command == REPACK_COMMAND_REPACK);
2395 : :
2396 : : /*
2397 : : * Make sure ANALYZE is specified if a column list is present.
2398 : : */
2399 [ + + + + ]: 214 : if ((params->options & CLUOPT_ANALYZE) == 0 && stmt->relation->va_cols != NIL)
2400 [ + - ]: 4 : ereport(ERROR,
2401 : : errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
2402 : : errmsg("ANALYZE option must be specified when a column list is provided"));
2403 : :
2404 : : /* Find, lock, and check permissions on the table. */
2405 : 210 : tableOid = RangeVarGetRelidExtended(stmt->relation->relation,
2406 : : lockmode,
2407 : : 0,
2408 : : RangeVarCallbackMaintainsTable,
2409 : : NULL);
2410 : 202 : rel = table_open(tableOid, NoLock);
2411 : :
2412 : : /*
2413 : : * Reject clustering a remote temp table ... their local buffer manager is
2414 : : * not going to cope.
2415 : : */
2416 [ + + + + ]: 202 : if (RELATION_IS_OTHER_TEMP(rel))
2417 [ + - ]: 1 : ereport(ERROR,
2418 : : errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
2419 : : /*- translator: first %s is name of a SQL command, eg. REPACK */
2420 : : errmsg("cannot execute %s on temporary tables of other sessions",
2421 : : RepackCommandAsString(stmt->command)));
2422 : :
2423 : : /*
2424 : : * For partitioned tables, let caller handle this. Otherwise, process it
2425 : : * here and we're done.
2426 : : */
2427 [ + + ]: 201 : if (rel->rd_rel->relkind == RELKIND_PARTITIONED_TABLE)
2428 : 33 : return rel;
2429 : : else
2430 : : {
2431 : 168 : Oid indexOid = InvalidOid;
2432 : :
2433 : 168 : indexOid = determine_clustered_index(rel, stmt->usingindex,
2434 : 168 : stmt->indexname);
2435 [ + + ]: 160 : if (OidIsValid(indexOid))
2436 : 137 : check_index_is_clusterable(rel, indexOid, lockmode);
2437 : :
2438 : 148 : cluster_rel(stmt->command, rel, indexOid, params, isTopLevel);
2439 : :
2440 : : /*
2441 : : * Do an analyze, if requested. We close the transaction and start a
2442 : : * new one, so that we don't hold the stronger lock for longer than
2443 : : * needed.
2444 : : */
2445 [ + + ]: 137 : if (params->options & CLUOPT_ANALYZE)
2446 : : {
2447 : 8 : VacuumParams vac_params = {0};
2448 : :
2449 : 8 : PopActiveSnapshot();
2450 : 8 : CommitTransactionCommand();
2451 : :
2452 : 8 : StartTransactionCommand();
2453 : 8 : PushActiveSnapshot(GetTransactionSnapshot());
2454 : :
2455 : 8 : vac_params.options |= VACOPT_ANALYZE;
2456 [ - + ]: 8 : if (params->options & CLUOPT_VERBOSE)
2457 : 0 : vac_params.options |= VACOPT_VERBOSE;
2458 : 8 : analyze_rel(tableOid, NULL, &vac_params,
2459 : 8 : stmt->relation->va_cols, true, NULL);
2460 : 8 : PopActiveSnapshot();
2461 : 8 : CommandCounterIncrement();
2462 : : }
2463 : :
2464 : 137 : return NULL;
2465 : : }
2466 : : }
2467 : :
2468 : : /*
2469 : : * Given a relation and the usingindex/indexname options in a
2470 : : * REPACK USING INDEX or CLUSTER command, return the OID of the
2471 : : * index to use for clustering the table.
2472 : : *
2473 : : * Caller must hold lock on the relation so that the set of indexes
2474 : : * doesn't change, and must call check_index_is_clusterable.
2475 : : */
2476 : : static Oid
2477 : 188 : determine_clustered_index(Relation rel, bool usingindex, const char *indexname)
2478 : : {
2479 : : Oid indexOid;
2480 : :
2481 [ + + + + ]: 188 : if (indexname == NULL && usingindex)
2482 : : {
2483 : : /*
2484 : : * If USING INDEX with no name is given, find a clustered index, or
2485 : : * error out if none.
2486 : : */
2487 : 18 : indexOid = InvalidOid;
2488 [ + - + + : 40 : foreach_oid(idxoid, RelationGetIndexList(rel))
+ + ]
2489 : : {
2490 [ + + ]: 18 : if (get_index_isclustered(idxoid))
2491 : : {
2492 : 14 : indexOid = idxoid;
2493 : 14 : break;
2494 : : }
2495 : : }
2496 : :
2497 [ + + ]: 18 : if (!OidIsValid(indexOid))
2498 [ + - ]: 4 : ereport(ERROR,
2499 : : errcode(ERRCODE_UNDEFINED_OBJECT),
2500 : : errmsg("there is no previously clustered index for table \"%s\"",
2501 : : RelationGetRelationName(rel)));
2502 : : }
2503 [ + + ]: 170 : else if (indexname != NULL)
2504 : : {
2505 : : /* An index was specified; obtain its OID. */
2506 : 147 : indexOid = get_relname_relid(indexname, rel->rd_rel->relnamespace);
2507 [ + + ]: 147 : if (!OidIsValid(indexOid))
2508 [ + - ]: 4 : ereport(ERROR,
2509 : : errcode(ERRCODE_UNDEFINED_OBJECT),
2510 : : errmsg("index \"%s\" for table \"%s\" does not exist",
2511 : : indexname, RelationGetRelationName(rel)));
2512 : : }
2513 : : else
2514 : 23 : indexOid = InvalidOid;
2515 : :
2516 : 180 : return indexOid;
2517 : : }
2518 : :
2519 : : static const char *
2520 : 3608 : RepackCommandAsString(RepackCommand cmd)
2521 : : {
2522 [ + + + - ]: 3608 : switch (cmd)
2523 : : {
2524 : 3199 : case REPACK_COMMAND_REPACK:
2525 : 3199 : return "REPACK";
2526 : 226 : case REPACK_COMMAND_VACUUMFULL:
2527 : 226 : return "VACUUM";
2528 : 183 : case REPACK_COMMAND_CLUSTER:
2529 : 183 : return "CLUSTER";
2530 : : }
2531 : 0 : return "???"; /* keep compiler quiet */
2532 : : }
2533 : :
2534 : : /*
2535 : : * Apply all the changes stored in 'file'.
2536 : : */
2537 : : static void
2538 : 14 : apply_concurrent_changes(BufFile *file, ChangeContext *chgcxt)
2539 : : {
2540 : 14 : ConcurrentChangeKind kind = '\0';
2541 : 14 : Relation rel = chgcxt->cc_rel;
2542 : : TupleTableSlot *spilled_tuple;
2543 : : TupleTableSlot *old_update_tuple;
2544 : : TupleTableSlot *ondisk_tuple;
2545 : 14 : bool have_old_tuple = false;
2546 : : MemoryContext oldcxt;
2547 : :
2548 : 14 : spilled_tuple = MakeSingleTupleTableSlot(RelationGetDescr(rel),
2549 : : &TTSOpsVirtual);
2550 : 14 : ondisk_tuple = MakeSingleTupleTableSlot(RelationGetDescr(rel),
2551 : : table_slot_callbacks(rel));
2552 : 14 : old_update_tuple = MakeSingleTupleTableSlot(RelationGetDescr(rel),
2553 : : &TTSOpsVirtual);
2554 : :
2555 [ + + ]: 14 : oldcxt = MemoryContextSwitchTo(GetPerTupleMemoryContext(chgcxt->cc_estate));
2556 : :
2557 : : while (true)
2558 : 40 : {
2559 : : size_t nread;
2560 : 54 : ConcurrentChangeKind prevkind = kind;
2561 : :
2562 [ - + ]: 54 : CHECK_FOR_INTERRUPTS();
2563 : :
2564 : 54 : nread = BufFileReadMaybeEOF(file, &kind, 1, true);
2565 [ + + ]: 54 : if (nread == 0) /* done with the file? */
2566 : 14 : break;
2567 : :
2568 : : /*
2569 : : * If this is the old tuple for an update, read it into the tuple slot
2570 : : * and go to the next one. The update itself will be executed on the
2571 : : * next iteration, when we receive the NEW tuple.
2572 : : */
2573 [ + + ]: 40 : if (kind == CHANGE_UPDATE_OLD)
2574 : : {
2575 : 8 : restore_tuple(file, rel, old_update_tuple);
2576 : 8 : have_old_tuple = true;
2577 : 8 : continue;
2578 : : }
2579 : :
2580 : : /*
2581 : : * Just before an UPDATE or DELETE, we must update the command
2582 : : * counter, because the change could refer to a tuple that we have
2583 : : * just inserted; and before an INSERT, we have to do this also if the
2584 : : * previous command was either update or delete.
2585 : : *
2586 : : * With this approach we don't spend so many CCIs for long strings of
2587 : : * only INSERTs, which can't affect one another.
2588 : : */
2589 [ + + + + ]: 32 : if (kind == CHANGE_UPDATE_NEW || kind == CHANGE_DELETE ||
2590 [ + - + + : 7 : (kind == CHANGE_INSERT && (prevkind == CHANGE_UPDATE_NEW ||
+ + ]
2591 : : prevkind == CHANGE_DELETE)))
2592 : : {
2593 : 29 : CommandCounterIncrement();
2594 : 29 : UpdateActiveSnapshotCommandId();
2595 : : }
2596 : :
2597 : : /*
2598 : : * Now restore the tuple into the slot and execute the change.
2599 : : */
2600 : 32 : restore_tuple(file, rel, spilled_tuple);
2601 : :
2602 [ + + ]: 32 : if (kind == CHANGE_INSERT)
2603 : : {
2604 : 7 : apply_concurrent_insert(rel, spilled_tuple, chgcxt);
2605 : : }
2606 [ + + ]: 25 : else if (kind == CHANGE_DELETE)
2607 : : {
2608 : : bool found;
2609 : :
2610 : : /* Find the tuple to be deleted */
2611 : 3 : found = find_target_tuple(rel, chgcxt, spilled_tuple, ondisk_tuple);
2612 [ - + ]: 3 : if (!found)
2613 [ # # ]: 0 : elog(ERROR, "could not find target tuple");
2614 : 3 : apply_concurrent_delete(rel, ondisk_tuple);
2615 : : }
2616 [ + - ]: 22 : else if (kind == CHANGE_UPDATE_NEW)
2617 : : {
2618 : : TupleTableSlot *key;
2619 : : bool found;
2620 : :
2621 [ + + ]: 22 : if (have_old_tuple)
2622 : 8 : key = old_update_tuple;
2623 : : else
2624 : 14 : key = spilled_tuple;
2625 : :
2626 : : /* Find the tuple to be updated or deleted. */
2627 : 22 : found = find_target_tuple(rel, chgcxt, key, ondisk_tuple);
2628 [ - + ]: 22 : if (!found)
2629 [ # # ]: 0 : elog(ERROR, "could not find target tuple");
2630 : :
2631 : : /*
2632 : : * If 'tup' contains TOAST pointers, they point to the old
2633 : : * relation's toast. Copy the corresponding TOAST pointers for the
2634 : : * new relation from the existing tuple. (The fact that we
2635 : : * received a TOAST pointer here implies that the attribute hasn't
2636 : : * changed.)
2637 : : */
2638 : 22 : adjust_toast_pointers(rel, spilled_tuple, ondisk_tuple);
2639 : :
2640 : 22 : apply_concurrent_update(rel, spilled_tuple, ondisk_tuple, chgcxt);
2641 : :
2642 : 22 : ExecClearTuple(old_update_tuple);
2643 : 22 : have_old_tuple = false;
2644 : : }
2645 : : else
2646 [ # # ]: 0 : elog(ERROR, "unrecognized kind of change: %d", kind);
2647 : :
2648 [ + - ]: 32 : ResetPerTupleExprContext(chgcxt->cc_estate);
2649 : : }
2650 : :
2651 : : /* Cleanup. */
2652 : 14 : ExecDropSingleTupleTableSlot(spilled_tuple);
2653 : 14 : ExecDropSingleTupleTableSlot(ondisk_tuple);
2654 : 14 : ExecDropSingleTupleTableSlot(old_update_tuple);
2655 : :
2656 : 14 : MemoryContextSwitchTo(oldcxt);
2657 : 14 : }
2658 : :
2659 : : /*
2660 : : * Apply an insert from the spill of concurrent changes to the new copy of the
2661 : : * table.
2662 : : */
2663 : : static void
2664 : 7 : apply_concurrent_insert(Relation rel, TupleTableSlot *slot,
2665 : : ChangeContext *chgcxt)
2666 : : {
2667 : : /* Put the tuple in the table, but make sure it won't be decoded */
2668 : 7 : table_tuple_insert(rel, slot, GetCurrentCommandId(true),
2669 : : TABLE_INSERT_NO_LOGICAL, NULL);
2670 : :
2671 : : /* Update indexes with this new tuple. */
2672 : 7 : ExecInsertIndexTuples(chgcxt->cc_rri,
2673 : : chgcxt->cc_estate,
2674 : : 0,
2675 : : slot,
2676 : : NIL, NULL);
2677 : 7 : pgstat_progress_incr_param(PROGRESS_REPACK_HEAP_TUPLES_INSERTED, 1);
2678 : 7 : }
2679 : :
2680 : : /*
2681 : : * Apply an update from the spill of concurrent changes to the new copy of the
2682 : : * table.
2683 : : */
2684 : : static void
2685 : 22 : apply_concurrent_update(Relation rel, TupleTableSlot *spilled_tuple,
2686 : : TupleTableSlot *ondisk_tuple,
2687 : : ChangeContext *chgcxt)
2688 : : {
2689 : : LockTupleMode lockmode;
2690 : : TM_FailureData tmfd;
2691 : : TU_UpdateIndexes update_indexes;
2692 : : TM_Result res;
2693 : :
2694 : : /*
2695 : : * Carry out the update, skipping logical decoding for it.
2696 : : */
2697 : 22 : res = table_tuple_update(rel, &(ondisk_tuple->tts_tid), spilled_tuple,
2698 : : GetCurrentCommandId(true),
2699 : : TABLE_UPDATE_NO_LOGICAL,
2700 : : InvalidSnapshot,
2701 : : InvalidSnapshot,
2702 : : false,
2703 : : &tmfd, &lockmode, &update_indexes);
2704 [ - + ]: 22 : if (res != TM_Ok)
2705 [ # # ]: 0 : ereport(ERROR,
2706 : : errcode(ERRCODE_T_R_SERIALIZATION_FAILURE),
2707 : : errmsg("could not apply concurrent %s on relation \"%s\"",
2708 : : "UPDATE", RelationGetRelationName(rel)));
2709 : :
2710 [ + + ]: 22 : if (update_indexes != TU_None)
2711 : : {
2712 : 8 : uint32 flags = EIIT_IS_UPDATE;
2713 : :
2714 [ - + ]: 8 : if (update_indexes == TU_Summarizing)
2715 : 0 : flags |= EIIT_ONLY_SUMMARIZING;
2716 : 8 : ExecInsertIndexTuples(chgcxt->cc_rri,
2717 : : chgcxt->cc_estate,
2718 : : flags,
2719 : : spilled_tuple,
2720 : : NIL, NULL);
2721 : : }
2722 : :
2723 : 22 : pgstat_progress_incr_param(PROGRESS_REPACK_HEAP_TUPLES_UPDATED, 1);
2724 : 22 : }
2725 : :
2726 : : static void
2727 : 3 : apply_concurrent_delete(Relation rel, TupleTableSlot *slot)
2728 : : {
2729 : : TM_Result res;
2730 : : TM_FailureData tmfd;
2731 : :
2732 : : /*
2733 : : * Delete tuple from the new heap, skipping logical decoding for it.
2734 : : */
2735 : 3 : res = table_tuple_delete(rel, &(slot->tts_tid),
2736 : : GetCurrentCommandId(true),
2737 : : TABLE_DELETE_NO_LOGICAL,
2738 : : InvalidSnapshot, InvalidSnapshot,
2739 : : false,
2740 : : &tmfd);
2741 : :
2742 [ - + ]: 3 : if (res != TM_Ok)
2743 [ # # ]: 0 : ereport(ERROR,
2744 : : errcode(ERRCODE_T_R_SERIALIZATION_FAILURE),
2745 : : errmsg("could not apply concurrent %s on relation \"%s\"",
2746 : : "DELETE", RelationGetRelationName(rel)));
2747 : :
2748 : 3 : pgstat_progress_incr_param(PROGRESS_REPACK_HEAP_TUPLES_DELETED, 1);
2749 : 3 : }
2750 : :
2751 : : /*
2752 : : * Read tuple from file and put it in the input slot. All memory is allocated
2753 : : * in the current memory context; caller is responsible for freeing it as
2754 : : * appropriate.
2755 : : *
2756 : : * External attributes are stored in separate memory chunks, in order to avoid
2757 : : * exceeding MaxAllocSize - that could happen if the individual attributes are
2758 : : * smaller than MaxAllocSize but the whole tuple is bigger.
2759 : : */
2760 : : static void
2761 : 40 : restore_tuple(BufFile *file, Relation relation, TupleTableSlot *slot)
2762 : : {
2763 : : uint32 t_len;
2764 : : HeapTuple tup;
2765 : : int natt_ext;
2766 : :
2767 : : /* Read the tuple. */
2768 : 40 : BufFileReadExact(file, &t_len, sizeof(t_len));
2769 : 40 : tup = (HeapTuple) palloc(HEAPTUPLESIZE + t_len);
2770 : 40 : tup->t_data = (HeapTupleHeader) ((char *) tup + HEAPTUPLESIZE);
2771 : 40 : BufFileReadExact(file, tup->t_data, t_len);
2772 : 40 : tup->t_len = t_len;
2773 : 40 : ItemPointerSetInvalid(&tup->t_self);
2774 : 40 : tup->t_tableOid = RelationGetRelid(relation);
2775 : :
2776 : : /*
2777 : : * Put the tuple we read in a slot. This deforms it, so that we can hack
2778 : : * the external attributes in place.
2779 : : */
2780 : 40 : ExecForceStoreHeapTuple(tup, slot, false);
2781 : :
2782 : : /*
2783 : : * Next, read any attributes we stored separately into the tts_values
2784 : : * array elements expecting them, if any. This matches
2785 : : * repack_store_change.
2786 : : */
2787 : 40 : BufFileReadExact(file, &natt_ext, sizeof(natt_ext));
2788 [ + + ]: 40 : if (natt_ext > 0)
2789 : : {
2790 : 11 : TupleDesc desc = slot->tts_tupleDescriptor;
2791 : :
2792 [ + + ]: 66 : for (int i = 0; i < desc->natts; i++)
2793 : : {
2794 : 55 : CompactAttribute *attr = TupleDescCompactAttr(desc, i);
2795 : : varlena *varlen;
2796 : : uint64 chunk_header;
2797 : : void *value;
2798 : : Size varlensz;
2799 : :
2800 [ + + + + ]: 55 : if (attr->attisdropped || attr->attlen != -1)
2801 : 40 : continue;
2802 [ - + ]: 22 : if (slot_attisnull(slot, i + 1))
2803 : 0 : continue;
2804 : 22 : varlen = (varlena *) DatumGetPointer(slot->tts_values[i]);
2805 [ + + ]: 22 : if (!VARATT_IS_EXTERNAL_INDIRECT(varlen))
2806 : 7 : continue;
2807 : 15 : slot_getsomeattrs(slot, i + 1);
2808 : :
2809 : 15 : BufFileReadExact(file, &chunk_header, VARHDRSZ);
2810 : 15 : varlensz = VARSIZE_ANY(&chunk_header);
2811 : :
2812 : 15 : value = palloc(varlensz);
2813 : 15 : memcpy(value, &chunk_header, VARHDRSZ);
2814 : 15 : BufFileReadExact(file, (char *) value + VARHDRSZ, varlensz - VARHDRSZ);
2815 : :
2816 : 15 : slot->tts_values[i] = PointerGetDatum(value);
2817 : 15 : natt_ext--;
2818 [ - + ]: 15 : if (natt_ext < 0)
2819 [ # # ]: 0 : elog(ERROR, "insufficient number of attributes stored separately");
2820 : : }
2821 : :
2822 [ - + ]: 11 : if (natt_ext != 0)
2823 [ # # ]: 0 : elog(ERROR,
2824 : : "unexpected number of attributes stored separately (%d remaining)",
2825 : : natt_ext);
2826 : : }
2827 : 40 : }
2828 : :
2829 : : /*
2830 : : * Adjust 'dest' replacing any EXTERNAL_ONDISK toast pointers with the
2831 : : * corresponding ones from 'src'.
2832 : : */
2833 : : static void
2834 : 22 : adjust_toast_pointers(Relation relation, TupleTableSlot *dest, TupleTableSlot *src)
2835 : : {
2836 : 22 : TupleDesc desc = dest->tts_tupleDescriptor;
2837 : :
2838 [ + + ]: 112 : for (int i = 0; i < desc->natts; i++)
2839 : : {
2840 : 90 : CompactAttribute *attr = TupleDescCompactAttr(desc, i);
2841 : : varlena *varlena_dst;
2842 : :
2843 [ + + ]: 90 : if (attr->attisdropped)
2844 : 24 : continue;
2845 [ + + ]: 66 : if (attr->attlen != -1)
2846 : 36 : continue;
2847 [ - + ]: 30 : if (slot_attisnull(dest, i + 1))
2848 : 0 : continue;
2849 : :
2850 : 30 : slot_getsomeattrs(dest, i + 1);
2851 : :
2852 : 30 : varlena_dst = (varlena *) DatumGetPointer(dest->tts_values[i]);
2853 [ + + ]: 30 : if (!VARATT_IS_EXTERNAL_ONDISK(varlena_dst))
2854 : 28 : continue;
2855 : 2 : slot_getsomeattrs(src, i + 1);
2856 : :
2857 : 2 : dest->tts_values[i] = src->tts_values[i];
2858 : : }
2859 : 22 : }
2860 : :
2861 : : /*
2862 : : * Find the tuple to be updated or deleted by the given data change, whose
2863 : : * tuple has already been loaded into locator.
2864 : : *
2865 : : * If the tuple is found, put it in retrieved and return true. If the tuple is
2866 : : * not found, return false.
2867 : : */
2868 : : static bool
2869 : 25 : find_target_tuple(Relation rel, ChangeContext *chgcxt, TupleTableSlot *locator,
2870 : : TupleTableSlot *retrieved)
2871 : : {
2872 : 25 : Form_pg_index idx = chgcxt->cc_ident_index->rd_index;
2873 : : IndexScanDesc scan;
2874 : 25 : bool retval = false;
2875 : :
2876 : : /*
2877 : : * Scan key is passed by caller, so it does not have to be constructed
2878 : : * multiple times. Key entries have all fields initialized, except for
2879 : : * sk_argument.
2880 : : *
2881 : : * Use the incoming tuple to finalize the scan key.
2882 : : */
2883 [ + + ]: 52 : for (int i = 0; i < chgcxt->cc_ident_key_nentries; i++)
2884 : : {
2885 : 27 : ScanKey entry = &chgcxt->cc_ident_key[i];
2886 : 27 : AttrNumber attno = idx->indkey.values[i];
2887 : :
2888 : 27 : entry->sk_argument = locator->tts_values[attno - 1];
2889 : : Assert(!locator->tts_isnull[attno - 1]);
2890 : : }
2891 : :
2892 : : /* XXX no instrumentation for now */
2893 : 25 : scan = index_beginscan(rel, chgcxt->cc_ident_index, GetActiveSnapshot(),
2894 : : NULL, chgcxt->cc_ident_key_nentries, 0, 0);
2895 : 25 : index_rescan(scan, chgcxt->cc_ident_key, chgcxt->cc_ident_key_nentries, NULL, 0);
2896 [ + - ]: 26 : while (index_getnext_slot(scan, ForwardScanDirection, retrieved))
2897 : : {
2898 : : /* Be wary of temporal constraints */
2899 [ + + + + ]: 26 : if (scan->xs_recheck && !identity_key_equal(chgcxt, locator, retrieved))
2900 : : {
2901 [ - + ]: 1 : CHECK_FOR_INTERRUPTS();
2902 : 1 : continue;
2903 : : }
2904 : :
2905 : 25 : retval = true;
2906 : 25 : break;
2907 : : }
2908 : 25 : index_endscan(scan);
2909 : :
2910 : 25 : return retval;
2911 : : }
2912 : :
2913 : : /*
2914 : : * Check whether the candidate tuple matches the locator tuple on all replica
2915 : : * identity key columns, using the same equality operators as the identity
2916 : : * index scan. The locator tuple has already been loaded into cc_ident_key.
2917 : : *
2918 : : * This is needed to filter lossy index matches, such as GiST multirange scans
2919 : : * used for temporal constraints.
2920 : : */
2921 : : static bool
2922 : 2 : identity_key_equal(ChangeContext *chgcxt, TupleTableSlot *locator,
2923 : : TupleTableSlot *candidate)
2924 : : {
2925 : 2 : slot_getsomeattrs(locator, chgcxt->cc_last_key_attno);
2926 : 2 : slot_getsomeattrs(candidate, chgcxt->cc_last_key_attno);
2927 : :
2928 [ + + ]: 4 : for (int i = 0; i < chgcxt->cc_ident_key_nentries; i++)
2929 : : {
2930 : 3 : ScanKey entry = &chgcxt->cc_ident_key[i];
2931 : 3 : AttrNumber attno = chgcxt->cc_ident_index->rd_index->indkey.values[i];
2932 : :
2933 : : Assert(attno > 0);
2934 : :
2935 [ - + ]: 3 : if (locator->tts_isnull[attno - 1] != candidate->tts_isnull[attno - 1])
2936 : 0 : return false;
2937 : :
2938 [ - + ]: 3 : if (locator->tts_isnull[attno - 1])
2939 : 0 : continue;
2940 : :
2941 [ + + ]: 3 : if (!DatumGetBool(FunctionCall2Coll(&entry->sk_func,
2942 : : entry->sk_collation,
2943 : 3 : candidate->tts_values[attno - 1],
2944 : : entry->sk_argument)))
2945 : 1 : return false;
2946 : : }
2947 : :
2948 : 1 : return true;
2949 : : }
2950 : :
2951 : : /*
2952 : : * Decode and apply concurrent changes, up to (and including) the record whose
2953 : : * LSN is 'end_of_wal'.
2954 : : *
2955 : : * XXX the names "process_concurrent_changes" and "apply_concurrent_changes"
2956 : : * are far too similar to each other.
2957 : : */
2958 : : static void
2959 : 14 : process_concurrent_changes(XLogRecPtr end_of_wal, ChangeContext *chgcxt, bool done)
2960 : : {
2961 : : DecodingWorkerShared *shared;
2962 : : char fname[MAXPGPATH];
2963 : : BufFile *file;
2964 : :
2965 : 14 : pgstat_progress_update_param(PROGRESS_REPACK_PHASE,
2966 : : PROGRESS_REPACK_PHASE_CATCH_UP);
2967 : :
2968 : : /* Ask the worker for the file. */
2969 : 14 : shared = (DecodingWorkerShared *) dsm_segment_address(decoding_worker->seg);
2970 : 14 : SpinLockAcquire(&shared->mutex);
2971 : 14 : shared->lsn_upto = end_of_wal;
2972 : 14 : shared->done = done;
2973 : 14 : SpinLockRelease(&shared->mutex);
2974 : :
2975 : : /*
2976 : : * The worker needs to finish processing of the current WAL record. Even
2977 : : * if it's idle, it'll need to close the output file. Thus we're likely to
2978 : : * wait, so prepare for sleep.
2979 : : */
2980 : 14 : ConditionVariablePrepareToSleep(&shared->cv);
2981 : : for (;;)
2982 : 14 : {
2983 : : int last_exported;
2984 : :
2985 : 28 : SpinLockAcquire(&shared->mutex);
2986 : 28 : last_exported = shared->last_exported;
2987 : 28 : SpinLockRelease(&shared->mutex);
2988 : :
2989 : : /*
2990 : : * Has the worker exported the file we are waiting for?
2991 : : */
2992 [ + + ]: 28 : if (last_exported == chgcxt->cc_file_seq)
2993 : 14 : break;
2994 : :
2995 : 14 : ConditionVariableSleep(&shared->cv, WAIT_EVENT_REPACK_WORKER_EXPORT);
2996 : : }
2997 : 14 : ConditionVariableCancelSleep();
2998 : :
2999 : : /* Open the file. */
3000 : 14 : DecodingWorkerFileName(fname, shared->relid, chgcxt->cc_file_seq);
3001 : 14 : file = BufFileOpenFileSet(&shared->sfs.fs, fname, O_RDONLY, false);
3002 : 14 : apply_concurrent_changes(file, chgcxt);
3003 : :
3004 : 14 : BufFileClose(file);
3005 : :
3006 : : /* Get ready for the next file. */
3007 : 14 : chgcxt->cc_file_seq++;
3008 : 14 : }
3009 : :
3010 : : /*
3011 : : * Initialize the ChangeContext struct for the given relation, with
3012 : : * the given index as identity index.
3013 : : */
3014 : : static void
3015 : 7 : initialize_change_context(ChangeContext *chgcxt,
3016 : : Relation relation, Oid ident_index_id)
3017 : : {
3018 : 7 : chgcxt->cc_rel = relation;
3019 : :
3020 : : /* Only initialize fields needed by ExecInsertIndexTuples(). */
3021 : 7 : chgcxt->cc_estate = CreateExecutorState();
3022 : :
3023 : : /*
3024 : : * Set up a range table for the executor, containing our repacked table as
3025 : : * its only member.
3026 : : */
3027 : : {
3028 : : RangeTblEntry *rte;
3029 : 7 : TupleDesc desc = RelationGetDescr(relation);
3030 : 7 : List *perminfos = NIL;
3031 : 7 : Bitmapset *updatedCols = NULL;
3032 : : RTEPermissionInfo *perminfo;
3033 : :
3034 : : /*
3035 : : * For our use, the RTE only needs to have perminfoindex initialized,
3036 : : * but there's no reason to not set the fields whose values we have at
3037 : : * hand.
3038 : : */
3039 : 7 : rte = makeNode(RangeTblEntry);
3040 : 7 : rte->rtekind = RTE_RELATION;
3041 : 7 : rte->relid = RelationGetRelid(relation);
3042 : 7 : rte->relkind = RelationGetForm(relation)->relkind;
3043 : : /* Create the RTEPermissionInfo instance (and set ->perminfoindex). */
3044 : 7 : addRTEPermissionInfo(&perminfos, rte);
3045 : :
3046 : : /*
3047 : : * Initialize updatedCols to show that all columns are updated. This
3048 : : * is of course not necessarily true, and we cannot know this early;
3049 : : * but this is only used by ExecInsertIndexTuples to flag index
3050 : : * updates with no logical value changes, so if it's wrong, nothing
3051 : : * terribly bad happens. We may want to improve this someday though.
3052 : : *
3053 : : * Don't claim that dropped columns are changed though.
3054 : : */
3055 [ + + ]: 25 : for (int i = 0; i < desc->natts; i++)
3056 : : {
3057 : 18 : CompactAttribute *attr = TupleDescCompactAttr(desc, i);
3058 : :
3059 [ + + ]: 18 : if (attr->attisdropped)
3060 : 2 : continue;
3061 : 16 : updatedCols = bms_add_member(updatedCols,
3062 : : i + 1 - FirstLowInvalidHeapAttributeNumber);
3063 : : }
3064 : :
3065 : : /* install updatedCols in the right place */
3066 : 7 : perminfo = getRTEPermissionInfo(perminfos, rte);
3067 : 7 : perminfo->updatedCols = updatedCols;
3068 : :
3069 : : /* finally we can initialize the range table proper */
3070 : 7 : ExecInitRangeTable(chgcxt->cc_estate, list_make1(rte), perminfos,
3071 : : bms_make_singleton(1));
3072 : : }
3073 : :
3074 : : /* Set up our ResultRelInfo to use for index updates */
3075 : 7 : chgcxt->cc_rri = makeNode(ResultRelInfo);
3076 : 7 : InitResultRelInfo(chgcxt->cc_rri, relation, 1, NULL, 0);
3077 : 7 : ExecOpenIndices(chgcxt->cc_rri, false);
3078 : :
3079 : : /*
3080 : : * The table's relcache entry already has the relcache entry for the
3081 : : * identity index; find that.
3082 : : */
3083 : 7 : chgcxt->cc_ident_index = NULL;
3084 [ + - ]: 7 : for (int i = 0; i < chgcxt->cc_rri->ri_NumIndices; i++)
3085 : : {
3086 : : Relation ind_rel;
3087 : :
3088 : 7 : ind_rel = chgcxt->cc_rri->ri_IndexRelationDescs[i];
3089 [ + - ]: 7 : if (ind_rel->rd_id == ident_index_id)
3090 : : {
3091 : 7 : chgcxt->cc_ident_index = ind_rel;
3092 : 7 : break;
3093 : : }
3094 : : }
3095 [ - + ]: 7 : if (chgcxt->cc_ident_index == NULL)
3096 [ # # ]: 0 : elog(ERROR, "could not find identity index");
3097 : :
3098 : : /* Set up for scanning said identity index */
3099 : : {
3100 : : Form_pg_index indexForm;
3101 : :
3102 : 7 : indexForm = chgcxt->cc_ident_index->rd_index;
3103 : 7 : chgcxt->cc_ident_key_nentries = indexForm->indnkeyatts;
3104 : 7 : chgcxt->cc_ident_key = (ScanKey) palloc_array(ScanKeyData, indexForm->indnkeyatts);
3105 [ + + ]: 16 : for (int i = 0; i < indexForm->indnkeyatts; i++)
3106 : : {
3107 : : ScanKey entry;
3108 : : Oid opfamily,
3109 : : opcintype,
3110 : : opno,
3111 : : opcode;
3112 : : StrategyNumber eq_strategy;
3113 : :
3114 : 9 : entry = &chgcxt->cc_ident_key[i];
3115 : :
3116 : 9 : opfamily = chgcxt->cc_ident_index->rd_opfamily[i];
3117 : 9 : opcintype = chgcxt->cc_ident_index->rd_opcintype[i];
3118 : 9 : eq_strategy = IndexAmTranslateCompareType(COMPARE_EQ,
3119 : 9 : chgcxt->cc_ident_index->rd_rel->relam,
3120 : : opfamily, false);
3121 [ - + ]: 9 : if (eq_strategy == InvalidStrategy)
3122 [ # # ]: 0 : elog(ERROR, "could not find equality strategy for index operator family %u for type %u",
3123 : : opfamily, opcintype);
3124 : 9 : opno = get_opfamily_member(opfamily, opcintype, opcintype,
3125 : : eq_strategy);
3126 [ - + ]: 9 : if (!OidIsValid(opno))
3127 [ # # ]: 0 : elog(ERROR, "missing operator %d(%u,%u) in opfamily %u",
3128 : : eq_strategy, opcintype, opcintype, opfamily);
3129 : 9 : opcode = get_opcode(opno);
3130 [ - + ]: 9 : if (!OidIsValid(opcode))
3131 [ # # ]: 0 : elog(ERROR, "missing oprcode for operator %u", opno);
3132 : :
3133 : : /* Initialize everything but argument. */
3134 : 9 : ScanKeyInit(entry,
3135 : 9 : i + 1,
3136 : : eq_strategy, opcode,
3137 : : (Datum) 0);
3138 : 9 : entry->sk_collation = chgcxt->cc_ident_index->rd_indcollation[i];
3139 : : }
3140 : : }
3141 : :
3142 : : /* Determine the last column we must deform to read the identity */
3143 : 7 : chgcxt->cc_last_key_attno = InvalidAttrNumber;
3144 [ + + ]: 16 : for (int i = 0; i < chgcxt->cc_ident_key_nentries; i++)
3145 : : {
3146 : 9 : AttrNumber attno = chgcxt->cc_ident_index->rd_index->indkey.values[i];
3147 : :
3148 : : Assert(attno > 0);
3149 : 9 : chgcxt->cc_last_key_attno = Max(chgcxt->cc_last_key_attno, attno);
3150 : : }
3151 : :
3152 : 7 : chgcxt->cc_file_seq = WORKER_FILE_SNAPSHOT + 1;
3153 : 7 : }
3154 : :
3155 : : /*
3156 : : * Free up resources taken by a ChangeContext.
3157 : : */
3158 : : static void
3159 : 7 : release_change_context(ChangeContext *chgcxt)
3160 : : {
3161 : 7 : ExecCloseIndices(chgcxt->cc_rri);
3162 : 7 : FreeExecutorState(chgcxt->cc_estate);
3163 : : /* XXX are these pfrees necessary? */
3164 : 7 : pfree(chgcxt->cc_rri);
3165 : 7 : pfree(chgcxt->cc_ident_key);
3166 : 7 : }
3167 : :
3168 : : /*
3169 : : * The final steps of rebuild_relation() for concurrent processing.
3170 : : *
3171 : : * On entry, NewHeap is locked in AccessExclusiveLock mode. OldHeap and its
3172 : : * clustering index (if one is passed) are still locked in a mode that allows
3173 : : * concurrent data changes. On exit, both tables and their indexes are closed,
3174 : : * but locked in AccessExclusiveLock mode.
3175 : : */
3176 : : static void
3177 : 7 : rebuild_relation_finish_concurrent(Relation NewHeap, Relation OldHeap,
3178 : : Oid identIdx, TransactionId frozenXid,
3179 : : MultiXactId cutoffMulti)
3180 : : {
3181 : : List *ind_oids_new;
3182 : 7 : Oid old_table_oid = RelationGetRelid(OldHeap);
3183 : 7 : Oid new_table_oid = RelationGetRelid(NewHeap);
3184 : 7 : List *ind_oids_old = RelationGetIndexList(OldHeap);
3185 : : ListCell *lc,
3186 : : *lc2;
3187 : : char relpersistence;
3188 : : bool is_system_catalog;
3189 : : Oid ident_idx_new;
3190 : : XLogRecPtr end_of_wal;
3191 : : List *indexrels;
3192 : : ChangeContext chgcxt;
3193 : :
3194 : : Assert(CheckRelationLockedByMe(OldHeap, ShareUpdateExclusiveLock, false));
3195 : : Assert(CheckRelationLockedByMe(NewHeap, AccessExclusiveLock, false));
3196 : :
3197 : : /*
3198 : : * Unlike the exclusive case, we build new indexes for the new relation
3199 : : * rather than swapping the storage and reindexing the old relation. The
3200 : : * point is that the index build can take some time, so we do it before we
3201 : : * get AccessExclusiveLock on the old heap and therefore we cannot swap
3202 : : * the heap storage yet.
3203 : : *
3204 : : * index_create() will lock the new indexes using AccessExclusiveLock - no
3205 : : * need to change that. At the same time, we use ShareUpdateExclusiveLock
3206 : : * to lock the existing indexes - that should be enough to prevent others
3207 : : * from changing them while we're repacking the relation. The lock on
3208 : : * table should prevent others from changing the index column list, but
3209 : : * might not be enough for commands like ALTER INDEX ... SET ... (Those
3210 : : * are not necessarily dangerous, but can make user confused if the
3211 : : * changes they do get lost due to REPACK.)
3212 : : */
3213 : 7 : ind_oids_new = build_new_indexes(NewHeap, OldHeap, ind_oids_old);
3214 : :
3215 : : /*
3216 : : * The identity index in the new relation appears in the same relative
3217 : : * position as the corresponding index in the old relation. Find it.
3218 : : */
3219 : 7 : ident_idx_new = InvalidOid;
3220 [ + - + - : 14 : foreach_oid(ind_old, ind_oids_old)
+ + ]
3221 : : {
3222 [ + - ]: 7 : if (identIdx == ind_old)
3223 : : {
3224 : 7 : int pos = foreach_current_index(ind_old);
3225 : :
3226 [ - + ]: 7 : if (list_length(ind_oids_new) <= pos)
3227 [ # # ]: 0 : elog(ERROR, "list of new indexes too short");
3228 : 7 : ident_idx_new = list_nth_oid(ind_oids_new, pos);
3229 : 7 : break;
3230 : : }
3231 : : }
3232 [ - + ]: 7 : if (!OidIsValid(ident_idx_new))
3233 [ # # ]: 0 : elog(ERROR, "could not find index matching \"%s\" at the new relation",
3234 : : get_rel_name(identIdx));
3235 : :
3236 : : /* Gather information to apply concurrent changes. */
3237 : 7 : initialize_change_context(&chgcxt, NewHeap, ident_idx_new);
3238 : :
3239 : : /*
3240 : : * During testing, wait for another backend to perform concurrent data
3241 : : * changes which we will process below.
3242 : : */
3243 : 7 : INJECTION_POINT("repack-concurrently-before-lock", NULL);
3244 : :
3245 : : /*
3246 : : * Flush all WAL records inserted so far (possibly except for the last
3247 : : * incomplete page; see GetInsertRecPtr), to minimize the amount of data
3248 : : * we need to flush while holding exclusive lock on the source table.
3249 : : */
3250 : 7 : XLogFlush(GetXLogInsertEndRecPtr());
3251 : 7 : end_of_wal = GetFlushRecPtr(NULL);
3252 : :
3253 : : /*
3254 : : * Apply concurrent changes for the first time, to minimize the time we need
3255 : : * to hold AccessExclusiveLock. (Quite some amount of WAL could have been
3256 : : * written during the data copying and index creation.)
3257 : : */
3258 : 7 : process_concurrent_changes(end_of_wal, &chgcxt, false);
3259 : :
3260 : : /*
3261 : : * Acquire AccessExclusiveLock on the table, its TOAST relation (if there
3262 : : * is one), all its indexes, so that we can swap the files.
3263 : : */
3264 : 7 : LockRelationOid(old_table_oid, AccessExclusiveLock);
3265 : :
3266 : : /*
3267 : : * Lock all indexes now, not only the clustering one: all indexes need to
3268 : : * have their files swapped. While doing that, store their relation
3269 : : * references in a list, to handle predicate locks below.
3270 : : */
3271 : 7 : indexrels = NIL;
3272 [ + - + + : 22 : foreach_oid(ind_oid, ind_oids_old)
+ + ]
3273 : : {
3274 : : Relation index;
3275 : :
3276 : 8 : index = index_open(ind_oid, AccessExclusiveLock);
3277 : :
3278 : : /*
3279 : : * Some things about the index may have changed before we locked the
3280 : : * index, such as ALTER INDEX RENAME. We don't need to do anything
3281 : : * here to absorb those changes in the new index.
3282 : : */
3283 : 8 : indexrels = lappend(indexrels, index);
3284 : : }
3285 : :
3286 : : /*
3287 : : * Lock the OldHeap's TOAST relation exclusively - again, the lock is
3288 : : * needed to swap the files.
3289 : : */
3290 [ + + ]: 7 : if (OidIsValid(OldHeap->rd_rel->reltoastrelid))
3291 : 3 : LockRelationOid(OldHeap->rd_rel->reltoastrelid, AccessExclusiveLock);
3292 : :
3293 : : /*
3294 : : * Tuples and pages of the old heap will be gone, but the heap will stay.
3295 : : */
3296 : 7 : TransferPredicateLocksToHeapRelation(OldHeap);
3297 [ + - + + : 22 : foreach_ptr(RelationData, index, indexrels)
+ + ]
3298 : : {
3299 : 8 : TransferPredicateLocksToHeapRelation(index);
3300 : 8 : index_close(index, NoLock);
3301 : : }
3302 : 7 : list_free(indexrels);
3303 : :
3304 : : /*
3305 : : * Flush WAL again, to make sure that all changes committed while we were
3306 : : * waiting for the exclusive lock are available for decoding.
3307 : : */
3308 : 7 : XLogFlush(GetXLogInsertEndRecPtr());
3309 : 7 : end_of_wal = GetFlushRecPtr(NULL);
3310 : :
3311 : : /*
3312 : : * Apply the concurrent changes again. Indicate that the decoding worker
3313 : : * won't be needed anymore.
3314 : : */
3315 : 7 : process_concurrent_changes(end_of_wal, &chgcxt, true);
3316 : :
3317 : : /* Remember info about rel before closing OldHeap */
3318 : 7 : relpersistence = OldHeap->rd_rel->relpersistence;
3319 : 7 : is_system_catalog = IsSystemRelation(OldHeap);
3320 : :
3321 : 7 : pgstat_progress_update_param(PROGRESS_REPACK_PHASE,
3322 : : PROGRESS_REPACK_PHASE_SWAP_REL_FILES);
3323 : :
3324 : : /*
3325 : : * Even ShareUpdateExclusiveLock should have prevented others from
3326 : : * creating / dropping indexes (even using the CONCURRENTLY option), so we
3327 : : * do not need to check whether the lists match.
3328 : : */
3329 [ + - + + : 15 : forboth(lc, ind_oids_old, lc2, ind_oids_new)
+ - + + +
+ + - +
+ ]
3330 : : {
3331 : 8 : Oid ind_old = lfirst_oid(lc);
3332 : 8 : Oid ind_new = lfirst_oid(lc2);
3333 : 8 : Oid mapped_tables[4] = {0};
3334 : :
3335 : 8 : swap_relation_files(ind_old, ind_new,
3336 : : (old_table_oid == RelationRelationId),
3337 : : false, /* swap_toast_by_content */
3338 : : true,
3339 : : InvalidTransactionId,
3340 : : InvalidMultiXactId,
3341 : : mapped_tables);
3342 : :
3343 : : #ifdef USE_ASSERT_CHECKING
3344 : :
3345 : : /*
3346 : : * Concurrent processing is not supported for system relations, so
3347 : : * there should be no mapped tables.
3348 : : */
3349 : : for (int i = 0; i < 4; i++)
3350 : : Assert(!OidIsValid(mapped_tables[i]));
3351 : : #endif
3352 : : }
3353 : :
3354 : : /* The new indexes must be visible for deletion. */
3355 : 7 : CommandCounterIncrement();
3356 : :
3357 : : /* Close the old heap but keep lock until transaction commit. */
3358 : 7 : table_close(OldHeap, NoLock);
3359 : : /* Close the new heap. (We didn't have to open its indexes). */
3360 : 7 : table_close(NewHeap, NoLock);
3361 : :
3362 : : /* Clean up what we don't need anymore. (And close the identity index.) */
3363 : 7 : release_change_context(&chgcxt);
3364 : :
3365 : : /*
3366 : : * Swap the relations and their TOAST relations and TOAST indexes. This
3367 : : * also drops the new relation and its indexes.
3368 : : *
3369 : : * (System catalogs are currently not supported.)
3370 : : */
3371 : : Assert(!is_system_catalog);
3372 : 7 : finish_heap_swap(old_table_oid, new_table_oid,
3373 : : is_system_catalog,
3374 : : false, /* swap_toast_by_content */
3375 : : false,
3376 : : true,
3377 : : false, /* reindex */
3378 : : frozenXid, cutoffMulti,
3379 : : relpersistence);
3380 : 7 : }
3381 : :
3382 : : /*
3383 : : * Build indexes on NewHeap according to those on OldHeap.
3384 : : *
3385 : : * OldIndexes is the list of index OIDs on OldHeap. The contained indexes end
3386 : : * up locked using ShareUpdateExclusiveLock.
3387 : : *
3388 : : * A list of OIDs of the corresponding indexes created on NewHeap is
3389 : : * returned. The order of items matches that of OldIndexes, so the two lists
3390 : : * can be walked in parallel to swap index storage.
         *
         * Each new index is created by index_create_copy() under a name obtained
         * from ChooseRelationName() (label "repacknew") and in the tablespace
         * recorded for the old index; copy_index_constraints() is then called for
         * it. Before the loop, the progress phase is set to
         * PROGRESS_REPACK_PHASE_REBUILD_INDEX.
         *
         * Note: the OldHeap argument is not referenced in the function body.
3391 : : */
3392 : : static List *
3393 : 7 : build_new_indexes(Relation NewHeap, Relation OldHeap, List *OldIndexes)
3394 : : {
3395 : 7 : List *result = NIL;
3396 : :
3397 : 7 : pgstat_progress_update_param(PROGRESS_REPACK_PHASE,
3398 : : PROGRESS_REPACK_PHASE_REBUILD_INDEX);
3399 : :
3400 [ + - + + : 22 : foreach_oid(oldindex, OldIndexes)
+ + ]
3401 : : {
3402 : : Oid newindex;
3403 : : char *newName;
3404 : : Relation ind;
3405 : :
3406 : 8 : ind = index_open(oldindex, ShareUpdateExclusiveLock);
3407 : :
3408 : 8 : newName = ChooseRelationName(get_rel_name(oldindex),
3409 : : NULL,
3410 : : "repacknew",
3411 : 8 : get_rel_namespace(ind->rd_index->indrelid),
3412 : : false);
3413 : 8 : newindex = index_create_copy(NewHeap, INDEX_CREATE_SUPPRESS_PROGRESS,
3414 : 8 : oldindex, ind->rd_rel->reltablespace,
3415 : : newName);
3416 : 8 : copy_index_constraints(ind, newindex, RelationGetRelid(NewHeap));
3417 : 8 : result = lappend_oid(result, newindex);
3418 : :
         /* Close the relcache reference only; the lock taken above is kept. */
3419 : 8 : index_close(ind, NoLock);
3420 : : }
3421 : :
3422 : 7 : return result;
3423 : : }
3424 : :
3425 : : /*
3426 : : * Create a transient copy of a constraint -- supported by a transient
3427 : : * copy of the index that supports the original constraint.
3428 : : *
3429 : : * When repacking a table that contains exclusion constraints, the executor
3430 : : * relies on these constraints being properly catalogued. These copies are
3431 : : * to support that.
3432 : : *
3433 : : * We don't need the constraints for anything else (the original constraints
3434 : : * will be there once repack completes), so we add pg_depend entries so that
3435 : : * they are dropped when the transient table is dropped.
         *
         * old_index is the opened index on the old heap, new_index_id the OID of
         * its copy, and new_heap_id the OID of the transient heap.
         *
         * pg_constraint is scanned for rows whose conrelid equals the table that
         * old_index belongs to (rd_index->indrelid); rows whose conindid is not
         * old_index are skipped. Each remaining row is copied with a newly
         * generated OID, conrelid = new_heap_id and conindid = new_index_id; all
         * other columns are taken over unchanged. A DEPENDENCY_AUTO entry from
         * the new constraint to the new heap is recorded for each copy, and
         * CommandCounterIncrement() is called before returning.
3436 : : */
3437 : : static void
3438 : 8 : copy_index_constraints(Relation old_index, Oid new_index_id, Oid new_heap_id)
3439 : : {
3440 : : ScanKeyData skey;
3441 : : Relation rel;
3442 : : TupleDesc desc;
3443 : : SysScanDesc scan;
3444 : : HeapTuple tup;
3445 : : ObjectAddress objrel;
3446 : :
3447 : 8 : rel = table_open(ConstraintRelationId, RowExclusiveLock);
3448 : 8 : ObjectAddressSet(objrel, RelationRelationId, new_heap_id);
3449 : :
3450 : : /*
3451 : : * Retrieve the constraints supported by the old index and create
3452 : : * identical ones that point to the new index.
3453 : : */
3454 : 8 : ScanKeyInit(&skey,
3455 : : Anum_pg_constraint_conrelid,
3456 : : BTEqualStrategyNumber, F_OIDEQ,
3457 : 8 : ObjectIdGetDatum(old_index->rd_index->indrelid));
3458 : 8 : scan = systable_beginscan(rel, ConstraintRelidTypidNameIndexId, true,
3459 : : NULL, 1, &skey);
3460 : 8 : desc = RelationGetDescr(rel);
3461 [ + + ]: 26 : while (HeapTupleIsValid(tup = systable_getnext(scan)))
3462 : : {
3463 : 18 : Form_pg_constraint conform = (Form_pg_constraint) GETSTRUCT(tup);
3464 : : Oid oid;
3465 : 18 : Datum values[Natts_pg_constraint] = {0};
3466 : 18 : bool nulls[Natts_pg_constraint] = {0};
3467 : 18 : bool replaces[Natts_pg_constraint] = {0};
3468 : : HeapTuple new_tup;
3469 : : ObjectAddress objcon;
3470 : :
         /* The scan key only matches the table; filter on the index here. */
3471 [ + + ]: 18 : if (conform->conindid != RelationGetRelid(old_index))
3472 : 11 : continue;
3473 : :
3474 : 7 : oid = GetNewOidWithIndex(rel, ConstraintOidIndexId,
3475 : : Anum_pg_constraint_oid);
3476 : 7 : values[Anum_pg_constraint_oid - 1] = ObjectIdGetDatum(oid);
3477 : 7 : replaces[Anum_pg_constraint_oid - 1] = true;
3478 : 7 : values[Anum_pg_constraint_conrelid - 1] = ObjectIdGetDatum(new_heap_id);
3479 : 7 : replaces[Anum_pg_constraint_conrelid - 1] = true;
3480 : 7 : values[Anum_pg_constraint_conindid - 1] = ObjectIdGetDatum(new_index_id);
3481 : 7 : replaces[Anum_pg_constraint_conindid - 1] = true;
3482 : :
3483 : 7 : new_tup = heap_modify_tuple(tup, desc, values, nulls, replaces);
3484 : :
3485 : : /* Insert it into the catalog. */
3486 : 7 : CatalogTupleInsert(rel, new_tup);
3487 : :
3488 : : /* Create a dependency so it's removed when we drop the new heap. */
3489 : 7 : ObjectAddressSet(objcon, ConstraintRelationId, oid);
3490 : 7 : recordDependencyOn(&objcon, &objrel, DEPENDENCY_AUTO);
3491 : : }
3492 : 8 : systable_endscan(scan);
3493 : :
3494 : 8 : table_close(rel, RowExclusiveLock);
3495 : :
3496 : 8 : CommandCounterIncrement();
3497 : 8 : }
3498 : :
3499 : : /*
3500 : : * Create a transient copy of attribute defaults.
3501 : : *
3502 : : * When repacking a table that has stored generated columns, the executor
3503 : : * relies on these entries to generate the values for them during apply of
3504 : : * concurrent operations. These copies are there to support that.
3505 : : *
3506 : : * We don't need the defaults for anything else, so we add pg_depend entries
3507 : : * so that they are dropped when the transient table is dropped.
         *
         * For every pg_attrdef row whose adrelid is old_heap_oid:
         *  - a copy is inserted with a newly generated OID and adrelid set to
         *    new_heap_oid (all other columns are taken over unchanged);
         *  - atthasdef is set to true in the pg_attribute row of column adnum of
         *    new_heap_oid (ERROR if that row is not found in the syscache);
         *  - a DEPENDENCY_AUTO entry from the new default to the new heap is
         *    recorded.
         * CommandCounterIncrement() is called before returning.
3508 : : */
3509 : : static void
3510 : 7 : copy_attribute_defaults(Oid old_heap_oid, Oid new_heap_oid)
3511 : : {
3512 : : ScanKeyData skey;
3513 : : Relation rel;
3514 : : Relation att_rel;
3515 : : SysScanDesc scan;
3516 : : HeapTuple def_tup;
3517 : : ObjectAddress objrel;
3518 : :
3519 : 7 : rel = table_open(AttrDefaultRelationId, RowExclusiveLock);
3520 : 7 : att_rel = table_open(AttributeRelationId, RowExclusiveLock);
3521 : :
3522 : 7 : ObjectAddressSet(objrel, RelationRelationId, new_heap_oid);
3523 : :
3524 : 7 : ScanKeyInit(&skey,
3525 : : Anum_pg_attrdef_adrelid,
3526 : : BTEqualStrategyNumber, F_OIDEQ,
3527 : : ObjectIdGetDatum(old_heap_oid));
3528 : 7 : scan = systable_beginscan(rel, AttrDefaultIndexId, true,
3529 : : NULL, 1, &skey);
3530 [ + + ]: 9 : while (HeapTupleIsValid(def_tup = systable_getnext(scan)))
3531 : : {
3532 : : Form_pg_attrdef adform;
3533 : : Oid oid;
         /*
          * Only the *_replaces arrays are zero-initialized; in the values and
          * nulls arrays, just the entries flagged in *_replaces are assigned
          * below.
          */
3534 : : Datum def_values[Natts_pg_attrdef];
3535 : : bool def_nulls[Natts_pg_attrdef];
3536 : 2 : bool def_replaces[Natts_pg_attrdef] = {0};
3537 : : Datum att_values[Natts_pg_attribute];
3538 : : bool att_nulls[Natts_pg_attribute];
3539 : 2 : bool att_replaces[Natts_pg_attribute] = {0};
3540 : : HeapTuple new_def_tup,
3541 : : att_tup,
3542 : : new_att_tup;
3543 : : ObjectAddress objad;
3544 : :
3545 : 2 : adform = (Form_pg_attrdef) GETSTRUCT(def_tup);
3546 : : Assert(adform->adrelid == old_heap_oid);
3547 : :
3548 : : /*
3549 : : * Insert a new tuple that's identical to the existing one, other than
3550 : : * its OID and the relation it refers to.
3551 : : */
3552 : 2 : oid = GetNewOidWithIndex(rel, AttrDefaultOidIndexId,
3553 : : Anum_pg_attrdef_oid);
3554 : 2 : def_values[Anum_pg_attrdef_oid - 1] = ObjectIdGetDatum(oid);
3555 : 2 : def_nulls[Anum_pg_attrdef_oid - 1] = false;
3556 : 2 : def_replaces[Anum_pg_attrdef_oid - 1] = true;
3557 : 2 : def_values[Anum_pg_attrdef_adrelid - 1] = ObjectIdGetDatum(new_heap_oid);
3558 : 2 : def_nulls[Anum_pg_attrdef_adrelid - 1] = false;
3559 : 2 : def_replaces[Anum_pg_attrdef_adrelid - 1] = true;
3560 : 2 : new_def_tup = heap_modify_tuple(def_tup, RelationGetDescr(rel),
3561 : : def_values, def_nulls, def_replaces);
3562 : 2 : CatalogTupleInsert(rel, new_def_tup);
3563 : :
3564 : : /* Set atthasdef for this attribute in the transient table */
3565 : 2 : att_tup = SearchSysCache2(ATTNUM,
3566 : : ObjectIdGetDatum(new_heap_oid),
3567 : 2 : ObjectIdGetDatum(adform->adnum));
3568 [ - + ]: 2 : if (!HeapTupleIsValid(att_tup))
3569 [ # # ]: 0 : elog(ERROR, "cache lookup failed for attribute %d of relation %u",
3570 : : adform->adnum, new_heap_oid);
3571 : 2 : att_values[Anum_pg_attribute_atthasdef - 1] = BoolGetDatum(true);
3572 : 2 : att_nulls[Anum_pg_attribute_atthasdef - 1] = false;
3573 : 2 : att_replaces[Anum_pg_attribute_atthasdef - 1] = true;
3574 : 2 : new_att_tup = heap_modify_tuple(att_tup, RelationGetDescr(att_rel),
3575 : : att_values, att_nulls, att_replaces);
3576 : 2 : CatalogTupleUpdate(att_rel, &new_att_tup->t_self, new_att_tup);
3577 : 2 : ReleaseSysCache(att_tup);
3578 : :
3579 : : /* Add a pg_depend record so it's removed with the transient table */
3580 : 2 : ObjectAddressSet(objad, AttrDefaultRelationId, oid);
3581 : 2 : recordDependencyOn(&objad, &objrel, DEPENDENCY_AUTO);
3582 : : }
3583 : 7 : systable_endscan(scan);
3584 : :
3585 : 7 : table_close(rel, RowExclusiveLock);
3586 : 7 : table_close(att_rel, RowExclusiveLock);
3587 : :
3588 : 7 : CommandCounterIncrement();
3589 : 7 : }
3590 : :
3591 : : /*
3592 : : * Try to start a background worker to perform logical decoding of data
3593 : : * changes applied to relation while REPACK CONCURRENTLY is copying its
3594 : : * contents to a new table.
         *
         * relid is the OID of the relation being repacked; it is stored in the
         * shared state and used in the worker's name.
         *
         * The function allocates decoding_worker (declared outside this function),
         * creates a DSM segment holding a DecodingWorkerShared struct followed by
         * an error queue of REPACK_ERROR_QUEUE_SIZE bytes, attaches to that queue
         * as receiver, and registers a dynamic background worker whose entry point
         * is "RepackWorkerMain", passing the DSM handle as its argument.
         *
         * Raises ERROR if no background worker slot is available. Otherwise it
         * does not return until the worker has set shared->initialized.
3595 : : */
3596 : : static void
3597 : 7 : start_repack_decoding_worker(Oid relid)
3598 : : {
3599 : : Size size;
3600 : : DecodingWorkerShared *shared;
3601 : : shm_mq *mq;
3602 : : BackgroundWorker bgw;
3603 : :
3604 : 7 : decoding_worker = palloc0_object(DecodingWorker);
3605 : :
3606 : : /* Setup shared memory. */
3607 : 7 : size = BUFFERALIGN(offsetof(DecodingWorkerShared, error_queue)) +
3608 : : BUFFERALIGN(REPACK_ERROR_QUEUE_SIZE);
3609 : 7 : decoding_worker->seg = dsm_create(size, 0);
3610 : :
3611 : 7 : shared = (DecodingWorkerShared *) dsm_segment_address(decoding_worker->seg);
3612 : 7 : shared->initialized = false;
3613 : 7 : shared->lsn_upto = InvalidXLogRecPtr;
3614 : 7 : shared->done = false;
3615 : 7 : SharedFileSetInit(&shared->sfs, decoding_worker->seg);
3616 : 7 : shared->last_exported = -1;
3617 : 7 : SpinLockInit(&shared->mutex);
3618 : 7 : shared->dbid = MyDatabaseId;
3619 : :
3620 : : /*
3621 : : * This is the UserId set in cluster_rel(). Security context shouldn't be
3622 : : * needed for decoding worker.
3623 : : */
3624 : 7 : shared->roleid = GetUserId();
3625 : 7 : shared->relid = relid;
3626 : 7 : ConditionVariableInit(&shared->cv);
3627 : 7 : shared->backend_proc = MyProc;
3628 : 7 : shared->backend_pid = MyProcPid;
3629 : 7 : shared->backend_proc_number = MyProcNumber;
3630 : :
         /* Error queue: this backend is the receiver of the worker's messages. */
3631 : 7 : mq = shm_mq_create((char *) BUFFERALIGN(shared->error_queue),
3632 : : REPACK_ERROR_QUEUE_SIZE);
3633 : 7 : shm_mq_set_receiver(mq, MyProc);
3634 : :
3635 : 7 : decoding_worker->error_mqh = shm_mq_attach(mq, decoding_worker->seg, NULL);
3636 : :
3637 : 7 : memset(&bgw, 0, sizeof(bgw));
3638 : 7 : snprintf(bgw.bgw_name, BGW_MAXLEN,
3639 : : "REPACK decoding worker for relation \"%s\"",
3640 : : get_rel_name(relid));
3641 : 7 : snprintf(bgw.bgw_type, BGW_MAXLEN, "REPACK decoding worker");
3642 : 7 : bgw.bgw_flags = BGWORKER_SHMEM_ACCESS |
3643 : : BGWORKER_BACKEND_DATABASE_CONNECTION;
3644 : 7 : bgw.bgw_start_time = BgWorkerStart_RecoveryFinished;
3645 : 7 : bgw.bgw_restart_time = BGW_NEVER_RESTART;
3646 : 7 : snprintf(bgw.bgw_library_name, MAXPGPATH, "postgres");
3647 : 7 : snprintf(bgw.bgw_function_name, BGW_MAXLEN, "RepackWorkerMain");
3648 : 7 : bgw.bgw_main_arg = UInt32GetDatum(dsm_segment_handle(decoding_worker->seg));
3649 : 7 : bgw.bgw_notify_pid = MyProcPid;
3650 : :
3651 [ - + ]: 7 : if (!RegisterDynamicBackgroundWorker(&bgw, &decoding_worker->handle))
3652 [ # # ]: 0 : ereport(ERROR,
3653 : : errcode(ERRCODE_CONFIGURATION_LIMIT_EXCEEDED),
3654 : : errmsg("out of background worker slots"),
3655 : : errhint("You might need to increase \"%s\".", "max_worker_processes"));
3656 : :
3657 : : /*
3658 : : * The decoding setup must be done before the caller can have XID assigned
3659 : : * for any reason, otherwise the worker might end up in a deadlock,
3660 : : * waiting for the caller's transaction to end. Therefore wait here until
3661 : : * the worker indicates that it has the logical decoding initialized.
3662 : : */
3663 : 7 : ConditionVariablePrepareToSleep(&shared->cv);
3664 : : for (;;)
3665 : 17 : {
3666 : : bool initialized;
3667 : :
         /* Read the flag under the spinlock, then test it outside. */
3668 : 24 : SpinLockAcquire(&shared->mutex);
3669 : 24 : initialized = shared->initialized;
3670 : 24 : SpinLockRelease(&shared->mutex);
3671 : :
3672 [ + + ]: 24 : if (initialized)
3673 : 7 : break;
3674 : :
3675 : 17 : ConditionVariableSleep(&shared->cv, WAIT_EVENT_REPACK_WORKER_EXPORT);
3676 : : }
3677 : 7 : ConditionVariableCancelSleep();
3678 : 7 : }
3679 : :
3680 : : /*
3681 : : * Stop the decoding worker and cleanup the related resources.
3682 : : *
3683 : : * The worker stops on its own when it knows there is no more work to do, but
3684 : : * we need to stop it explicitly at least on ERROR in the launching backend.
         *
         * This is a no-op when decoding_worker is NULL. Otherwise the worker (if
         * it has a handle) is asked to terminate and waited for, the error queue
         * and the DSM segment are detached, and decoding_worker is freed and reset
         * to NULL. Raises FATAL if the postmaster died while we were waiting.
3685 : : */
3686 : : static void
3687 : 7 : stop_repack_decoding_worker(void)
3688 : : {
3689 : : /* Nothing to do if no worker was set up. */
3690 [ - + ]: 7 : if (decoding_worker == NULL)
3691 : 0 : return;
3692 : :
3693 : : /* Terminate the worker process, if one is running. */
3694 [ + - ]: 7 : if (decoding_worker->handle != NULL)
3695 : : {
3696 : : BgwHandleStatus status;
3697 : :
3698 : 7 : TerminateBackgroundWorker(decoding_worker->handle);
3699 : : /* The worker should really exit before the REPACK command does. */
3700 : 7 : HOLD_INTERRUPTS();
3701 : 7 : status = WaitForBackgroundWorkerShutdown(decoding_worker->handle);
3702 : 7 : RESUME_INTERRUPTS();
3703 : :
3704 [ - + ]: 7 : if (status == BGWH_POSTMASTER_DIED)
3705 [ # # ]: 0 : ereport(FATAL,
3706 : : errcode(ERRCODE_ADMIN_SHUTDOWN),
3707 : : errmsg("postmaster exited during REPACK command"));
3708 : : }
3709 : :
3710 : : /*
3711 : : * Now detach from our shared memory segment. In error cases there might
3712 : : * still be messages from the worker in the queue, which ProcessInterrupts
3713 : : * would try to read; this is pointless (and causes an assertion failure),
3714 : : * so set the global pointer to NULL to have ProcessRepackMessages ignore
3715 : : * them.
3716 : : *
3717 : : * We must also cancel the current sleep, if one is still set up. This is
3718 : : * critical because the CV lives in the DSM that we're about to detach, so
3719 : : * if we omit it, later automatic cleanup tries to clear freed memory.
3720 : : */
3721 [ + - ]: 7 : if (decoding_worker->error_mqh != NULL)
3722 : 7 : shm_mq_detach(decoding_worker->error_mqh);
3723 : 7 : ConditionVariableCancelSleep();
3724 [ + - ]: 7 : if (decoding_worker->seg != NULL)
3725 : 7 : dsm_detach(decoding_worker->seg);
3726 : 7 : pfree(decoding_worker);
3727 : 7 : decoding_worker = NULL;
3728 : : }
3729 : :
3730 : : /* stop_repack_decoding_worker, wrapped as a before_shmem_exit callback */
         /* Both arguments (code, arg) are ignored. */
3731 : : static void
3732 : 0 : stop_repack_decoding_worker_cb(int code, Datum arg)
3733 : : {
3734 : 0 : stop_repack_decoding_worker();
3735 : 0 : }
3736 : :
3737 : : /*
3738 : : * Get the initial snapshot from the decoding worker.
         *
         * 'worker' supplies the DSM segment holding the shared state. The function
         * sleeps on shared->cv until shared->last_exported equals
         * WORKER_FILE_SNAPSHOT, then opens the file named by
         * DecodingWorkerFileName(relid, WORKER_FILE_SNAPSHOT) in the shared file
         * set, reads a Size followed by that many bytes, and passes the bytes to
         * RestoreSnapshot(). The read buffer is freed; the snapshot returned by
         * RestoreSnapshot() is handed back to the caller.
3739 : : */
3740 : : static Snapshot
3741 : 7 : get_initial_snapshot(DecodingWorker *worker)
3742 : : {
3743 : : DecodingWorkerShared *shared;
3744 : : char fname[MAXPGPATH];
3745 : : BufFile *file;
3746 : : Size snap_size;
3747 : : char *snap_space;
3748 : : Snapshot snapshot;
3749 : :
3750 : 7 : shared = (DecodingWorkerShared *) dsm_segment_address(worker->seg);
3751 : :
3752 : : /*
3753 : : * The worker needs to initialize the logical decoding, which usually
3754 : : * takes some time. Therefore it makes sense to prepare for the sleep
3755 : : * first.
3756 : : */
3757 : 7 : ConditionVariablePrepareToSleep(&shared->cv);
3758 : : for (;;)
3759 : 6 : {
3760 : : int last_exported;
3761 : :
3762 : 13 : SpinLockAcquire(&shared->mutex);
3763 : 13 : last_exported = shared->last_exported;
3764 : 13 : SpinLockRelease(&shared->mutex);
3765 : :
3766 : : /*
3767 : : * Has the worker exported the file we are waiting for?
3768 : : */
3769 [ + + ]: 13 : if (last_exported == WORKER_FILE_SNAPSHOT)
3770 : 7 : break;
3771 : :
3772 : 6 : ConditionVariableSleep(&shared->cv, WAIT_EVENT_REPACK_WORKER_EXPORT);
3773 : : }
3774 : 7 : ConditionVariableCancelSleep();
3775 : :
3776 : : /* Read the snapshot from a file. */
3777 : 7 : DecodingWorkerFileName(fname, shared->relid, WORKER_FILE_SNAPSHOT);
3778 : 7 : file = BufFileOpenFileSet(&shared->sfs.fs, fname, O_RDONLY, false);
3779 : 7 : BufFileReadExact(file, &snap_size, sizeof(snap_size));
3780 : 7 : snap_space = (char *) palloc(snap_size);
3781 : 7 : BufFileReadExact(file, snap_space, snap_size);
3782 : 7 : BufFileClose(file);
3783 : :
3784 : : /* Restore it. */
3785 : 7 : snapshot = RestoreSnapshot(snap_space);
3786 : 7 : pfree(snap_space);
3787 : :
3788 : 7 : return snapshot;
3789 : : }
3790 : :
3791 : : /*
3792 : : * Generate worker's file name into 'fname', which must be of size MAXPGPATH.
3793 : : * If relations of the same 'relid' happen to be processed at the same time,
3794 : : * they must be from different databases and therefore different backends must
3795 : : * be involved.
         *
         * The name has the form "<relid>-<seq>", both printed as unsigned decimal
         * numbers.
3796 : : */
3797 : : void
3798 : 42 : DecodingWorkerFileName(char *fname, Oid relid, uint32 seq)
3799 : : {
3800 : : /* The PID is already present in the fileset name, so we needn't add it */
3801 : 42 : snprintf(fname, MAXPGPATH, "%u-%u", relid, seq);
3802 : 42 : }
3803 : :
3804 : : /*
3805 : : * Handle receipt of an interrupt indicating a repack worker message.
3806 : : *
3807 : : * Note: this is called within a signal handler! All we can do is set
3808 : : * a flag that will cause the next CHECK_FOR_INTERRUPTS() to invoke
3809 : : * ProcessRepackMessages().
         *
         * Concretely, it sets InterruptPending and RepackMessagePending and then
         * sets this process's latch.
3810 : : */
3811 : : void
3812 : 7 : HandleRepackMessageInterrupt(void)
3813 : : {
3814 : 7 : InterruptPending = true;
3815 : 7 : RepackMessagePending = true;
3816 : 7 : SetLatch(MyLatch);
3817 : 7 : }
3818 : :
3819 : : /*
3820 : : * Process any queued protocol messages received from the repack worker.
         *
         * Returns immediately if decoding_worker is NULL. Otherwise messages are
         * read from the worker's error queue without blocking and handed to
         * ProcessRepackMessage() one at a time, until the queue would block or is
         * reported as detached. Interrupts are held for the duration, and the work
         * is done in a private memory context that is created on first use and
         * reset on each call.
3821 : : */
3822 : : void
3823 : 7 : ProcessRepackMessages(void)
3824 : : {
3825 : : MemoryContext oldcontext;
3826 : : static MemoryContext hpm_context = NULL;
3827 : :
3828 : : /*
3829 : : * Nothing to do if we haven't launched the worker yet or have already
3830 : : * terminated it.
3831 : : */
3832 [ + + ]: 7 : if (decoding_worker == NULL)
3833 : 1 : return;
3834 : :
3835 : : /*
3836 : : * This is invoked from ProcessInterrupts(), and since some of the
3837 : : * functions it calls contain CHECK_FOR_INTERRUPTS(), there is a potential
3838 : : * for recursive calls if more signals are received while this runs. It's
3839 : : * unclear that recursive entry would be safe, and it doesn't seem useful
3840 : : * even if it is safe, so let's block interrupts until done.
3841 : : */
3842 : 6 : HOLD_INTERRUPTS();
3843 : :
3844 : : /*
3845 : : * Moreover, CurrentMemoryContext might be pointing almost anywhere. We
3846 : : * don't want to risk leaking data into long-lived contexts, so let's do
3847 : : * our work here in a private context that we can reset on each use.
3848 : : */
3849 [ + - ]: 6 : if (hpm_context == NULL) /* first time through? */
3850 : 6 : hpm_context = AllocSetContextCreate(TopMemoryContext,
3851 : : "ProcessRepackMessages",
3852 : : ALLOCSET_DEFAULT_SIZES);
3853 : : else
3854 : 0 : MemoryContextReset(hpm_context);
3855 : :
3856 : 6 : oldcontext = MemoryContextSwitchTo(hpm_context);
3857 : :
3858 : : /* OK to process messages. Reset the flag saying there are more to do. */
3859 : 6 : RepackMessagePending = false;
3860 : :
3861 : : /*
3862 : : * Read as many messages as we can from the worker, but stop when no more
3863 : : * messages can be read from the worker without blocking.
3864 : : */
3865 : : while (true)
3866 : 0 : {
3867 : : shm_mq_result res;
3868 : : Size nbytes;
3869 : : void *data;
3870 : :
3871 : 6 : res = shm_mq_receive(decoding_worker->error_mqh, &nbytes,
3872 : : &data, true);
3873 [ - + ]: 6 : if (res == SHM_MQ_WOULD_BLOCK)
3874 : 0 : break;
3875 [ - + ]: 6 : else if (res == SHM_MQ_SUCCESS)
3876 : : {
3877 : : StringInfoData msg;
3878 : :
         /* Copy the message into a StringInfo before processing it. */
3879 : 0 : initStringInfo(&msg);
3880 : 0 : appendBinaryStringInfo(&msg, data, nbytes);
3881 : 0 : ProcessRepackMessage(&msg);
3882 : 0 : pfree(msg.data);
3883 : : }
3884 : : else
3885 : : {
3886 : : /*
3887 : : * The decoding worker is special in that it exits as soon as it
3888 : : * has its work done. Thus the DETACHED result code is fine.
3889 : : */
3890 : : Assert(res == SHM_MQ_DETACHED);
3891 : :
3892 : 6 : break;
3893 : : }
3894 : : }
3895 : :
3896 : 6 : MemoryContextSwitchTo(oldcontext);
3897 : :
3898 : : /* Might as well clear the context on our way out */
3899 : 6 : MemoryContextReset(hpm_context);
3900 : :
3901 : 6 : RESUME_INTERRUPTS();
3902 : : }
3903 : :
3904 : : /*
3905 : : * Process a single protocol message received from the REPACK decoding worker.
         *
         * The first byte of 'msg' is the message type. ErrorResponse and
         * NoticeResponse messages are parsed and re-thrown in this backend with
         * the error level capped at ERROR and a "REPACK decoding worker" line
         * appended to (or used as) the error context. Any other message type
         * raises ERROR.
3906 : : */
3907 : : static void
3908 : 0 : ProcessRepackMessage(StringInfo msg)
3909 : : {
3910 : : char msgtype;
3911 : :
3912 : 0 : msgtype = pq_getmsgbyte(msg);
3913 : :
3914 [ # # ]: 0 : switch (msgtype)
3915 : : {
3916 : 0 : case PqMsg_ErrorResponse:
3917 : : case PqMsg_NoticeResponse:
3918 : : {
3919 : : ErrorData edata;
3920 : :
3921 : : /* Parse ErrorResponse or NoticeResponse. */
3922 : 0 : pq_parse_errornotice(msg, &edata);
3923 : :
3924 : : /* Death of a worker isn't enough justification for suicide. */
3925 : 0 : edata.elevel = Min(edata.elevel, ERROR);
3926 : :
3927 : : /*
3928 : : * Add a context line to show that this is a message
3929 : : * propagated from the worker. Otherwise, it can sometimes be
3930 : : * confusing to understand what actually happened.
3931 : : */
3932 [ # # ]: 0 : if (edata.context)
3933 : 0 : edata.context = psprintf("%s\n%s", edata.context,
3934 : : _("REPACK decoding worker"));
3935 : : else
3936 : 0 : edata.context = pstrdup(_("REPACK decoding worker"));
3937 : :
3938 : : /* Rethrow error or print notice. */
3939 : 0 : ThrowErrorData(&edata);
3940 : :
3941 : 0 : break;
3942 : : }
3943 : :
3944 : 0 : default:
3945 : : {
3946 [ # # ]: 0 : elog(ERROR, "unrecognized message type received from decoding worker: %c (message length %d bytes)",
3947 : : msgtype, msg->len);
3948 : : }
3949 : : }
3950 : 0 : }
|