LCOV - code coverage report
Current view: top level - src/backend/commands - repack.c (source / functions) Coverage Total Hit
Test: PostgreSQL 19beta1 Lines: 90.7 % 1069 970
Test Date: 2026-06-12 20:16:36 Functions: 95.0 % 40 38
Legend: Lines:     hit not hit

            Line data    Source code
       1              : /*-------------------------------------------------------------------------
       2              :  *
       3              :  * repack.c
       4              :  *    REPACK a table; formerly known as CLUSTER.  VACUUM FULL also uses
       5              :  *    parts of this code.
       6              :  *
       7              :  * There are two somewhat different ways to rewrite a table.  In non-
       8              :  * concurrent mode, it's easy: take AccessExclusiveLock, create a new
       9              :  * transient relation, copy the tuples over to the relfilenode of the new
      10              :  * relation, swap the relfilenodes, then drop the old relation.
      11              :  *
      12              :  * In concurrent mode, we lock the table with only ShareUpdateExclusiveLock,
      13              :  * then do an initial copy as above.  However, while the tuples are being
      14              :  * copied, concurrent transactions could modify the table. To cope with those
      15              :  * changes, we rely on logical decoding to obtain them from WAL.  A bgworker
      16              :  * consumes WAL while the initial copy is ongoing (to prevent excessive WAL
      17              :  * from being reserved), and accumulates the changes in a file.  Once the
      18              :  * initial copy is complete, we read the changes from the file and re-apply
      19              :  * them on the new heap.  Then we upgrade our ShareUpdateExclusiveLock to
      20              :  * AccessExclusiveLock and swap the relfilenodes.  This way, the time we hold
      21              :  * a strong lock on the table is much reduced, and the bloat is eliminated.
      22              :  *
      23              :  *
      24              :  * Portions Copyright (c) 1996-2026, PostgreSQL Global Development Group
      25              :  * Portions Copyright (c) 1994-5, Regents of the University of California
      26              :  *
      27              :  *
      28              :  * IDENTIFICATION
      29              :  *    src/backend/commands/repack.c
      30              :  *
      31              :  *-------------------------------------------------------------------------
      32              :  */
      33              : #include "postgres.h"
      34              : 
      35              : #include "access/amapi.h"
      36              : #include "access/heapam.h"
      37              : #include "access/multixact.h"
      38              : #include "access/relscan.h"
      39              : #include "access/tableam.h"
      40              : #include "access/toast_internals.h"
      41              : #include "access/transam.h"
      42              : #include "access/xact.h"
      43              : #include "access/xlog.h"
      44              : #include "catalog/catalog.h"
      45              : #include "catalog/dependency.h"
      46              : #include "catalog/heap.h"
      47              : #include "catalog/index.h"
      48              : #include "catalog/namespace.h"
      49              : #include "catalog/objectaccess.h"
      50              : #include "catalog/pg_am.h"
      51              : #include "catalog/pg_constraint.h"
      52              : #include "catalog/pg_inherits.h"
      53              : #include "catalog/toasting.h"
      54              : #include "commands/defrem.h"
      55              : #include "commands/progress.h"
      56              : #include "commands/repack.h"
      57              : #include "commands/repack_internal.h"
      58              : #include "commands/tablecmds.h"
      59              : #include "commands/vacuum.h"
      60              : #include "executor/executor.h"
      61              : #include "libpq/pqformat.h"
      62              : #include "libpq/pqmq.h"
      63              : #include "miscadmin.h"
      64              : #include "optimizer/optimizer.h"
      65              : #include "pgstat.h"
      66              : #include "replication/logicalrelation.h"
      67              : #include "storage/bufmgr.h"
      68              : #include "storage/ipc.h"
      69              : #include "storage/lmgr.h"
      70              : #include "storage/predicate.h"
      71              : #include "storage/proc.h"
      72              : #include "utils/acl.h"
      73              : #include "utils/fmgroids.h"
      74              : #include "utils/guc.h"
      75              : #include "utils/injection_point.h"
      76              : #include "utils/inval.h"
      77              : #include "utils/lsyscache.h"
      78              : #include "utils/memutils.h"
      79              : #include "utils/pg_rusage.h"
      80              : #include "utils/relmapper.h"
      81              : #include "utils/snapmgr.h"
      82              : #include "utils/syscache.h"
      83              : #include "utils/wait_event_types.h"
      84              : 
      85              : /*
      86              :  * Identifies one table to process (tableOid) and the index to use for it
      87              :  * (indexOid).  We need this so we can make a list of them when invoked
      88              :  * without a specific table/index pair.
      89              :  */
      90              : typedef struct
      91              : {
      92              :     Oid         tableOid;
      93              :     Oid         indexOid;
      94              : } RelToCluster;
      95              : 
      96              : /*
      97              :  * The first file exported by the decoding worker must contain a snapshot; the
      98              :  * following ones contain the data changes.
      99              :  */
     100              : #define WORKER_FILE_SNAPSHOT    0
     101              : 
     102              : /*
     103              :  * State needed to apply concurrent data changes (see apply_concurrent_*).
     104              :  */
     105              : typedef struct ChangeContext
     106              : {
     107              :     /* The relation the changes are applied to. */
     108              :     Relation    cc_rel;
     109              : 
     110              :     /* Needed to update indexes of cc_rel. */
     111              :     ResultRelInfo *cc_rri;
     112              :     EState     *cc_estate;
     113              : 
     114              :     /*
     115              :      * Existing tuples to UPDATE and DELETE are located via this index. We
     116              :      * keep the scankey in partially initialized state to avoid repeated work.
     117              :      * sk_argument is completed on the fly.
     118              :      */
     119              :     Relation    cc_ident_index;
     120              :     ScanKey     cc_ident_key;
     121              :     int         cc_ident_key_nentries;
     122              : 
     123              :     /* The last column we need to deform to obtain the tuple identity. */
     124              :     AttrNumber  cc_last_key_attno;
     125              : 
     126              :     /* Sequential number of the file containing the changes. */
     127              :     int         cc_file_seq;
     128              : } ChangeContext;
     129              : 
     130              : /*
     131              :  * Backend-local information to control the decoding worker.
     132              :  */
     133              : typedef struct DecodingWorker
     134              : {
     135              :     /* Handle of the background worker. */
     136              :     BackgroundWorkerHandle *handle;
     137              : 
     138              :     /* DSM segment; DecodingWorkerShared is in this segment. */
     139              :     dsm_segment *seg;
     140              : 
     141              :     /* Handle of the shm_mq error queue. */
     142              :     shm_mq_handle *error_mqh;
     143              : } DecodingWorker;
     144              : 
     145              : /* Pointer to the currently running decoding worker; initially NULL. */
     146              : static DecodingWorker *decoding_worker = NULL;
     147              : 
     148              : /*
     149              :  * Is there a message sent by a repack worker that the backend still needs
     150              :  * to receive?
     151              :  */
     152              : volatile sig_atomic_t RepackMessagePending = false;
     153              : 
     154              : static LOCKMODE RepackLockLevel(bool concurrent);
     155              : static bool cluster_rel_recheck(RepackCommand cmd, Relation OldHeap,
     156              :                                 Oid indexOid, Oid userid, LOCKMODE lmode,
     157              :                                 int options);
     158              : static void check_concurrent_repack_requirements(Relation rel,
     159              :                                                  Oid *ident_idx_p);
     160              : static void rebuild_relation(Relation OldHeap, Relation index, bool verbose,
     161              :                              Oid ident_idx);
     162              : static void copy_table_data(Relation NewHeap, Relation OldHeap, Relation OldIndex,
     163              :                             Snapshot snapshot,
     164              :                             bool verbose,
     165              :                             bool *pSwapToastByContent,
     166              :                             TransactionId *pFreezeXid,
     167              :                             MultiXactId *pCutoffMulti);
     168              : static List *get_tables_to_repack(RepackCommand cmd, bool usingindex,
     169              :                                   MemoryContext permcxt);
     170              : static List *get_tables_to_repack_partitioned(RepackCommand cmd,
     171              :                                               Oid relid, bool rel_is_index,
     172              :                                               MemoryContext permcxt);
     173              : static bool repack_is_permitted_for_relation(RepackCommand cmd,
     174              :                                              Oid relid, Oid userid);
     175              : 
     176              : static void apply_concurrent_changes(BufFile *file, ChangeContext *chgcxt);
     177              : static void apply_concurrent_insert(Relation rel, TupleTableSlot *slot,
     178              :                                     ChangeContext *chgcxt);
     179              : static void apply_concurrent_update(Relation rel, TupleTableSlot *spilled_tuple,
     180              :                                     TupleTableSlot *ondisk_tuple,
     181              :                                     ChangeContext *chgcxt);
     182              : static void apply_concurrent_delete(Relation rel, TupleTableSlot *slot);
     183              : static void restore_tuple(BufFile *file, Relation relation,
     184              :                           TupleTableSlot *slot);
     185              : static void adjust_toast_pointers(Relation relation, TupleTableSlot *dest,
     186              :                                   TupleTableSlot *src);
     187              : static bool find_target_tuple(Relation rel, ChangeContext *chgcxt,
     188              :                               TupleTableSlot *locator,
     189              :                               TupleTableSlot *retrieved);
     190              : static bool identity_key_equal(ChangeContext *chgcxt,
     191              :                                TupleTableSlot *locator,
     192              :                                TupleTableSlot *candidate);
     193              : static void process_concurrent_changes(XLogRecPtr end_of_wal,
     194              :                                        ChangeContext *chgcxt,
     195              :                                        bool done);
     196              : static void initialize_change_context(ChangeContext *chgcxt,
     197              :                                       Relation relation,
     198              :                                       Oid ident_index_id);
     199              : static void release_change_context(ChangeContext *chgcxt);
     200              : static void rebuild_relation_finish_concurrent(Relation NewHeap, Relation OldHeap,
     201              :                                                Oid identIdx,
     202              :                                                TransactionId frozenXid,
     203              :                                                MultiXactId cutoffMulti);
     204              : static List *build_new_indexes(Relation NewHeap, Relation OldHeap, List *OldIndexes);
     205              : static void copy_index_constraints(Relation old_index, Oid new_index_id,
     206              :                                    Oid new_heap_id);
     207              : static Relation process_single_relation(RepackStmt *stmt,
     208              :                                         LOCKMODE lockmode,
     209              :                                         bool isTopLevel,
     210              :                                         ClusterParams *params);
     211              : static Oid  determine_clustered_index(Relation rel, bool usingindex,
     212              :                                       const char *indexname);
     213              : 
     214              : static void start_repack_decoding_worker(Oid relid);
     215              : static void stop_repack_decoding_worker(void);
     216              : static void stop_repack_decoding_worker_cb(int code, Datum arg);
     217              : static Snapshot get_initial_snapshot(DecodingWorker *worker);
     218              : 
     219              : static void ProcessRepackMessage(StringInfo msg);
     220              : static const char *RepackCommandAsString(RepackCommand cmd);
     221              : 
     222              : 
     223              : /*
     224              :  * ExecRepack: execute a RepackStmt.  The repack code allows for processing
     225              :  * multiple tables at once, so we cannot just run everything in a single
     226              :  * transaction, or we would be forced to acquire exclusive locks on all the
     227              :  * tables being clustered, simultaneously --- very likely leading to deadlock.
     228              :  *
     229              :  * To solve this we follow a similar strategy to VACUUM code, processing each
     230              :  * relation in a separate transaction. For this to work, we need to:
     231              :  *
     232              :  *  - provide a separate memory context so that we can pass information in
     233              :  *    a way that survives across transactions
     234              :  *  - start a new transaction every time a new relation is clustered
     235              :  *  - check for validity of the information on to-be-clustered relations,
     236              :  *    as someone might have deleted a relation behind our back, or
     237              :  *    clustered one on a different index
     238              :  *  - end the transaction
     239              :  *
     240              :  * The single-relation case does not have any such overhead.
     241              :  *
     242              :  * We also allow a relation to be repacked following an index, but without
     243              :  * naming a specific one.  In that case, the indisclustered bit will be
     244              :  * looked up, and an ERROR will be thrown if no so-marked index is found.
     245              :  */
     246              : void
     247          233 : ExecRepack(ParseState *pstate, RepackStmt *stmt, bool isTopLevel)
     248              : {
     249          233 :     ClusterParams params = {0};
     250          233 :     Relation    rel = NULL;
     251              :     MemoryContext repack_context;
     252              :     LOCKMODE    lockmode;
     253              :     List       *rtcs;
     254          233 :     bool        verbose = false;
     255          233 :     bool        analyze = false;
     256          233 :     bool        concurrently = false;
     257              : 
     258              :     /* Parse option list */
     259          496 :     foreach_node(DefElem, opt, stmt->params)
     260              :     {
     261           30 :         if (strcmp(opt->defname, "verbose") == 0)
     262            7 :             verbose = defGetBoolean(opt);
     263           23 :         else if (strcmp(opt->defname, "analyze") == 0 ||
     264           15 :                  strcmp(opt->defname, "analyse") == 0)
     265            8 :             analyze = defGetBoolean(opt);
     266           15 :         else if (strcmp(opt->defname, "concurrently") == 0)
     267              :         {
     268           15 :             if (stmt->command != REPACK_COMMAND_REPACK)
     269            0 :                 ereport(ERROR,
     270              :                         errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
     271              :                         errmsg("CONCURRENTLY option not supported for %s",
     272              :                                RepackCommandAsString(stmt->command)));
     273           15 :             concurrently = defGetBoolean(opt);
     274              :         }
     275              :         else
     276            0 :             ereport(ERROR,
     277              :                     errcode(ERRCODE_SYNTAX_ERROR),
     278              :                     errmsg("unrecognized %s option \"%s\"",
     279              :                            RepackCommandAsString(stmt->command),
     280              :                            opt->defname),
     281              :                     parser_errposition(pstate, opt->location));
     282              :     }
     283              : 
     284          466 :     params.options |=
     285          466 :         (verbose ? CLUOPT_VERBOSE : 0) |
     286          233 :         (analyze ? CLUOPT_ANALYZE : 0) |
     287          233 :         (concurrently ? CLUOPT_CONCURRENT : 0);
     288              : 
     289              :     /* Determine the lock mode to use; it depends only on CONCURRENTLY. */
     290          233 :     lockmode = RepackLockLevel((params.options & CLUOPT_CONCURRENT) != 0);
     291              : 
     292          233 :     if ((params.options & CLUOPT_CONCURRENT) != 0)
     293              :     {
     294              :         /*
     295              :          * Make sure we're not in a transaction block.
     296              :          *
     297              :          * The reason is that repack_setup_logical_decoding() could wait
     298              :          * indefinitely for our XID to complete. (The deadlock detector would
     299              :          * not recognize it because we'd be waiting for ourselves, i.e. no
     300              :          * real lock conflict.) It would be possible to run in a transaction
     301              :          * block if we had no XID, but this restriction is simpler for users
     302              :          * to understand and we don't lose any functionality.
     303              :          */
     304           15 :         PreventInTransactionBlock(isTopLevel, "REPACK (CONCURRENTLY)");
     305              :     }
     306              : 
     307              :     /*
     308              :      * If a single relation is specified, process it and we're done ... unless
     309              :      * the relation is a partitioned table, in which case we fall through.
     310              :      */
     311          233 :     if (stmt->relation != NULL)
     312              :     {
     313          217 :         rel = process_single_relation(stmt, lockmode, isTopLevel, &params);
     314          173 :         if (rel == NULL)
     315          140 :             return;             /* all done */
     316              :     }
     317              : 
     318              :     /*
     319              :      * Don't allow ANALYZE in the multiple-relation case for now.  Maybe we
     320              :      * can add support for this later.
     321              :      */
     322           49 :     if (params.options & CLUOPT_ANALYZE)
     323            0 :         ereport(ERROR,
     324              :                 errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
     325              :                 errmsg("cannot execute %s on multiple tables",
     326              :                        "REPACK (ANALYZE)"));
     327              : 
     328              :     /*
     329              :      * By here, we know we are in a multi-table situation.
     330              :      *
     331              :      * Concurrent processing is currently considered rather special (e.g. in
     332              :      * terms of resources consumed) so it is not performed in bulk.
     333              :      */
     334           49 :     if (params.options & CLUOPT_CONCURRENT)
     335              :     {
     336            1 :         if (rel != NULL)
     337              :         {
     338              :             Assert(rel->rd_rel->relkind == RELKIND_PARTITIONED_TABLE);
     339            1 :             ereport(ERROR,
     340              :                     errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
     341              :                     errmsg("%s is not supported for partitioned tables",
     342              :                            "REPACK (CONCURRENTLY)"),
     343              :                     errhint("Consider running the command on individual partitions."));
     344              :         }
     345              :         else
     346            0 :             ereport(ERROR,
     347              :                     errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
     348              :                     errmsg("%s requires an explicit table name",
     349              :                            "REPACK (CONCURRENTLY)"));
     350              :     }
     351              : 
     352              :     /*
     353              :      * In order to avoid holding locks for too long, we want to process each
     354              :      * table in its own transaction.  This forces us to disallow running
     355              :      * inside a user transaction block.
     356              :      */
     357           48 :     PreventInTransactionBlock(isTopLevel, RepackCommandAsString(stmt->command));
     358              : 
     359              :     /* Also, we need a memory context to hold our list of relations */
     360           48 :     repack_context = AllocSetContextCreate(PortalContext,
     361              :                                            "Repack",
     362              :                                            ALLOCSET_DEFAULT_SIZES);
     363              : 
     364              :     /*
     365              :      * Since we open a new transaction for each relation, we have to check
     366              :      * that the relation still is what we think it is.
     367              :      *
     368              :      * In single-transaction CLUSTER, we don't need the overhead.
     369              :      */
     370           48 :     params.options |= CLUOPT_RECHECK;
     371              : 
     372              :     /*
     373              :      * If we don't have a relation yet, determine a relation list.  If we do,
     374              :      * then it must be a partitioned table, and we want to process its
     375              :      * partitions.
     376              :      */
     377           48 :     if (rel == NULL)
     378              :     {
     379              :         Assert(stmt->indexname == NULL);
     380           16 :         rtcs = get_tables_to_repack(stmt->command, stmt->usingindex,
     381              :                                     repack_context);
     382           16 :         params.options |= CLUOPT_RECHECK_ISCLUSTERED;
     383              :     }
     384              :     else
     385              :     {
     386              :         Oid         relid;
     387              :         bool        rel_is_index;
     388              : 
     389              :         Assert(rel->rd_rel->relkind == RELKIND_PARTITIONED_TABLE);
     390              : 
     391              :         /*
     392              :          * If USING INDEX was specified, resolve the index name now and pass
     393              :          * it down.
     394              :          */
     395           32 :         if (stmt->usingindex)
     396              :         {
     397              :             /*
     398              :              * If no index name was specified when repacking a partitioned
     399              :              * table, punt for now.  Maybe we can improve this later.
     400              :              */
     401           28 :             if (!stmt->indexname)
     402              :             {
     403            8 :                 if (stmt->command == REPACK_COMMAND_CLUSTER)
     404            4 :                     ereport(ERROR,
     405              :                             errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
     406              :                             errmsg("there is no previously clustered index for table \"%s\"",
     407              :                                    RelationGetRelationName(rel)));
     408              :                 else
     409            4 :                     ereport(ERROR,
     410              :                             errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
     411              :                     /*- translator: first %s is name of a SQL command, eg. REPACK */
     412              :                             errmsg("cannot execute %s on partitioned table \"%s\" USING INDEX with no index name",
     413              :                                    RepackCommandAsString(stmt->command),
     414              :                                    RelationGetRelationName(rel)));
     415              :             }
     416              : 
     417           20 :             relid = determine_clustered_index(rel, stmt->usingindex,
     418           20 :                                               stmt->indexname);
     419           20 :             if (!OidIsValid(relid))
     420            0 :                 elog(ERROR, "unable to determine index to cluster on");
     421           20 :             check_index_is_clusterable(rel, relid, AccessExclusiveLock);
     422              : 
     423           16 :             rel_is_index = true;
     424              :         }
     425              :         else
     426              :         {
     427            4 :             relid = RelationGetRelid(rel);
     428            4 :             rel_is_index = false;
     429              :         }
     430              : 
     431           20 :         rtcs = get_tables_to_repack_partitioned(stmt->command,
     432              :                                                 relid, rel_is_index,
     433              :                                                 repack_context);
     434              : 
     435              :         /* close parent relation, releasing lock on it */
     436           20 :         table_close(rel, AccessExclusiveLock);
     437           20 :         rel = NULL;
     438              :     }
     439              : 
     440              :     /* Commit to get out of starting transaction */
     441           36 :     PopActiveSnapshot();
     442           36 :     CommitTransactionCommand();
     443              : 
     444              :     /* Cluster the tables, each in a separate transaction */
     445              :     Assert(rel == NULL);
     446          124 :     foreach_ptr(RelToCluster, rtc, rtcs)
     447              :     {
     448              :         /* Start a new transaction for each relation. */
     449           52 :         StartTransactionCommand();
     450              : 
     451              :         /*
     452              :          * Open the target table, coping with the case where it has been
     453              :          * dropped.
     454              :          */
     455           52 :         rel = try_table_open(rtc->tableOid, lockmode);
     456           52 :         if (rel == NULL)
     457              :         {
     458            0 :             CommitTransactionCommand();
     459            0 :             continue;
     460              :         }
     461              : 
     462              :         /* functions in indexes may want a snapshot set */
     463           52 :         PushActiveSnapshot(GetTransactionSnapshot());
     464              : 
     465              :         /* Process this table */
     466           52 :         cluster_rel(stmt->command, rel, rtc->indexOid, &params, isTopLevel);
     467              :         /* cluster_rel closes the relation, but keeps lock */
     468              : 
     469           52 :         PopActiveSnapshot();
     470           52 :         CommitTransactionCommand();
     471              :     }
     472              : 
     473              :     /* Start a new transaction for the cleanup work. */
     474           36 :     StartTransactionCommand();
     475              : 
     476              :     /* Clean up working storage */
     477           36 :     MemoryContextDelete(repack_context);
     478              : }
     479              : 
     480              : /*
     481              :  * Return the lock level to take on the table being repacked.  Without
     482              :  * CONCURRENTLY, AccessExclusiveLock is used throughout, avoiding lock-upgrade
     483              :  * hazards.  With it, ShareUpdateExclusiveLock (just like VACUUM) covers most
     484              :  * of the processing and AccessExclusiveLock is only acquired at the end, to
     485              :  * swap the relation -- supposedly for a short time.
     486              :  */
     487              : static LOCKMODE
     488         1072 : RepackLockLevel(bool concurrent)
     489              : {
     490         1072 :     if (concurrent)
     491           36 :         return ShareUpdateExclusiveLock;
     492              :     else
     493         1036 :         return AccessExclusiveLock;
     494              : }
     495              : 
     496              : /*
     497              :  * cluster_rel
     498              :  *
     499              :  * This clusters the table by creating a new, clustered table and
     500              :  * swapping the relfilenumbers of the new table and the old table, so
     501              :  * the OID of the original table is preserved.  Thus we do not lose
     502              :  * GRANT, inheritance nor references to this table.
     503              :  *
     504              :  * Indexes are rebuilt too, via REINDEX. Since we are effectively bulk-loading
     505              :  * the new table, it's better to create the indexes afterwards than to fill
     506              :  * them incrementally while we load the table.
     507              :  *
     508              :  * If indexOid is InvalidOid, the table will be rewritten in physical order
     509              :  * instead of index order.
     510              :  *
     511              :  * Note that, in the concurrent case, the function releases the lock at some
     512              :  * point, in order to get AccessExclusiveLock for the final steps (i.e. to
     513              :  * swap the relation files). To make things simpler, the caller should expect
     514              :  * OldHeap to be closed on return, regardless of CLUOPT_CONCURRENT. (The
     515              :  * AccessExclusiveLock is kept till the end of the transaction.)
     516              :  *
     517              :  * 'cmd' indicates which command is being executed, to be used for error
     518              :  * messages and progress reporting.  'isTopLevel' is currently unused here.
     519              :  */
     520              : void
     521          429 : cluster_rel(RepackCommand cmd, Relation OldHeap, Oid indexOid,
     522              :             ClusterParams *params, bool isTopLevel)
     523              : {
     524          429 :     Oid         tableOid = RelationGetRelid(OldHeap);
     525              :     Relation    index;
     526              :     LOCKMODE    lmode;
     527              :     Oid         save_userid;
     528              :     int         save_sec_context;
     529              :     int         save_nestlevel;
     530          429 :     bool        verbose = ((params->options & CLUOPT_VERBOSE) != 0);
     531          429 :     bool        recheck = ((params->options & CLUOPT_RECHECK) != 0);
     532          429 :     bool        concurrent = ((params->options & CLUOPT_CONCURRENT) != 0);
     533          429 :     Oid         ident_idx = InvalidOid;
     534              : 
     535              :     /* Determine the lock mode to use. */
     536          429 :     lmode = RepackLockLevel(concurrent);
     537              : 
     538              :     /*
     539              :      * Check some preconditions in the concurrent case.  This also obtains the
     540              :      * replica index OID.
     541              :      */
     542          429 :     if (concurrent)
     543           14 :         check_concurrent_repack_requirements(OldHeap, &ident_idx);
     544              : 
     545              :     /* Check for user-requested abort. */
     546          422 :     CHECK_FOR_INTERRUPTS();
     547              : 
     548          422 :     pgstat_progress_start_command(PROGRESS_COMMAND_REPACK, tableOid);
     549          422 :     pgstat_progress_update_param(PROGRESS_REPACK_COMMAND, cmd);
     550              : 
     551              :     /*
     552              :      * Switch to the table owner's userid, so that any index functions are run
     553              :      * as that user.  Also lock down security-restricted operations and
     554              :      * arrange to make GUC variable changes local to this command.
     555              :      */
     556          422 :     GetUserIdAndSecContext(&save_userid, &save_sec_context);
     557          422 :     SetUserIdAndSecContext(OldHeap->rd_rel->relowner,
     558              :                            save_sec_context | SECURITY_RESTRICTED_OPERATION);
     559          422 :     save_nestlevel = NewGUCNestLevel();
     560          422 :     RestrictSearchPath();
     561              : 
     562              :     /*
     563              :      * Recheck that the relation is still what it was when we started.
     564              :      *
     565              :      * Note that it's critical to skip this in single-relation CLUSTER;
     566              :      * otherwise, we would reject an attempt to cluster using a
     567              :      * not-previously-clustered index.
     568              :      */
     569          422 :     if (recheck &&
     570           52 :         !cluster_rel_recheck(cmd, OldHeap, indexOid, save_userid,
     571           52 :                              lmode, params->options))
     572            0 :         goto out;               /* the recheck has already closed OldHeap */
     573              : 
     574              :     /*
     575              :      * We allow repacking shared catalogs only when not using an index. It
     576              :      * would work to use an index in most respects, but the index would only
     577              :      * get marked as indisclustered in the current database, leading to
     578              :      * unexpected behavior if CLUSTER were later invoked in another database.
     579              :      */
     580          422 :     if (OidIsValid(indexOid) && OldHeap->rd_rel->relisshared)
     581            0 :         ereport(ERROR,
     582              :                 errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
     583              :         /*- translator: first %s is name of a SQL command, eg. REPACK */
     584              :                 errmsg("cannot execute %s on a shared catalog",
     585              :                        RepackCommandAsString(cmd)));
     586              : 
     587              :     /*
     588              :      * The CONCURRENTLY case should have been rejected earlier because it does
     589              :      * not support system catalogs.
     590              :      */
     591              :     Assert(!(OldHeap->rd_rel->relisshared && concurrent));
     592              : 
     593              :     /*
     594              :      * Don't process temp tables of other backends ... their local buffer
     595              :      * manager is not going to cope.
     596              :      */
     597          422 :     if (RELATION_IS_OTHER_TEMP(OldHeap))
     598            0 :         ereport(ERROR,
     599              :                 errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
     600              :         /*- translator: first %s is name of a SQL command, eg. REPACK */
     601              :                 errmsg("cannot execute %s on temporary tables of other sessions",
     602              :                        RepackCommandAsString(cmd)));
     603              : 
     604              :     /*
     605              :      * Also check for active uses of the relation in the current transaction,
     606              :      * including open scans and pending AFTER trigger events.
     607              :      */
     608          422 :     CheckTableNotInUse(OldHeap, RepackCommandAsString(cmd));
     609              : 
     610              :     /* Check heap and index are valid to cluster on */
     611          422 :     if (OidIsValid(indexOid))
     612              :     {
     613              :         /* verify the index is good and lock it */
     614          160 :         check_index_is_clusterable(OldHeap, indexOid, lmode);
     615              :         /* also open it */
     616          160 :         index = index_open(indexOid, NoLock);
     617              :     }
     618              :     else
     619          262 :         index = NULL;
     620              : 
     621              :     /*
     622              :      * When allow_system_table_mods is turned off, we disallow repacking a
     623              :      * catalog on a particular index unless that's already the clustered index
     624              :      * for that catalog.
     625              :      *
     626              :      * XXX We don't check for this in CLUSTER, because it's historically been
     627              :      * allowed.
     628              :      */
     629          422 :     if (cmd != REPACK_COMMAND_CLUSTER &&
     630          289 :         !allowSystemTableMods && OidIsValid(indexOid) &&
     631           27 :         IsCatalogRelation(OldHeap) && !index->rd_index->indisclustered)
     632            4 :         ereport(ERROR,
     633              :                 errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
     634              :                 errmsg("permission denied: \"%s\" is a system catalog",
     635              :                        RelationGetRelationName(OldHeap)),
     636              :                 errdetail("System catalogs can only be clustered by the index they're already clustered on, if any, unless \"%s\" is enabled.",
     637              :                           "allow_system_table_mods"));
     638              : 
     639              :     /*
     640              :      * Quietly ignore the request if this is a materialized view which has not
     641              :      * been populated from its query. No harm is done because there is no data
     642              :      * to deal with, and we don't want to throw an error if this is part of a
     643              :      * multi-relation request -- for example, CLUSTER was run on the entire
     644              :      * database.
     645              :      */
     646          418 :     if (OldHeap->rd_rel->relkind == RELKIND_MATVIEW &&
     647            8 :         !RelationIsPopulated(OldHeap))
     648              :     {
     649            8 :         if (index)
     650            8 :             index_close(index, lmode);
     651            8 :         relation_close(OldHeap, lmode);
     652            8 :         goto out;
     653              :     }
     654              : 
     655              :     Assert(OldHeap->rd_rel->relkind == RELKIND_RELATION ||
     656              :            OldHeap->rd_rel->relkind == RELKIND_MATVIEW ||
     657              :            OldHeap->rd_rel->relkind == RELKIND_TOASTVALUE);
     658              : 
     659              :     /*
     660              :      * All predicate locks on the tuples or pages are about to be made
     661              :      * invalid, because we move tuples around.  Promote them to relation
     662              :      * locks.  Predicate locks on indexes will be promoted when they are
     663              :      * reindexed.
     664              :      *
     665              :      * During concurrent processing, the heap as well as its indexes stay in
     666              :      * operation, so we postpone this step until they are locked using
     667              :      * AccessExclusiveLock near the end of the processing.
     668              :      */
     669          410 :     if (!concurrent)
     670          403 :         TransferPredicateLocksToHeapRelation(OldHeap);
     671              : 
     672              :     /*
     673              :      * rebuild_relation does all the dirty work, and closes OldHeap and index,
     674              :      * if valid.
     675              :      *
     676              :      * In concurrent mode, make sure the worker terminates; normally it does
     677              :      * so by itself, but a PG_ENSURE_ERROR_CLEANUP callback ensures that this
     678              :      * happens even in case this backend dies early on a FATAL exit.  Normal
     679              :      * mode doesn't need that overhead.
     680              :      */
     681          410 :     if (concurrent)
     682              :     {
     683            7 :         PG_ENSURE_ERROR_CLEANUP(stop_repack_decoding_worker_cb, 0);
     684              :         {
     685            7 :             rebuild_relation(OldHeap, index, verbose, ident_idx);
     686              :         }
     687            7 :         PG_END_ENSURE_ERROR_CLEANUP(stop_repack_decoding_worker_cb, 0);
     688            7 :         stop_repack_decoding_worker();
     689              :     }
     690              :     else
     691          403 :         rebuild_relation(OldHeap, index, verbose, ident_idx);
     692              : 
     693          414 : out:
     694              :     /* Roll back any GUC changes executed by index functions */
     695          414 :     AtEOXact_GUC(false, save_nestlevel);
     696              : 
     697              :     /* Restore userid and security context */
     698          414 :     SetUserIdAndSecContext(save_userid, save_sec_context);
     699              : 
     700          414 :     pgstat_progress_end_command();
     701          414 : }
     702              : 
     703              : /*
     704              :  * Check if the table (and its index) still meets the requirements of
     705              :  * cluster_rel().  If not, close OldHeap using lmode and return false.
     706              :  */
     707              : static bool
     708           52 : cluster_rel_recheck(RepackCommand cmd, Relation OldHeap, Oid indexOid,
     709              :                     Oid userid, LOCKMODE lmode, int options)
     710              : {
     711           52 :     Oid         tableOid = RelationGetRelid(OldHeap);
     712              : 
     713              :     /* Check that the user still has privileges for the relation */
     714           52 :     if (!repack_is_permitted_for_relation(cmd, tableOid, userid))
     715              :     {
     716            0 :         relation_close(OldHeap, lmode);
     717            0 :         return false;
     718              :     }
     719              : 
     720              :     /*
     721              :      * Silently skip a temp table for a remote session.  Only doing this check
     722              :      * in the "recheck" case is appropriate (which currently means somebody is
     723              :      * executing a database-wide CLUSTER or on a partitioned table), because
     724              :      * there is another check in cluster() which will stop any attempt to
     725              :      * cluster remote temp tables by name.  There is another check in
     726              :      * cluster_rel which is redundant, but we leave it for extra safety.
     727              :      */
     728           52 :     if (RELATION_IS_OTHER_TEMP(OldHeap))
     729              :     {
     730            0 :         relation_close(OldHeap, lmode);
     731            0 :         return false;
     732              :     }
     733              : 
     734           52 :     if (OidIsValid(indexOid))
     735              :     {
     736              :         /*
     737              :          * Check that the index still exists
     738              :          */
     739           32 :         if (!SearchSysCacheExists1(RELOID, ObjectIdGetDatum(indexOid)))
     740              :         {
     741            0 :             relation_close(OldHeap, lmode);
     742            0 :             return false;
     743              :         }
     744              : 
     745              :         /*
     746              :          * Check that the index is still the one with indisclustered set, if
     747              :          * needed.
     748              :          */
     749           32 :         if ((options & CLUOPT_RECHECK_ISCLUSTERED) != 0 &&
     750            4 :             !get_index_isclustered(indexOid))
     751              :         {
     752            0 :             relation_close(OldHeap, lmode);
     753            0 :             return false;
     754              :         }
     755              :     }
     756              : 
     757           52 :     return true;
     758              : }
     759              : 
     760              : /*
     761              :  * Verify that the specified heap and index are valid to cluster on
     762              :  *
     763              :  * Side effect: obtains a 'lockmode' lock on the index and keeps it.  The caller may
     764              :  * in some cases already have a lock of the same strength on the table, but
     765              :  * not in all cases so we can't rely on the table-level lock for
     766              :  * protection here.
     767              :  */
     768              : void
     769          359 : check_index_is_clusterable(Relation OldHeap, Oid indexOid, LOCKMODE lockmode)
     770              : {
     771              :     Relation    OldIndex;
     772              : 
     773          359 :     OldIndex = index_open(indexOid, lockmode);
     774              : 
     775              :     /*
     776              :      * Check that index is in fact an index on the given relation
     777              :      */
     778          359 :     if (OldIndex->rd_index == NULL ||
     779          359 :         OldIndex->rd_index->indrelid != RelationGetRelid(OldHeap))
     780            4 :         ereport(ERROR,
     781              :                 (errcode(ERRCODE_WRONG_OBJECT_TYPE),
     782              :                  errmsg("\"%s\" is not an index for table \"%s\"",
     783              :                         RelationGetRelationName(OldIndex),
     784              :                         RelationGetRelationName(OldHeap))));
     785              : 
     786              :     /* Index AM must allow clustering */
     787          355 :     if (!OldIndex->rd_indam->amclusterable)
     788            4 :         ereport(ERROR,
     789              :                 (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
     790              :                  errmsg("cannot cluster on index \"%s\" because access method does not support clustering",
     791              :                         RelationGetRelationName(OldIndex))));
     792              : 
     793              :     /*
     794              :      * Disallow clustering on incomplete indexes (those that might not index
     795              :      * every row of the relation).  We could relax this by making a separate
     796              :      * seqscan pass over the table to copy the missing rows, but that seems
     797              :      * expensive and tedious.
     798              :      */
     799          351 :     if (!heap_attisnull(OldIndex->rd_indextuple, Anum_pg_index_indpred, NULL))
     800            4 :         ereport(ERROR,
     801              :                 (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
     802              :                  errmsg("cannot cluster on partial index \"%s\"",
     803              :                         RelationGetRelationName(OldIndex))));
     804              : 
     805              :     /*
     806              :      * Disallow if index is left over from a failed CREATE INDEX CONCURRENTLY;
     807              :      * it might well not contain entries for every heap row, or might not even
     808              :      * be internally consistent.  (But note that we don't check indcheckxmin;
     809              :      * the worst consequence of following broken HOT chains would be that we
     810              :      * might put recently-dead tuples out-of-order in the new table, and there
     811              :      * is little harm in that.)
     812              :      */
     813          347 :     if (!OldIndex->rd_index->indisvalid)
     814            4 :         ereport(ERROR,
     815              :                 (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
     816              :                  errmsg("cannot cluster on invalid index \"%s\"",
     817              :                         RelationGetRelationName(OldIndex))));
     818              : 
     819              :     /* Drop relcache refcnt on OldIndex, but keep lock */
     820          343 :     index_close(OldIndex, NoLock);
     821          343 : }
     822              : 
     823              : /*
     824              :  * mark_index_clustered: mark the specified index as the one clustered on
     825              :  *
     826              :  * With indexOid == InvalidOid, will mark all indexes of rel not-clustered.
     827              :  */
     828              : void
     829          195 : mark_index_clustered(Relation rel, Oid indexOid, bool is_internal)
     830              : {
     831              :     HeapTuple   indexTuple;
     832              :     Form_pg_index indexForm;
     833              :     Relation    pg_index;
     834              :     ListCell   *index;
     835              : 
     836              :     Assert(rel->rd_rel->relkind != RELKIND_PARTITIONED_TABLE);
     837              : 
     838              :     /*
     839              :      * If the index is already marked clustered, no need to do anything.
     840              :      */
     841          195 :     if (OidIsValid(indexOid))
     842              :     {
     843          187 :         if (get_index_isclustered(indexOid))
     844           40 :             return;
     845              :     }
     846              : 
     847              :     /*
     848              :      * Check each index of the relation and set/clear the bit as needed.
     849              :      */
     850          155 :     pg_index = table_open(IndexRelationId, RowExclusiveLock);
     851              : 
     852          464 :     foreach(index, RelationGetIndexList(rel))
     853              :     {
     854          309 :         Oid         thisIndexOid = lfirst_oid(index);
     855              : 
     856          309 :         indexTuple = SearchSysCacheCopy1(INDEXRELID,
     857              :                                          ObjectIdGetDatum(thisIndexOid));
     858          309 :         if (!HeapTupleIsValid(indexTuple))
     859            0 :             elog(ERROR, "cache lookup failed for index %u", thisIndexOid);
     860          309 :         indexForm = (Form_pg_index) GETSTRUCT(indexTuple);
     861              : 
     862              :         /*
     863              :          * Unset the bit if set.  This cannot be the requested index, because
     864              :          * we returned above if that one was already marked clustered.
     865              :          */
     866          309 :         if (indexForm->indisclustered)
     867              :         {
     868           20 :             indexForm->indisclustered = false;
     869           20 :             CatalogTupleUpdate(pg_index, &indexTuple->t_self, indexTuple);
     870              :         }
     871          289 :         else if (thisIndexOid == indexOid)
     872              :         {
     873              :             /* this was checked earlier, but let's be real sure */
     874          147 :             if (!indexForm->indisvalid)
     875            0 :                 elog(ERROR, "cannot cluster on invalid index %u", indexOid);
     876          147 :             indexForm->indisclustered = true;
     877          147 :             CatalogTupleUpdate(pg_index, &indexTuple->t_self, indexTuple);
     878              :         }
     879              : 
     880          309 :         InvokeObjectPostAlterHookArg(IndexRelationId, thisIndexOid, 0,
     881              :                                      InvalidOid, is_internal);
     882              : 
     883          309 :         heap_freetuple(indexTuple);
     884              :     }
     885              : 
     886          155 :     table_close(pg_index, RowExclusiveLock);
     887              : }
     888              : 
     889              : /*
     890              :  * Check if the CONCURRENTLY option is legal for the relation; ERROR if not.
     891              :  *
     892              :  * *ident_idx_p receives the OID of the identity index.
     893              :  */
     894              : static void
     895           14 : check_concurrent_repack_requirements(Relation rel, Oid *ident_idx_p)
     896              : {
     897              :     char        relpersistence,
     898              :                 replident;
     899              :     Oid         ident_idx;
     900              : 
     901           14 :     if (wal_level < WAL_LEVEL_REPLICA)
     902            0 :         ereport(ERROR,
     903              :                 errcode(ERRCODE_INVALID_PARAMETER_VALUE),
     904              :                 errmsg("cannot execute %s in this configuration",
     905              :                        "REPACK (CONCURRENTLY)"),
     906              :                 errdetail("%s requires \"wal_level\" to be set to \"replica\" or higher.",
     907              :                           "REPACK (CONCURRENTLY)"));
     908              : 
     909              :     /* Data changes in system relations are not logically decoded. */
     910           14 :     if (IsCatalogRelation(rel))
     911            1 :         ereport(ERROR,
     912              :                 errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
     913              :                 errmsg("cannot execute %s on relation \"%s\"",
     914              :                        "REPACK (CONCURRENTLY)", RelationGetRelationName(rel)),
     915              :                 errhint("%s is not supported for catalog relations.",
     916              :                         "REPACK (CONCURRENTLY)"));
     917              : 
     918              :     /*
     919              :      * reorderbuffer.c does not seem to handle processing of TOAST relation
     920              :      * alone.
     921              :      */
     922           13 :     if (IsToastRelation(rel))
     923            1 :         ereport(ERROR,
     924              :                 errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
     925              :                 errmsg("cannot execute %s on relation \"%s\"",
     926              :                        "REPACK (CONCURRENTLY)", RelationGetRelationName(rel)),
     927              :                 errhint("%s is not supported for TOAST relations.",
     928              :                         "REPACK (CONCURRENTLY)"));
     929              : 
     930           12 :     relpersistence = rel->rd_rel->relpersistence;
     931           12 :     if (relpersistence != RELPERSISTENCE_PERMANENT)
     932            2 :         ereport(ERROR,
     933              :                 errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
     934              :                 errmsg("cannot execute %s on relation \"%s\"",
     935              :                        "REPACK (CONCURRENTLY)", RelationGetRelationName(rel)),
     936              :                 errhint("%s is only allowed for permanent relations.",
     937              :                         "REPACK (CONCURRENTLY)"));
     938              : 
     939              :     /*
     940              :      * With NOTHING, WAL does not contain the old tuple; FULL is not yet
     941              :      * supported.
     942              :      */
     943           10 :     replident = rel->rd_rel->relreplident;
     944           10 :     if (replident == REPLICA_IDENTITY_NOTHING ||
     945              :         replident == REPLICA_IDENTITY_FULL)
     946            1 :         ereport(ERROR,
     947              :                 errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
     948              :                 errmsg("cannot execute %s on relation \"%s\"",
     949              :                        "REPACK (CONCURRENTLY)", RelationGetRelationName(rel)),
     950              :                 errdetail("%s does not support tables with %s.",
     951              :                           "REPACK (CONCURRENTLY)",
     952              :                           replident == REPLICA_IDENTITY_NOTHING ?
     953              :                           "REPLICA IDENTITY NOTHING" : "REPLICA IDENTITY FULL"));
     954              : 
     955              :     /*
     956              :      * Obtain the replica identity index -- either one that has been set
     957              :      * explicitly, or a non-deferrable primary key.  If neither case
     958              :      * applies, the table cannot be repacked concurrently.  It might be possible
     959              :      * to have repack work with a FULL replica identity; however that requires
     960              :      * more work and is not implemented yet.
     961              :      */
     962            9 :     ident_idx = GetRelationIdentityOrPK(rel);
     963            9 :     if (!OidIsValid(ident_idx))
     964              :     {
     965              :         /* This special case warrants its own error message */
     966            2 :         if (OidIsValid(rel->rd_pkindex) && rel->rd_ispkdeferrable)
     967            1 :             ereport(ERROR,
     968              :                     errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
     969              :                     errmsg("cannot execute %s on relation \"%s\"",
     970              :                            "REPACK (CONCURRENTLY)",
     971              :                            RelationGetRelationName(rel)),
     972              :                     errdetail("%s does not support deferrable primary keys.",
     973              :                               "REPACK (CONCURRENTLY)"),
     974              :                     errhint("Use ALTER TABLE ... REPLICA IDENTITY USING INDEX to designate another index as replica identity."));
     975              : 
     976            1 :         ereport(ERROR,
     977              :                 errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
     978              :                 errmsg("cannot execute %s on relation \"%s\"",
     979              :                        "REPACK (CONCURRENTLY)", RelationGetRelationName(rel)),
     980              :                 errhint("Relation \"%s\" has no identity index.",
     981              :                         RelationGetRelationName(rel)));
     982              :     }
     983              : 
     984            7 :     *ident_idx_p = ident_idx;
     985            7 : }
     986              : 
     987              : 
     988              : /*
     989              :  * rebuild_relation: rebuild an existing relation in index or physical order
     990              :  *
     991              :  * OldHeap: table to rebuild.  See cluster_rel() for comments on the required
     992              :  * lock strength.
     993              :  *
     994              :  * index: index to cluster by, or NULL to rewrite in physical order.
     995              :  *
     996              :  * ident_idx: identity index, to handle replaying of concurrent data changes
     997              :  * to the new heap. InvalidOid if there's no CONCURRENTLY option.
     998              :  *
     999              :  * On entry, heap and index (if one is given) must be open, and the
    1000              :  * appropriate lock held on them -- AccessExclusiveLock for exclusive
    1001              :  * processing and ShareUpdateExclusiveLock for concurrent processing.
    1002              :  *
    1003              :  * On exit, they are closed, but still locked with AccessExclusiveLock.
    1004              :  * (The function handles the lock upgrade if 'concurrent' is true.)
    1005              :  */
    1006              : static void
    1007          410 : rebuild_relation(Relation OldHeap, Relation index, bool verbose,
    1008              :                  Oid ident_idx)
    1009              : {
    1010          410 :     Oid         tableOid = RelationGetRelid(OldHeap);
    1011          410 :     Oid         accessMethod = OldHeap->rd_rel->relam;
    1012          410 :     Oid         tableSpace = OldHeap->rd_rel->reltablespace;
    1013              :     Oid         OIDNewHeap;
    1014              :     Relation    NewHeap;
    1015              :     char        relpersistence;
    1016              :     bool        swap_toast_by_content;
    1017              :     TransactionId frozenXid;
    1018              :     MultiXactId cutoffMulti;
    1019          410 :     bool        concurrent = OidIsValid(ident_idx);
    1020          410 :     Snapshot    snapshot = NULL;
    1021              : #if USE_ASSERT_CHECKING
    1022              :     LOCKMODE    lmode;
    1023              : 
    1024              :     lmode = RepackLockLevel(concurrent);
    1025              : 
    1026              :     Assert(CheckRelationLockedByMe(OldHeap, lmode, false));
    1027              :     Assert(index == NULL || CheckRelationLockedByMe(index, lmode, false));
    1028              : #endif
    1029              : 
    1030          410 :     if (concurrent)
    1031              :     {
    1032              :         /*
    1033              :          * The worker needs to be a member of the lock group we're the leader
    1034              :          * of. We ought to become the leader before the worker starts. The
    1035              :          * worker will join the group as soon as it starts.
    1036              :          *
    1037              :          * This is to make sure that the deadlock described below is
    1038              :          * detectable by deadlock.c: if the worker waits for a transaction to
    1039              :          * complete and we are waiting for the worker output, then effectively
    1040              :          * we (i.e. this backend) are waiting for that transaction.
    1041              :          */
    1042            7 :         BecomeLockGroupLeader();
    1043              : 
    1044              :         /*
    1045              :          * Start the worker that decodes data changes applied while we're
    1046              :          * copying the table contents.
    1047              :          *
    1048              :          * Note that the worker has to wait for all transactions with XID
    1049              :          * already assigned to finish. If one of those transactions is
    1050              :          * waiting for a lock conflicting with ShareUpdateExclusiveLock on our
    1051              :          * table (e.g. it runs CREATE INDEX), we can end up in a deadlock.
    1052              :          * It's not clear that this risk is worth unlocking/locking the table
    1053              :          * (and its clustering index) and checking again if it's still
    1054              :          * eligible for REPACK CONCURRENTLY.
    1055              :          */
    1056            7 :         start_repack_decoding_worker(tableOid);
    1057              : 
    1058              :         /*
    1059              :          * Wait until the worker has the initial snapshot and retrieve it.
    1060              :          */
    1061            7 :         snapshot = get_initial_snapshot(decoding_worker);
    1062              : 
    1063            7 :         PushActiveSnapshot(snapshot);
    1064              :     }
    1065              : 
    1066              :     /* for CLUSTER or REPACK USING INDEX, mark the index as the one to use */
    1067          410 :     if (index != NULL)
    1068          148 :         mark_index_clustered(OldHeap, RelationGetRelid(index), true);
    1069              : 
    1070              :     /* Remember info about rel before closing OldHeap */
    1071          410 :     relpersistence = OldHeap->rd_rel->relpersistence;
    1072              : 
    1073              :     /*
    1074              :      * Create the transient table that will receive the re-ordered data.
    1075              :      *
    1076              :      * OldHeap is already locked, so no need to lock it again.  make_new_heap
    1077              :      * obtains AccessExclusiveLock on the new heap and its toast table.
    1078              :      */
    1079          410 :     OIDNewHeap = make_new_heap(tableOid, tableSpace,
    1080              :                                accessMethod,
    1081              :                                relpersistence,
    1082              :                                NoLock);
    1083              :     Assert(CheckRelationOidLockedByMe(OIDNewHeap, AccessExclusiveLock, false));
    1084          410 :     NewHeap = table_open(OIDNewHeap, NoLock);
    1085              : 
    1086              :     /* Copy the heap data into the new table in the desired order */
    1087          410 :     copy_table_data(NewHeap, OldHeap, index, snapshot, verbose,
    1088              :                     &swap_toast_by_content, &frozenXid, &cutoffMulti);
    1089              : 
    1090              :     /* The historic snapshot won't be needed anymore. */
    1091          410 :     if (snapshot)
    1092              :     {
    1093            7 :         PopActiveSnapshot();
    1094            7 :         UpdateActiveSnapshotCommandId();
    1095              :     }
    1096              : 
    1097          410 :     if (concurrent)
    1098              :     {
    1099              :         Assert(!swap_toast_by_content);
    1100              : 
    1101              :         /*
    1102              :          * Close the index, but keep the lock. Both heaps will be closed by
    1103              :          * the following call.
    1104              :          */
    1105            7 :         if (index)
    1106            3 :             index_close(index, NoLock);
    1107              : 
    1108            7 :         rebuild_relation_finish_concurrent(NewHeap, OldHeap, ident_idx,
    1109              :                                            frozenXid, cutoffMulti);
    1110              : 
    1111            7 :         pgstat_progress_update_param(PROGRESS_REPACK_PHASE,
    1112              :                                      PROGRESS_REPACK_PHASE_FINAL_CLEANUP);
    1113              :     }
    1114              :     else
    1115              :     {
    1116          403 :         bool        is_system_catalog = IsSystemRelation(OldHeap);
    1117              : 
    1118              :         /* Close relcache entries, but keep lock until transaction commit */
    1119          403 :         table_close(OldHeap, NoLock);
    1120          403 :         if (index)
    1121          145 :             index_close(index, NoLock);
    1122              : 
    1123              :         /*
    1124              :          * Close the new relation so it can be dropped as soon as the storage
    1125              :          * is swapped. The relation is not visible to others, so no need to
    1126              :          * unlock it explicitly.
    1127              :          */
    1128          403 :         table_close(NewHeap, NoLock);
    1129              : 
    1130              :         /*
    1131              :          * Swap the physical files of the target and transient tables, then
    1132              :          * rebuild the target's indexes and throw away the transient table.
    1133              :          */
    1134          403 :         finish_heap_swap(tableOid, OIDNewHeap, is_system_catalog,
    1135              :                          swap_toast_by_content, false, true,
    1136              :                          true,  /* reindex */
    1137              :                          frozenXid, cutoffMulti,
    1138              :                          relpersistence);
    1139              :     }
    1140          406 : }
    1141              : 
    1142              : 
    1143              : /*
    1144              :  * Create the transient table that will be filled with new data during
    1145              :  * CLUSTER, ALTER TABLE, and similar operations.  It duplicates the logical
    1146              :  * structure of the OldHeap, but gets the specified physical storage
    1147              :  * properties NewTableSpace, NewAccessMethod, and relpersistence.  The old
    1148              :  * heap is opened with 'lockmode'; the lock is not released here.
    1149              :  *
    1150              :  * Returns the OID of the new heap.  The caller should load it with the
    1151              :  * transferred/modified data, then call finish_heap_swap to finish the job.
    1152              :  */
    1153              : Oid
    1154         1559 : make_new_heap(Oid OIDOldHeap, Oid NewTableSpace, Oid NewAccessMethod,
    1155              :               char relpersistence, LOCKMODE lockmode)
    1156              : {
    1157              :     TupleDesc   OldHeapDesc;
    1158              :     char        NewHeapName[NAMEDATALEN];
    1159              :     Oid         OIDNewHeap;
    1160              :     Oid         toastid;
    1161              :     Relation    OldHeap;
    1162              :     HeapTuple   tuple;
    1163              :     Datum       reloptions;
    1164              :     bool        isNull;
    1165              :     Oid         namespaceid;
    1166              : 
    1167         1559 :     OldHeap = table_open(OIDOldHeap, lockmode);
    1168         1559 :     OldHeapDesc = RelationGetDescr(OldHeap);
    1169              : 
    1170              :     /*
    1171              :      * Note that the NewHeap will not receive any of the defaults or
    1172              :      * constraints associated with the OldHeap; we don't need 'em, and there's
    1173              :      * no reason to spend cycles inserting them into the catalogs only to
    1174              :      * delete them.
    1175              :      */
    1176              : 
    1177              :     /*
    1178              :      * But we do want to use reloptions of the old heap for new heap.
    1179              :      */
    1180         1559 :     tuple = SearchSysCache1(RELOID, ObjectIdGetDatum(OIDOldHeap));
    1181         1559 :     if (!HeapTupleIsValid(tuple))
    1182            0 :         elog(ERROR, "cache lookup failed for relation %u", OIDOldHeap);
    1183         1559 :     reloptions = SysCacheGetAttr(RELOID, tuple, Anum_pg_class_reloptions,
    1184              :                                  &isNull);
    1185         1559 :     if (isNull)
    1186         1466 :         reloptions = (Datum) 0;
    1187              : 
    1188         1559 :     if (relpersistence == RELPERSISTENCE_TEMP)
    1189           98 :         namespaceid = LookupCreationNamespace("pg_temp");
    1190              :     else
    1191         1461 :         namespaceid = RelationGetNamespace(OldHeap);
    1192              : 
    1193              :     /*
    1194              :      * Create the new heap, using a temporary name in the namespace chosen
    1195              :      * above (pg_temp for temp relations, else the old table's namespace).
    1196              :      * NOTE: there is some risk of collision with user relnames.  Working
    1197              :      * around this seems more trouble than it's worth; in particular, a
    1198              :      * different namespace would cause problems with TEMP status of temp tables.
    1199              :      *
    1200              :      * Note: the new heap is not a shared relation, even if we are rebuilding
    1201              :      * a shared rel.  However, we do make the new heap mapped if the source is
    1202              :      * mapped.  This simplifies swap_relation_files, and is absolutely
    1203              :      * necessary for rebuilding pg_class, for reasons explained there.
    1204              :      */
    1205         1559 :     snprintf(NewHeapName, sizeof(NewHeapName), "pg_temp_%u", OIDOldHeap);
    1206              : 
    1207         1559 :     OIDNewHeap = heap_create_with_catalog(NewHeapName,
    1208              :                                           namespaceid,
    1209              :                                           NewTableSpace,
    1210              :                                           InvalidOid,
    1211              :                                           InvalidOid,
    1212              :                                           InvalidOid,
    1213         1559 :                                           OldHeap->rd_rel->relowner,
    1214              :                                           NewAccessMethod,
    1215              :                                           OldHeapDesc,
    1216              :                                           NIL,
    1217              :                                           RELKIND_RELATION,
    1218              :                                           relpersistence,
    1219              :                                           false,
    1220         1559 :                                           RelationIsMapped(OldHeap),
    1221              :                                           ONCOMMIT_NOOP,
    1222              :                                           reloptions,
    1223              :                                           false,
    1224              :                                           true,
    1225              :                                           true,
    1226              :                                           OIDOldHeap,
    1227         1559 :                                           NULL);
    1228              :     Assert(OIDNewHeap != InvalidOid);
    1229              : 
    1230         1559 :     ReleaseSysCache(tuple);
    1231              : 
    1232              :     /*
    1233              :      * Advance command counter so that the newly-created relation's catalog
    1234              :      * tuples will be visible to table_open.
    1235              :      */
    1236         1559 :     CommandCounterIncrement();
    1237              : 
    1238              :     /*
    1239              :      * If necessary, create a TOAST table for the new relation.
    1240              :      *
    1241              :      * If the relation doesn't have a TOAST table already, we can't need one
    1242              :      * for the new relation.  The other way around is possible though: if some
    1243              :      * wide columns have been dropped, NewHeapCreateToastTable can decide that
    1244              :      * no TOAST table is needed for the new table.
    1245              :      *
    1246              :      * Note that NewHeapCreateToastTable ends with CommandCounterIncrement, so
    1247              :      * that the TOAST table will be visible for insertion.
    1248              :      */
    1249         1559 :     toastid = OldHeap->rd_rel->reltoastrelid;
    1250         1559 :     if (OidIsValid(toastid))
    1251              :     {
    1252              :         /* keep the existing toast table's reloptions, if any */
    1253          571 :         tuple = SearchSysCache1(RELOID, ObjectIdGetDatum(toastid));
    1254          571 :         if (!HeapTupleIsValid(tuple))
    1255            0 :             elog(ERROR, "cache lookup failed for relation %u", toastid);
    1256          571 :         reloptions = SysCacheGetAttr(RELOID, tuple, Anum_pg_class_reloptions,
    1257              :                                      &isNull);
    1258          571 :         if (isNull)
    1259          571 :             reloptions = (Datum) 0;
    1260              : 
    1261          571 :         NewHeapCreateToastTable(OIDNewHeap, reloptions, lockmode, toastid);
    1262              : 
    1263          571 :         ReleaseSysCache(tuple);
    1264              :     }
    1265              : 
    1266         1559 :     table_close(OldHeap, NoLock);
    1267              : 
    1268         1559 :     return OIDNewHeap;
    1269              : }
    1270              : 
    1271              : /*
    1272              :  * Do the physical copying of table data.
    1273              :  *
    1274              :  * 'snapshot': see table_relation_copy_for_cluster(). Pass a non-NULL
    1275              :  * snapshot iff concurrent processing is required.
    1276              :  *
    1277              :  * There are three output parameters:
    1278              :  * *pSwapToastByContent is set true if toast tables must be swapped by content.
    1279              :  * *pFreezeXid receives the TransactionId used as freeze cutoff point.
    1280              :  * *pCutoffMulti receives the MultiXactId used as a cutoff point.
    1281              :  */
    1282              : static void
    1283          410 : copy_table_data(Relation NewHeap, Relation OldHeap, Relation OldIndex,
    1284              :                 Snapshot snapshot, bool verbose, bool *pSwapToastByContent,
    1285              :                 TransactionId *pFreezeXid, MultiXactId *pCutoffMulti)
    1286              : {
    1287              :     Relation    relRelation;
    1288              :     HeapTuple   reltup;
    1289              :     Form_pg_class relform;
    1290              :     TupleDesc   oldTupDesc PG_USED_FOR_ASSERTS_ONLY;
    1291              :     TupleDesc   newTupDesc PG_USED_FOR_ASSERTS_ONLY;
    1292              :     VacuumParams params;
    1293              :     struct VacuumCutoffs cutoffs;
    1294              :     bool        use_sort;
    1295          410 :     double      num_tuples = 0,
    1296          410 :                 tups_vacuumed = 0,
    1297          410 :                 tups_recently_dead = 0;
    1298              :     BlockNumber num_pages;
    1299          410 :     int         elevel = verbose ? INFO : DEBUG2;
    1300              :     PGRUsage    ru0;
    1301              :     char       *nspname;
    1302          410 :     bool        concurrent = snapshot != NULL;
    1303              :     LOCKMODE    lmode;
    1304              : 
    1305          410 :     lmode = RepackLockLevel(concurrent);
    1306              : 
    1307          410 :     pg_rusage_init(&ru0);
    1308              : 
    1309              :     /* Store a copy of the namespace name for logging purposes */
    1310          410 :     nspname = get_namespace_name(RelationGetNamespace(OldHeap));
    1311              : 
    1312              :     /*
    1313              :      * Their tuple descriptors should be exactly alike, but here we only need
    1314              :      * to assume that they have the same number of columns.
    1315              :      */
    1316          410 :     oldTupDesc = RelationGetDescr(OldHeap);
    1317          410 :     newTupDesc = RelationGetDescr(NewHeap);
    1318              :     Assert(newTupDesc->natts == oldTupDesc->natts);
    1319              : 
    1320              :     /*
    1321              :      * If the OldHeap has a toast table, get lock on the toast table to keep
    1322              :      * it from being vacuumed.  This is needed because autovacuum processes
    1323              :      * toast tables independently of their main tables, with no lock on the
    1324              :      * latter.  If an autovacuum were to start on the toast table after we
    1325              :      * compute our OldestXmin below, it would use a later OldestXmin, and then
    1326              :      * possibly remove as DEAD toast tuples belonging to main tuples we think
    1327              :      * are only RECENTLY_DEAD.  Then we'd fail while trying to copy those
    1328              :      * tuples.
    1329              :      *
    1330              :      * We don't need to open the toast relation here, just lock it.  The lock
    1331              :      * will be held till end of transaction.
    1332              :      */
    1333          410 :     if (OldHeap->rd_rel->reltoastrelid)
    1334          135 :         LockRelationOid(OldHeap->rd_rel->reltoastrelid, lmode);
    1335              : 
    1336              :     /*
    1337              :      * If both tables have TOAST tables (and we're not running concurrently),
    1338              :      * perform toast swap by content.  The old table may have a toast table
    1339              :      * while the new one doesn't, if toastable columns have been dropped; then
    1340              :      * we have to swap by links.  This is okay because swap by content is only
    1341              :      * essential for system catalogs, and we don't support schema changes there.
    1342              :      */
    1343          410 :     if (OldHeap->rd_rel->reltoastrelid && NewHeap->rd_rel->reltoastrelid &&
    1344          135 :         !concurrent)
    1345              :     {
    1346          132 :         *pSwapToastByContent = true;
    1347              : 
    1348              :         /*
    1349              :          * When doing swap by content, any toast pointers written into NewHeap
    1350              :          * must use the old toast table's OID, because that's where the toast
    1351              :          * data will eventually be found.  Set this up by setting rd_toastoid.
    1352              :          * This also tells toast_save_datum() to preserve the toast value
    1353              :          * OIDs, which we want so as not to invalidate toast pointers in
    1354              :          * system catalog caches, and to avoid making multiple copies of a
    1355              :          * single toast value.
    1356              :          *
    1357              :          * Note that we must hold NewHeap open until we are done writing data,
    1358              :          * since the relcache will not guarantee to remember this setting once
    1359              :          * the relation is closed.  Also, this technique depends on the fact
    1360              :          * that no one will try to read from the NewHeap until after we've
    1361              :          * finished writing it and swapping the rels --- otherwise they could
    1362              :          * follow the toast pointers to the wrong place.  (It would actually
    1363              :          * work for values copied over from the old toast table, but not for
    1364              :          * any values that we toast which were previously not toasted.)
    1365              :          *
    1366              :          * This would not work with CONCURRENTLY because we may need to delete
    1367              :          * TOASTed tuples from the new heap. With this hack, we'd delete them
    1368              :          * from the old heap.
    1369              :          */
    1370          132 :         NewHeap->rd_toastoid = OldHeap->rd_rel->reltoastrelid;
    1371              :     }
    1372              :     else
    1373          278 :         *pSwapToastByContent = false;
    1374              : 
    1375              :     /*
    1376              :      * Compute xids used to freeze and weed out dead tuples and multixacts.
    1377              :      * Since we're going to rewrite the whole table anyway, there's no reason
    1378              :      * not to be aggressive about this.
    1379              :      */
    1380          410 :     memset(&params, 0, sizeof(VacuumParams));
    1381          410 :     vacuum_get_cutoffs(OldHeap, &params, &cutoffs);
    1382              : 
    1383              :     /*
    1384              :      * FreezeXid will become the table's new relfrozenxid, and that mustn't go
    1385              :      * backwards, so take the max.
    1386              :      */
    1387              :     {
    1388          410 :         TransactionId relfrozenxid = OldHeap->rd_rel->relfrozenxid;
    1389              : 
    1390          820 :         if (TransactionIdIsValid(relfrozenxid) &&
    1391          410 :             TransactionIdPrecedes(cutoffs.FreezeLimit, relfrozenxid))
    1392           10 :             cutoffs.FreezeLimit = relfrozenxid;
    1393              :     }
    1394              : 
    1395              :     /*
    1396              :      * MultiXactCutoff, similarly, shouldn't go backwards either.
    1397              :      */
    1398              :     {
    1399          410 :         MultiXactId relminmxid = OldHeap->rd_rel->relminmxid;
    1400              : 
    1401          820 :         if (MultiXactIdIsValid(relminmxid) &&
    1402          410 :             MultiXactIdPrecedes(cutoffs.MultiXactCutoff, relminmxid))
    1403            0 :             cutoffs.MultiXactCutoff = relminmxid;
    1404              :     }
    1405              : 
    1406              :     /*
    1407              :      * Decide whether to use an indexscan or seqscan-and-optional-sort to scan
    1408              :      * the OldHeap.  We know how to use a sort to duplicate the ordering of a
    1409              :      * btree index, and will use seqscan-and-sort for that case if the planner
    1410              :      * tells us it's cheaper.  Otherwise, always indexscan if an index is
    1411              :      * provided, else plain seqscan.
    1412              :      */
    1413          410 :     if (OldIndex != NULL && OldIndex->rd_rel->relam == BTREE_AM_OID)
    1414          146 :         use_sort = plan_cluster_use_sort(RelationGetRelid(OldHeap),
    1415              :                                          RelationGetRelid(OldIndex));
    1416              :     else
    1417          264 :         use_sort = false;
    1418              : 
    1419              :     /* Log what we're doing */
    1420          410 :     if (OldIndex != NULL && !use_sort)
    1421           62 :         ereport(elevel,
    1422              :                 errmsg("repacking \"%s.%s\" using index scan on \"%s\"",
    1423              :                        nspname,
    1424              :                        RelationGetRelationName(OldHeap),
    1425              :                        RelationGetRelationName(OldIndex)));
    1426          348 :     else if (use_sort)
    1427           86 :         ereport(elevel,
    1428              :                 errmsg("repacking \"%s.%s\" using sequential scan and sort",
    1429              :                        nspname,
    1430              :                        RelationGetRelationName(OldHeap)));
    1431              :     else
    1432          262 :         ereport(elevel,
    1433              :                 errmsg("repacking \"%s.%s\" in physical order",
    1434              :                        nspname,
    1435              :                        RelationGetRelationName(OldHeap)));
    1436              : 
    1437              :     /*
    1438              :      * Hand off the actual copying to the AM-specific function; the generic
    1439              :      * code cannot know how to deal with visibility across AMs. Note that this
    1440              :      * routine is allowed to set FreezeXid / MultiXactCutoff to different
    1441              :      * values (e.g. because the AM doesn't use freezing).
    1442              :      */
    1443          410 :     table_relation_copy_for_cluster(OldHeap, NewHeap, OldIndex, use_sort,
    1444              :                                     cutoffs.OldestXmin, snapshot,
    1445              :                                     &cutoffs.FreezeLimit,
    1446              :                                     &cutoffs.MultiXactCutoff,
    1447              :                                     &num_tuples, &tups_vacuumed,
    1448              :                                     &tups_recently_dead);
    1449              : 
    1450              :     /* Return the cutoffs to the caller, to be set as relfrozenxid/relminmxid */
    1451          410 :     *pFreezeXid = cutoffs.FreezeLimit;
    1452          410 :     *pCutoffMulti = cutoffs.MultiXactCutoff;
    1453              : 
    1454              :     /*
    1455              :      * Reset rd_toastoid just to be tidy --- it shouldn't be looked at again.
    1456              :      * In the CONCURRENTLY case, we need to set it again before applying the
    1457              :      * concurrent changes.
    1458              :      */
    1459          410 :     NewHeap->rd_toastoid = InvalidOid;
    1460              : 
    1461          410 :     num_pages = RelationGetNumberOfBlocks(NewHeap);
    1462              : 
    1463              :     /* Log what we did */
    1464          410 :     ereport(elevel,
    1465              :             (errmsg("\"%s.%s\": found %.0f removable, %.0f nonremovable row versions in %u pages",
    1466              :                     nspname,
    1467              :                     RelationGetRelationName(OldHeap),
    1468              :                     tups_vacuumed, num_tuples,
    1469              :                     RelationGetNumberOfBlocks(OldHeap)),
    1470              :              errdetail("%.0f dead row versions cannot be removed yet.\n"
    1471              :                        "%s.",
    1472              :                        tups_recently_dead,
    1473              :                        pg_rusage_show(&ru0))));
    1474              : 
    1475              :     /* Update pg_class to reflect the correct values of pages and tuples. */
    1476          410 :     relRelation = table_open(RelationRelationId, RowExclusiveLock);
    1477              : 
    1478          410 :     reltup = SearchSysCacheCopy1(RELOID,
    1479              :                                  ObjectIdGetDatum(RelationGetRelid(NewHeap)));
    1480          410 :     if (!HeapTupleIsValid(reltup))
    1481            0 :         elog(ERROR, "cache lookup failed for relation %u",
    1482              :              RelationGetRelid(NewHeap));
    1483          410 :     relform = (Form_pg_class) GETSTRUCT(reltup);
    1484              : 
    1485          410 :     relform->relpages = num_pages;
    1486          410 :     relform->reltuples = num_tuples;
    1487              : 
    1488              :     /* Don't update the stats for pg_class.  See swap_relation_files. */
    1489          410 :     if (RelationGetRelid(OldHeap) != RelationRelationId)
    1490          387 :         CatalogTupleUpdate(relRelation, &reltup->t_self, reltup);
    1491              :     else
    1492           23 :         CacheInvalidateRelcacheByTuple(reltup);
    1493              : 
    1494              :     /* Clean up. */
    1495          410 :     heap_freetuple(reltup);
    1496          410 :     table_close(relRelation, RowExclusiveLock);
    1497              : 
    1498              :     /* Make the update visible */
    1499          410 :     CommandCounterIncrement();
    1500          410 : }
    1501              : 
    1502              : /*
    1503              :  * Swap the physical files of two given relations.
    1504              :  *
    1505              :  * We swap the physical identity (reltablespace, relfilenumber) while keeping
    1506              :  * the same logical identities of the two relations.  relpersistence is also
    1507              :  * swapped, which is critical since it determines where buffers live for each
    1508              :  * relation.
    1509              :  *
    1510              :  * We can swap associated TOAST data in either of two ways: recursively swap
    1511              :  * the physical content of the toast tables (and their indexes), or swap the
    1512              :  * TOAST links in the given relations' pg_class entries.  The former is needed
    1513              :  * to manage rewrites of shared catalogs (where we cannot change the pg_class
    1514              :  * links) while the latter is the only way to handle cases in which a toast
    1515              :  * table is added or removed altogether.
    1516              :  *
    1517              :  * Additionally, the first relation is marked with relfrozenxid set to
    1518              :  * frozenXid.  It seems a bit ugly to have this here, but the caller would
    1519              :  * have to do it anyway, so having it here saves a heap_update.  Note: in
    1520              :  * the swap-toast-links case, we assume we don't need to change the toast
    1521              :  * table's relfrozenxid: the new version of the toast table should already
    1522              :  * have relfrozenxid set to RecentXmin, which is good enough.
    1523              :  *
    1524              :  * Lastly, if r2 and its toast table and toast index (if any) are mapped,
    1525              :  * their OIDs are emitted into mapped_tables[].  This is hacky but beats
    1526              :  * having to look the information up again later in finish_heap_swap.
    1527              :  */
    1528              : static void
    1529         1710 : swap_relation_files(Oid r1, Oid r2, bool target_is_pg_class,
    1530              :                     bool swap_toast_by_content,
    1531              :                     bool is_internal,
    1532              :                     TransactionId frozenXid,
    1533              :                     MultiXactId cutoffMulti,
    1534              :                     Oid *mapped_tables)
    1535              : {
    1536              :     Relation    relRelation;
    1537              :     HeapTuple   reltup1,
    1538              :                 reltup2;
    1539              :     Form_pg_class relform1,
    1540              :                 relform2;
    1541              :     RelFileNumber relfilenumber1,
    1542              :                 relfilenumber2;
    1543              :     RelFileNumber swaptemp;
    1544              :     char        swptmpchr;
    1545              :     Oid         relam1,
    1546              :                 relam2;
    1547              : 
    1548              :     /* We need writable copies of both pg_class tuples. */
    1549         1710 :     relRelation = table_open(RelationRelationId, RowExclusiveLock);
    1550              : 
    1551         1710 :     reltup1 = SearchSysCacheCopy1(RELOID, ObjectIdGetDatum(r1));
    1552         1710 :     if (!HeapTupleIsValid(reltup1))
    1553            0 :         elog(ERROR, "cache lookup failed for relation %u", r1);
    1554         1710 :     relform1 = (Form_pg_class) GETSTRUCT(reltup1);
    1555              : 
    1556         1710 :     reltup2 = SearchSysCacheCopy1(RELOID, ObjectIdGetDatum(r2));
    1557         1710 :     if (!HeapTupleIsValid(reltup2))
    1558            0 :         elog(ERROR, "cache lookup failed for relation %u", r2);
    1559         1710 :     relform2 = (Form_pg_class) GETSTRUCT(reltup2);
    1560              : 
    1561         1710 :     relfilenumber1 = relform1->relfilenode;
    1562         1710 :     relfilenumber2 = relform2->relfilenode;
    1563         1710 :     relam1 = relform1->relam;
    1564         1710 :     relam2 = relform2->relam;
    1565              : 
    1566         1710 :     if (RelFileNumberIsValid(relfilenumber1) &&
    1567              :         RelFileNumberIsValid(relfilenumber2))
    1568              :     {
    1569              :         /*
    1570              :          * Normal non-mapped relations: swap relfilenumbers, reltablespaces,
    1571              :          * relpersistence
    1572              :          */
    1573              :         Assert(!target_is_pg_class);
    1574              : 
    1575         1621 :         swaptemp = relform1->relfilenode;
    1576         1621 :         relform1->relfilenode = relform2->relfilenode;
    1577         1621 :         relform2->relfilenode = swaptemp;
    1578              : 
    1579         1621 :         swaptemp = relform1->reltablespace;
    1580         1621 :         relform1->reltablespace = relform2->reltablespace;
    1581         1621 :         relform2->reltablespace = swaptemp;
    1582              : 
    1583         1621 :         swaptemp = relform1->relam;
    1584         1621 :         relform1->relam = relform2->relam;
    1585         1621 :         relform2->relam = swaptemp;
    1586              : 
    1587         1621 :         swptmpchr = relform1->relpersistence;
    1588         1621 :         relform1->relpersistence = relform2->relpersistence;
    1589         1621 :         relform2->relpersistence = swptmpchr;
    1590              : 
    1591              :         /* Also swap toast links, if we're swapping by links */
    1592         1621 :         if (!swap_toast_by_content)
    1593              :         {
    1594         1285 :             swaptemp = relform1->reltoastrelid;
    1595         1285 :             relform1->reltoastrelid = relform2->reltoastrelid;
    1596         1285 :             relform2->reltoastrelid = swaptemp;
    1597              :         }
    1598              :     }
    1599              :     else
    1600              :     {
    1601              :         /*
    1602              :          * Mapped-relation case.  Here we have to swap the relation mappings
    1603              :          * instead of modifying the pg_class columns.  Both must be mapped.
    1604              :          */
    1605           89 :         if (RelFileNumberIsValid(relfilenumber1) ||
    1606              :             RelFileNumberIsValid(relfilenumber2))
    1607            0 :             elog(ERROR, "cannot swap mapped relation \"%s\" with non-mapped relation",
    1608              :                  NameStr(relform1->relname));
    1609              : 
    1610              :         /*
    1611              :          * We can't change the tablespace nor persistence of a mapped rel, and
    1612              :          * we can't handle toast link swapping for one either, because we must
    1613              :          * not apply any critical changes to its pg_class row.  These cases
    1614              :          * should be prevented by upstream permissions tests, so these checks
    1615              :          * are a non-user-facing emergency backstop.
    1616              :          */
    1617           89 :         if (relform1->reltablespace != relform2->reltablespace)
    1618            0 :             elog(ERROR, "cannot change tablespace of mapped relation \"%s\"",
    1619              :                  NameStr(relform1->relname));
    1620           89 :         if (relform1->relpersistence != relform2->relpersistence)
    1621            0 :             elog(ERROR, "cannot change persistence of mapped relation \"%s\"",
    1622              :                  NameStr(relform1->relname));
    1623           89 :         if (relform1->relam != relform2->relam)
    1624            0 :             elog(ERROR, "cannot change access method of mapped relation \"%s\"",
    1625              :                  NameStr(relform1->relname));
    1626           89 :         if (!swap_toast_by_content &&
    1627           29 :             (relform1->reltoastrelid || relform2->reltoastrelid))
    1628            0 :             elog(ERROR, "cannot swap toast by links for mapped relation \"%s\"",
    1629              :                  NameStr(relform1->relname));
    1630              : 
    1631              :         /*
    1632              :          * Fetch the mappings --- shouldn't fail, but be paranoid
    1633              :          */
    1634           89 :         relfilenumber1 = RelationMapOidToFilenumber(r1, relform1->relisshared);
    1635           89 :         if (!RelFileNumberIsValid(relfilenumber1))
    1636            0 :             elog(ERROR, "could not find relation mapping for relation \"%s\", OID %u",
    1637              :                  NameStr(relform1->relname), r1);
    1638           89 :         relfilenumber2 = RelationMapOidToFilenumber(r2, relform2->relisshared);
    1639           89 :         if (!RelFileNumberIsValid(relfilenumber2))
    1640            0 :             elog(ERROR, "could not find relation mapping for relation \"%s\", OID %u",
    1641              :                  NameStr(relform2->relname), r2);
    1642              : 
    1643              :         /*
    1644              :          * Send replacement mappings to relmapper.  Note these won't actually
    1645              :          * take effect until CommandCounterIncrement.
    1646              :          */
    1647           89 :         RelationMapUpdateMap(r1, relfilenumber2, relform1->relisshared, false);
    1648           89 :         RelationMapUpdateMap(r2, relfilenumber1, relform2->relisshared, false);
    1649              : 
    1650              :         /* Pass OIDs of mapped r2 tables back to caller */
    1651           89 :         *mapped_tables++ = r2;
    1652              :     }
    1653              : 
    1654              :     /*
    1655              :      * Recognize that rel1's relfilenumber (swapped from rel2) is new in this
    1656              :      * subtransaction. The rel2 storage (swapped from rel1) may or may not be
    1657              :      * new.
    1658              :      */
    1659              :     {
    1660              :         Relation    rel1,
    1661              :                     rel2;
    1662              : 
    1663         1710 :         rel1 = relation_open(r1, NoLock);
    1664         1710 :         rel2 = relation_open(r2, NoLock);
    1665         1710 :         rel2->rd_createSubid = rel1->rd_createSubid;
    1666         1710 :         rel2->rd_newRelfilelocatorSubid = rel1->rd_newRelfilelocatorSubid;
    1667         1710 :         rel2->rd_firstRelfilelocatorSubid = rel1->rd_firstRelfilelocatorSubid;
    1668         1710 :         RelationAssumeNewRelfilelocator(rel1);
    1669         1710 :         relation_close(rel1, NoLock);
    1670         1710 :         relation_close(rel2, NoLock);
    1671              :     }
    1672              : 
    1673              :     /*
    1674              :      * In the case of a shared catalog, these next few steps will only affect
    1675              :      * our own database's pg_class row; but that's okay, because they are all
    1676              :      * noncritical updates.  That's also an important fact for the case of a
    1677              :      * mapped catalog, because it's possible that we'll commit the map change
    1678              :      * and then fail to commit the pg_class update.
    1679              :      */
    1680              : 
    1681              :     /* set rel1's frozen Xid and minimum MultiXid */
    1682         1710 :     if (relform1->relkind != RELKIND_INDEX)
    1683              :     {
    1684              :         Assert(!TransactionIdIsValid(frozenXid) ||
    1685              :                TransactionIdIsNormal(frozenXid));
    1686         1570 :         relform1->relfrozenxid = frozenXid;
    1687         1570 :         relform1->relminmxid = cutoffMulti;
    1688              :     }
    1689              : 
    1690              :     /* swap size statistics too, since new rel has freshly-updated stats */
    1691              :     {
    1692              :         int32       swap_pages;
    1693              :         float4      swap_tuples;
    1694              :         int32       swap_allvisible;
    1695              :         int32       swap_allfrozen;
    1696              : 
    1697         1710 :         swap_pages = relform1->relpages;
    1698         1710 :         relform1->relpages = relform2->relpages;
    1699         1710 :         relform2->relpages = swap_pages;
    1700              : 
    1701         1710 :         swap_tuples = relform1->reltuples;
    1702         1710 :         relform1->reltuples = relform2->reltuples;
    1703         1710 :         relform2->reltuples = swap_tuples;
    1704              : 
    1705         1710 :         swap_allvisible = relform1->relallvisible;
    1706         1710 :         relform1->relallvisible = relform2->relallvisible;
    1707         1710 :         relform2->relallvisible = swap_allvisible;
    1708              : 
    1709         1710 :         swap_allfrozen = relform1->relallfrozen;
    1710         1710 :         relform1->relallfrozen = relform2->relallfrozen;
    1711         1710 :         relform2->relallfrozen = swap_allfrozen;
    1712              :     }
    1713              : 
    1714              :     /*
    1715              :      * Update the tuples in pg_class --- unless the target relation of the
    1716              :      * swap is pg_class itself.  In that case, there is zero point in making
    1717              :      * changes because we'd be updating the old data that we're about to throw
    1718              :      * away.  Because the real work being done here for a mapped relation is
    1719              :      * just to change the relation map settings, it's all right to not update
    1720              :      * the pg_class rows in this case. The most important changes will instead
    1721              :      * be performed later, in finish_heap_swap() itself.
    1722              :      */
    1723         1710 :     if (!target_is_pg_class)
    1724              :     {
    1725              :         CatalogIndexState indstate;
    1726              : 
    1727         1687 :         indstate = CatalogOpenIndexes(relRelation);
    1728         1687 :         CatalogTupleUpdateWithInfo(relRelation, &reltup1->t_self, reltup1,
    1729              :                                    indstate);
    1730         1687 :         CatalogTupleUpdateWithInfo(relRelation, &reltup2->t_self, reltup2,
    1731              :                                    indstate);
    1732         1687 :         CatalogCloseIndexes(indstate);
    1733              :     }
    1734              :     else
    1735              :     {
    1736              :         /* no update ... but we do still need relcache inval */
    1737           23 :         CacheInvalidateRelcacheByTuple(reltup1);
    1738           23 :         CacheInvalidateRelcacheByTuple(reltup2);
    1739              :     }
    1740              : 
    1741              :     /*
    1742              :      * Now that pg_class has been updated with its relevant information for
    1743              :      * the swap, update the dependency of the relations to point to their new
    1744              :      * table AM, if it has changed.
    1745              :      */
    1746         1710 :     if (relam1 != relam2)
    1747              :     {
    1748           24 :         if (changeDependencyFor(RelationRelationId,
    1749              :                                 r1,
    1750              :                                 AccessMethodRelationId,
    1751              :                                 relam1,
    1752              :                                 relam2) != 1)
    1753            0 :             elog(ERROR, "could not change access method dependency for relation \"%s.%s\"",
    1754              :                  get_namespace_name(get_rel_namespace(r1)),
    1755              :                  get_rel_name(r1));
    1756           24 :         if (changeDependencyFor(RelationRelationId,
    1757              :                                 r2,
    1758              :                                 AccessMethodRelationId,
    1759              :                                 relam2,
    1760              :                                 relam1) != 1)
    1761            0 :             elog(ERROR, "could not change access method dependency for relation \"%s.%s\"",
    1762              :                  get_namespace_name(get_rel_namespace(r2)),
    1763              :                  get_rel_name(r2));
    1764              :     }
    1765              : 
    1766              :     /*
    1767              :      * Post alter hook for modified relations. The change to r2 is always
    1768              :      * internal, but r1 depends on the invocation context.
    1769              :      */
    1770         1710 :     InvokeObjectPostAlterHookArg(RelationRelationId, r1, 0,
    1771              :                                  InvalidOid, is_internal);
    1772         1710 :     InvokeObjectPostAlterHookArg(RelationRelationId, r2, 0,
    1773              :                                  InvalidOid, true);
    1774              : 
    1775              :     /*
    1776              :      * If we have toast tables associated with the relations being swapped,
    1777              :      * deal with them too.
    1778              :      */
    1779         1710 :     if (relform1->reltoastrelid || relform2->reltoastrelid)
    1780              :     {
    1781          542 :         if (swap_toast_by_content)
    1782              :         {
    1783          132 :             if (relform1->reltoastrelid && relform2->reltoastrelid)
    1784              :             {
    1785              :                 /* Recursively swap the contents of the toast tables */
    1786          132 :                 swap_relation_files(relform1->reltoastrelid,
    1787              :                                     relform2->reltoastrelid,
    1788              :                                     target_is_pg_class,
    1789              :                                     swap_toast_by_content,
    1790              :                                     is_internal,
    1791              :                                     frozenXid,
    1792              :                                     cutoffMulti,
    1793              :                                     mapped_tables);
    1794              :             }
    1795              :             else
    1796              :             {
    1797              :                 /* caller messed up */
    1798            0 :                 elog(ERROR, "cannot swap toast files by content when there's only one");
    1799              :             }
    1800              :         }
    1801              :         else
    1802              :         {
    1803              :             /*
    1804              :              * We swapped the ownership links, so we need to change dependency
    1805              :              * data to match.
    1806              :              *
    1807              :              * NOTE: it is possible that only one table has a toast table.
    1808              :              *
    1809              :              * NOTE: at present, a TOAST table's only dependency is the one on
    1810              :              * its owning table.  If more are ever created, we'd need to use
    1811              :              * something more selective than deleteDependencyRecordsFor() to
    1812              :              * get rid of just the link we want.
    1813              :              */
    1814              :             ObjectAddress baseobject,
    1815              :                         toastobject;
    1816              :             long        count;
    1817              : 
    1818              :             /*
    1819              :              * We disallow this case for system catalogs, to avoid the
    1820              :              * possibility that the catalog we're rebuilding is one of the
    1821              :              * ones the dependency changes would change.  It's too late to be
    1822              :              * making any data changes to the target catalog.
    1823              :              */
    1824          410 :             if (IsSystemClass(r1, relform1))
    1825            0 :                 elog(ERROR, "cannot swap toast files by links for system catalogs");
    1826              : 
    1827              :             /* Delete old dependencies */
    1828          410 :             if (relform1->reltoastrelid)
    1829              :             {
    1830          389 :                 count = deleteDependencyRecordsFor(RelationRelationId,
    1831              :                                                    relform1->reltoastrelid,
    1832              :                                                    false);
    1833          389 :                 if (count != 1)
    1834            0 :                     elog(ERROR, "expected one dependency record for TOAST table, found %ld",
    1835              :                          count);
    1836              :             }
    1837          410 :             if (relform2->reltoastrelid)
    1838              :             {
    1839          410 :                 count = deleteDependencyRecordsFor(RelationRelationId,
    1840              :                                                    relform2->reltoastrelid,
    1841              :                                                    false);
    1842          410 :                 if (count != 1)
    1843            0 :                     elog(ERROR, "expected one dependency record for TOAST table, found %ld",
    1844              :                          count);
    1845              :             }
    1846              : 
    1847              :             /* Register new dependencies */
    1848          410 :             baseobject.classId = RelationRelationId;
    1849          410 :             baseobject.objectSubId = 0;
    1850          410 :             toastobject.classId = RelationRelationId;
    1851          410 :             toastobject.objectSubId = 0;
    1852              : 
    1853          410 :             if (relform1->reltoastrelid)
    1854              :             {
    1855          389 :                 baseobject.objectId = r1;
    1856          389 :                 toastobject.objectId = relform1->reltoastrelid;
    1857          389 :                 recordDependencyOn(&toastobject, &baseobject,
    1858              :                                    DEPENDENCY_INTERNAL);
    1859              :             }
    1860              : 
    1861          410 :             if (relform2->reltoastrelid)
    1862              :             {
    1863          410 :                 baseobject.objectId = r2;
    1864          410 :                 toastobject.objectId = relform2->reltoastrelid;
    1865          410 :                 recordDependencyOn(&toastobject, &baseobject,
    1866              :                                    DEPENDENCY_INTERNAL);
    1867              :             }
    1868              :         }
    1869              :     }
    1870              : 
    1871              :     /*
    1872              :      * If we're swapping two toast tables by content, do the same for their
    1873              :      * valid index. The swap can actually be safely done only if the relations
    1874              :      * have indexes.
    1875              :      */
    1876         1710 :     if (swap_toast_by_content &&
    1877          396 :         relform1->relkind == RELKIND_TOASTVALUE &&
    1878          132 :         relform2->relkind == RELKIND_TOASTVALUE)
    1879              :     {
    1880              :         Oid         toastIndex1,
    1881              :                     toastIndex2;
    1882              : 
    1883              :         /* Get valid index for each relation */
    1884          132 :         toastIndex1 = toast_get_valid_index(r1,
    1885              :                                             AccessExclusiveLock);
    1886          132 :         toastIndex2 = toast_get_valid_index(r2,
    1887              :                                             AccessExclusiveLock);
    1888              : 
    1889          132 :         swap_relation_files(toastIndex1,
    1890              :                             toastIndex2,
    1891              :                             target_is_pg_class,
    1892              :                             swap_toast_by_content,
    1893              :                             is_internal,
    1894              :                             InvalidTransactionId,
    1895              :                             InvalidMultiXactId,
    1896              :                             mapped_tables);
    1897              :     }
    1898              : 
    1899              :     /* Clean up. */
    1900         1710 :     heap_freetuple(reltup1);
    1901         1710 :     heap_freetuple(reltup2);
    1902              : 
    1903         1710 :     table_close(relRelation, RowExclusiveLock);
    1904         1710 : }
    1905              : 
    1906              : /*
    1907              :  * Remove the transient table that was built by make_new_heap, and finish
    1908              :  * cleaning up (including rebuilding all indexes on the old heap).
    1909              :  */
    1910              : void
    1911         1438 : finish_heap_swap(Oid OIDOldHeap, Oid OIDNewHeap,
    1912              :                  bool is_system_catalog,
    1913              :                  bool swap_toast_by_content,
    1914              :                  bool check_constraints,
    1915              :                  bool is_internal,
    1916              :                  bool reindex,
    1917              :                  TransactionId frozenXid,
    1918              :                  MultiXactId cutoffMulti,
    1919              :                  char newrelpersistence)
    1920              : {
    1921              :     ObjectAddress object;
    1922              :     Oid         mapped_tables[4];
    1923              :     int         i;
    1924              : 
    1925              :     /* Report that we are now swapping relation files */
    1926         1438 :     pgstat_progress_update_param(PROGRESS_REPACK_PHASE,
    1927              :                                  PROGRESS_REPACK_PHASE_SWAP_REL_FILES);
    1928              : 
    1929              :     /* Zero out possible results from swap_relation_files */
    1930         1438 :     memset(mapped_tables, 0, sizeof(mapped_tables));
    1931              : 
    1932              :     /*
    1933              :      * Swap the contents of the heap relations (including any toast tables).
    1934              :      * Also set old heap's relfrozenxid to frozenXid.
    1935              :      */
    1936         1438 :     swap_relation_files(OIDOldHeap, OIDNewHeap,
    1937              :                         (OIDOldHeap == RelationRelationId),
    1938              :                         swap_toast_by_content, is_internal,
    1939              :                         frozenXid, cutoffMulti, mapped_tables);
    1940              : 
    1941              :     /*
    1942              :      * If it's a system catalog, queue a sinval message to flush all catcaches
    1943              :      * on the catalog when we reach CommandCounterIncrement.
    1944              :      */
    1945         1438 :     if (is_system_catalog)
    1946          121 :         CacheInvalidateCatalog(OIDOldHeap);
    1947              : 
    1948         1438 :     if (reindex)
    1949              :     {
    1950              :         int         reindex_flags;
    1951         1431 :         ReindexParams reindex_params = {0};
    1952              : 
    1953              :         /*
    1954              :          * Rebuild each index on the relation (but not the toast table, which
    1955              :          * is all-new at this point).  It is important to do this before the
    1956              :          * DROP step because if we are processing a system catalog that will
    1957              :          * be used during DROP, we want to have its indexes available.  There
    1958              :          * is no advantage to the other order anyway because this is all
    1959              :          * transactional, so no chance to reclaim disk space before commit. We
    1960              :          * do not need a final CommandCounterIncrement() because
    1961              :          * reindex_relation does it.
    1962              :          *
    1963              :          * Note: because index_build is called via reindex_relation, it will
    1964              :          * never set indcheckxmin true for the indexes.  This is OK even
    1965              :          * though in some sense we are building new indexes rather than
    1966              :          * rebuilding existing ones, because the new heap won't contain any
    1967              :          * HOT chains at all, let alone broken ones, so it can't be necessary
    1968              :          * to set indcheckxmin.
    1969              :          */
    1970         1431 :         reindex_flags = REINDEX_REL_SUPPRESS_INDEX_USE;
    1971         1431 :         if (check_constraints)
    1972         1028 :             reindex_flags |= REINDEX_REL_CHECK_CONSTRAINTS;
    1973              : 
    1974              :         /*
    1975              :          * Ensure that the indexes have the same persistence as the parent
    1976              :          * relation.
    1977              :          */
    1978         1431 :         if (newrelpersistence == RELPERSISTENCE_UNLOGGED)
    1979           25 :             reindex_flags |= REINDEX_REL_FORCE_INDEXES_UNLOGGED;
    1980         1406 :         else if (newrelpersistence == RELPERSISTENCE_PERMANENT)
    1981         1353 :             reindex_flags |= REINDEX_REL_FORCE_INDEXES_PERMANENT;
    1982              : 
    1983              :         /* Report that we are now reindexing relations */
    1984         1431 :         pgstat_progress_update_param(PROGRESS_REPACK_PHASE,
    1985              :                                      PROGRESS_REPACK_PHASE_REBUILD_INDEX);
    1986              : 
    1987         1431 :         reindex_relation(NULL, OIDOldHeap, reindex_flags, &reindex_params);
    1988              :     }
    1989              : 
    1990              :     /* Report that we are now doing clean up */
    1991         1426 :     pgstat_progress_update_param(PROGRESS_REPACK_PHASE,
    1992              :                                  PROGRESS_REPACK_PHASE_FINAL_CLEANUP);
    1993              : 
    1994              :     /*
    1995              :      * If the relation being rebuilt is pg_class, swap_relation_files()
    1996              :      * couldn't update pg_class's own pg_class entry (check comments in
    1997              :      * swap_relation_files()), thus relfrozenxid was not updated. That's
    1998              :      * annoying because a potential reason for doing a VACUUM FULL is an
    1999              :      * imminent or actual anti-wraparound shutdown.  So, now that we can
    2000              :      * access the new relation using its indices, update relfrozenxid.
    2001              :      * pg_class doesn't have a toast relation, so we don't need to update the
    2002              :      * corresponding toast relation. Note that there's little point moving all
    2003              :      * relfrozenxid updates here since swap_relation_files() needs to write to
    2004              :      * pg_class for non-mapped relations anyway.
    2005              :      */
    2006         1426 :     if (OIDOldHeap == RelationRelationId)
    2007              :     {
    2008              :         Relation    relRelation;
    2009              :         HeapTuple   reltup;
    2010              :         Form_pg_class relform;
    2011              : 
    2012           23 :         relRelation = table_open(RelationRelationId, RowExclusiveLock);
    2013              : 
    2014           23 :         reltup = SearchSysCacheCopy1(RELOID, ObjectIdGetDatum(OIDOldHeap));
    2015           23 :         if (!HeapTupleIsValid(reltup))
    2016            0 :             elog(ERROR, "cache lookup failed for relation %u", OIDOldHeap);
    2017           23 :         relform = (Form_pg_class) GETSTRUCT(reltup);
    2018              : 
    2019           23 :         relform->relfrozenxid = frozenXid;
    2020           23 :         relform->relminmxid = cutoffMulti;
    2021              : 
    2022           23 :         CatalogTupleUpdate(relRelation, &reltup->t_self, reltup);
    2023              : 
    2024           23 :         table_close(relRelation, RowExclusiveLock);
    2025              :     }
    2026              : 
    2027              :     /* Destroy new heap with old filenumber */
    2028         1426 :     object.classId = RelationRelationId;
    2029         1426 :     object.objectId = OIDNewHeap;
    2030         1426 :     object.objectSubId = 0;
    2031              : 
    2032         1426 :     if (!reindex)
    2033              :     {
    2034              :         /*
    2035              :          * Make sure the changes in pg_class are visible. This is especially
    2036              :          * important if !swap_toast_by_content, so that the correct TOAST
    2037              :          * relation is dropped. (reindex_relation() above did not help in this
    2038              :          * case.)
    2039              :          */
    2040            7 :         CommandCounterIncrement();
    2041              :     }
    2042              : 
    2043              :     /*
    2044              :      * The new relation is local to our transaction and we know nothing
    2045              :      * depends on it, so DROP_RESTRICT should be OK.
    2046              :      */
    2047         1426 :     performDeletion(&object, DROP_RESTRICT, PERFORM_DELETION_INTERNAL);
    2048              : 
    2049              :     /* performDeletion does CommandCounterIncrement at end */
    2050              : 
    2051              :     /*
    2052              :      * Now we must remove any relation mapping entries that we set up for the
    2053              :      * transient table, as well as its toast table and toast index if any. If
    2054              :      * we fail to do this before commit, the relmapper will complain about new
    2055              :      * permanent map entries being added post-bootstrap.
    2056              :      */
    2057         1515 :     for (i = 0; OidIsValid(mapped_tables[i]); i++)
    2058           89 :         RelationMapRemoveMapping(mapped_tables[i]);
    2059              : 
    2060              :     /*
    2061              :      * At this point, everything is kosher except that, if we did toast swap
    2062              :      * by links, the toast table's name corresponds to the transient table.
    2063              :      * The name is irrelevant to the backend because it's referenced by OID,
    2064              :      * but users looking at the catalogs could be confused.  Rename it to
    2065              :      * prevent this problem.
    2066              :      *
    2067              :      * Note no lock required on the relation, because we already hold an
    2068              :      * exclusive lock on it.
    2069              :      */
    2070         1426 :     if (!swap_toast_by_content)
    2071              :     {
    2072              :         Relation    newrel;
    2073              : 
    2074         1294 :         newrel = table_open(OIDOldHeap, NoLock);
    2075         1294 :         if (OidIsValid(newrel->rd_rel->reltoastrelid))
    2076              :         {
    2077              :             Oid         toastidx;
    2078              :             char        NewToastName[NAMEDATALEN];
    2079              : 
    2080              :             /* Get the associated valid index to be renamed */
    2081          389 :             toastidx = toast_get_valid_index(newrel->rd_rel->reltoastrelid,
    2082              :                                              AccessExclusiveLock);
    2083              : 
    2084              :             /* rename the toast table ... */
    2085          389 :             snprintf(NewToastName, NAMEDATALEN, "pg_toast_%u",
    2086              :                      OIDOldHeap);
    2087          389 :             RenameRelationInternal(newrel->rd_rel->reltoastrelid,
    2088              :                                    NewToastName, true, false);
    2089              : 
    2090              :             /* ... and its valid index too. */
    2091          389 :             snprintf(NewToastName, NAMEDATALEN, "pg_toast_%u_index",
    2092              :                      OIDOldHeap);
    2093              : 
    2094          389 :             RenameRelationInternal(toastidx,
    2095              :                                    NewToastName, true, true);
    2096              : 
    2097              :             /*
    2098              :              * Reset the relrewrite for the toast. The command-counter
    2099              :              * increment is required here as we are about to update the tuple
    2100              :              * that is updated as part of RenameRelationInternal.
    2101              :              */
    2102          389 :             CommandCounterIncrement();
    2103          389 :             ResetRelRewrite(newrel->rd_rel->reltoastrelid);
    2104              :         }
    2105         1294 :         relation_close(newrel, NoLock);
    2106              :     }
    2107              : 
    2108              :     /* if it's not a catalog table, clear any missing attribute settings */
    2109         1426 :     if (!is_system_catalog)
    2110              :     {
    2111              :         Relation    newrel;
    2112              : 
    2113         1305 :         newrel = table_open(OIDOldHeap, NoLock);
    2114         1305 :         RelationClearMissing(newrel);
    2115         1305 :         relation_close(newrel, NoLock);
    2116              :     }
    2117         1426 : }
    2118              : 
    2119              : /*
    2120              :  * Determine which relations to process, when REPACK/CLUSTER is called
    2121              :  * without specifying a table name.  The exact process depends on whether
    2122              :  * USING INDEX was given or not, and in any case we only return tables and
    2123              :  * materialized views that the current user has privileges to repack/cluster.
    2124              :  *
    2125              :  * If USING INDEX was given, we scan pg_index to find those that have
    2126              :  * indisclustered set; if it was not given, scan pg_class and return all
    2127              :  * tables.
    2128              :  *
    2129              :  * Return it as a list of RelToCluster in the given memory context.
    2130              :  */
    2131              : static List *
    2132           16 : get_tables_to_repack(RepackCommand cmd, bool usingindex, MemoryContext permcxt)
    2133              : {
    2134              :     Relation    catalog;
    2135              :     TableScanDesc scan;
    2136              :     HeapTuple   tuple;
    2137           16 :     List       *rtcs = NIL;
    2138              : 
    2139           16 :     if (usingindex)
    2140              :     {
    2141              :         ScanKeyData entry;
    2142              : 
    2143              :         /*
    2144              :          * For USING INDEX, scan pg_index to find those with indisclustered.
    2145              :          */
    2146           12 :         catalog = table_open(IndexRelationId, AccessShareLock);
    2147           12 :         ScanKeyInit(&entry,
    2148              :                     Anum_pg_index_indisclustered,
    2149              :                     BTEqualStrategyNumber, F_BOOLEQ,
    2150              :                     BoolGetDatum(true));
    2151           12 :         scan = table_beginscan_catalog(catalog, 1, &entry);
    2152           24 :         while ((tuple = heap_getnext(scan, ForwardScanDirection)) != NULL)
    2153              :         {
    2154              :             RelToCluster *rtc;
    2155              :             Form_pg_index index;
    2156              :             HeapTuple   classtup;
    2157              :             Form_pg_class classForm;
    2158              :             MemoryContext oldcxt;
    2159              : 
    2160           12 :             index = (Form_pg_index) GETSTRUCT(tuple);
    2161              : 
    2162              :             /*
    2163              :              * Try to obtain a light lock on the index's table, to ensure it
    2164              :              * doesn't go away while we collect the list.  If we cannot, just
    2165              :              * disregard it.  Be sure to release this if we ultimately decide
    2166              :              * not to process the table!
    2167              :              */
    2168           12 :             if (!ConditionalLockRelationOid(index->indrelid, AccessShareLock))
    2169            0 :                 continue;
    2170              : 
    2171              :             /* Verify that the table still exists; skip if not */
    2172           12 :             classtup = SearchSysCache1(RELOID, ObjectIdGetDatum(index->indrelid));
    2173           12 :             if (!HeapTupleIsValid(classtup))
    2174              :             {
    2175            0 :                 UnlockRelationOid(index->indrelid, AccessShareLock);
    2176            0 :                 continue;
    2177              :             }
    2178           12 :             classForm = (Form_pg_class) GETSTRUCT(classtup);
    2179              : 
    2180              :             /* Skip temp relations belonging to other sessions */
    2181           12 :             if (classForm->relpersistence == RELPERSISTENCE_TEMP &&
    2182            0 :                 !isTempOrTempToastNamespace(classForm->relnamespace))
    2183              :             {
    2184            0 :                 ReleaseSysCache(classtup);
    2185            0 :                 UnlockRelationOid(index->indrelid, AccessShareLock);
    2186            0 :                 continue;
    2187              :             }
    2188              : 
    2189           12 :             ReleaseSysCache(classtup);
    2190              : 
    2191              :             /* noisily skip rels which the user can't process */
    2192           12 :             if (!repack_is_permitted_for_relation(cmd, index->indrelid,
    2193              :                                                   GetUserId()))
    2194              :             {
    2195            8 :                 UnlockRelationOid(index->indrelid, AccessShareLock);
    2196            8 :                 continue;
    2197              :             }
    2198              : 
    2199              :             /* Use a permanent memory context for the result list */
    2200            4 :             oldcxt = MemoryContextSwitchTo(permcxt);
    2201            4 :             rtc = palloc_object(RelToCluster);
    2202            4 :             rtc->tableOid = index->indrelid;
    2203            4 :             rtc->indexOid = index->indexrelid;
    2204            4 :             rtcs = lappend(rtcs, rtc);
    2205            4 :             MemoryContextSwitchTo(oldcxt);
    2206              :         }
    2207              :     }
    2208              :     else
    2209              :     {
    2210            4 :         catalog = table_open(RelationRelationId, AccessShareLock);
    2211            4 :         scan = table_beginscan_catalog(catalog, 0, NULL);
    2212              : 
    2213         9164 :         while ((tuple = heap_getnext(scan, ForwardScanDirection)) != NULL)
    2214              :         {
    2215              :             RelToCluster *rtc;
    2216              :             Form_pg_class class;
    2217              :             MemoryContext oldcxt;
    2218              : 
    2219         9160 :             class = (Form_pg_class) GETSTRUCT(tuple);
    2220              : 
    2221              :             /*
    2222              :              * Try to obtain a light lock on the table, to ensure it doesn't
    2223              :              * go away while we collect the list.  If we cannot, just
    2224              :              * disregard the table.  Be sure to release this if we ultimately
    2225              :              * decide not to process the table!
    2226              :              */
    2227         9160 :             if (!ConditionalLockRelationOid(class->oid, AccessShareLock))
    2228            0 :                 continue;
    2229              : 
    2230              :             /* Verify that the table still exists */
    2231         9160 :             if (!SearchSysCacheExists1(RELOID, ObjectIdGetDatum(class->oid)))
    2232              :             {
    2233            0 :                 UnlockRelationOid(class->oid, AccessShareLock);
    2234            0 :                 continue;
    2235              :             }
    2236              : 
    2237              :             /* Can only process plain tables and matviews */
    2238         9160 :             if (class->relkind != RELKIND_RELATION &&
    2239         6068 :                 class->relkind != RELKIND_MATVIEW)
    2240              :             {
    2241         6036 :                 UnlockRelationOid(class->oid, AccessShareLock);
    2242         6036 :                 continue;
    2243              :             }
    2244              : 
    2245              :             /* Skip temp relations belonging to other sessions */
    2246         3124 :             if (class->relpersistence == RELPERSISTENCE_TEMP &&
    2247           16 :                 !isTempOrTempToastNamespace(class->relnamespace))
    2248              :             {
    2249            0 :                 UnlockRelationOid(class->oid, AccessShareLock);
    2250            0 :                 continue;
    2251              :             }
    2252              : 
    2253              :             /* noisily skip rels which the user can't process */
    2254         3124 :             if (!repack_is_permitted_for_relation(cmd, class->oid,
    2255              :                                                   GetUserId()))
    2256              :             {
    2257         3116 :                 UnlockRelationOid(class->oid, AccessShareLock);
    2258         3116 :                 continue;
    2259              :             }
    2260              : 
    2261              :             /* Use a permanent memory context for the result list */
    2262            8 :             oldcxt = MemoryContextSwitchTo(permcxt);
    2263            8 :             rtc = palloc_object(RelToCluster);
    2264            8 :             rtc->tableOid = class->oid;
    2265            8 :             rtc->indexOid = InvalidOid;
    2266            8 :             rtcs = lappend(rtcs, rtc);
    2267            8 :             MemoryContextSwitchTo(oldcxt);
    2268              :         }
    2269              :     }
    2270              : 
    2271           16 :     table_endscan(scan);
    2272           16 :     relation_close(catalog, AccessShareLock);
    2273              : 
    2274           16 :     return rtcs;
    2275              : }
    2276              : 
    2277              : /*
    2278              :  * Given a partitioned table or its index, return a list of RelToCluster for
    2279              :  * all the leaf child tables/indexes.
    2280              :  *
    2281              :  * 'rel_is_index' tells whether 'relid' is that of an index (true) or of the
    2282              :  * owning relation.
    2283              :  */
    2284              : static List *
    2285           20 : get_tables_to_repack_partitioned(RepackCommand cmd, Oid relid,
    2286              :                                  bool rel_is_index, MemoryContext permcxt)
    2287              : {
    2288              :     List       *inhoids;
    2289           20 :     List       *rtcs = NIL;
    2290              : 
    2291              :     /*
    2292              :      * Do not lock the children until they're processed.  Note that we do hold
    2293              :      * a lock on the parent partitioned table.
    2294              :      */
    2295           20 :     inhoids = find_all_inheritors(relid, NoLock, NULL);
    2296          148 :     foreach_oid(child_oid, inhoids)
    2297              :     {
    2298              :         Oid         table_oid,
    2299              :                     index_oid;
    2300              :         RelToCluster *rtc;
    2301              :         MemoryContext oldcxt;
    2302              : 
    2303          108 :         if (rel_is_index)
    2304              :         {
    2305              :             /* consider only leaf indexes */
    2306           80 :             if (get_rel_relkind(child_oid) != RELKIND_INDEX)
    2307           40 :                 continue;
    2308              : 
    2309           40 :             table_oid = IndexGetRelation(child_oid, false);
    2310           40 :             index_oid = child_oid;
    2311              :         }
    2312              :         else
    2313              :         {
    2314              :             /* consider only leaf relations */
    2315           28 :             if (get_rel_relkind(child_oid) != RELKIND_RELATION)
    2316           16 :                 continue;
    2317              : 
    2318           12 :             table_oid = child_oid;
    2319           12 :             index_oid = InvalidOid;
    2320              :         }
    2321              : 
    2322              :         /*
    2323              :          * It's possible that the user does not have privileges to REPACK/CLUSTER the
    2324              :          * leaf partition despite having them on the partitioned table.  Skip
    2325              :          * if so.
    2326              :          */
    2327           52 :         if (!repack_is_permitted_for_relation(cmd, table_oid, GetUserId()))
    2328           12 :             continue;
    2329              : 
    2330              :         /* Use a permanent memory context for the result list */
    2331           40 :         oldcxt = MemoryContextSwitchTo(permcxt);
    2332           40 :         rtc = palloc_object(RelToCluster);
    2333           40 :         rtc->tableOid = table_oid;
    2334           40 :         rtc->indexOid = index_oid;
    2335           40 :         rtcs = lappend(rtcs, rtc);
    2336           40 :         MemoryContextSwitchTo(oldcxt);
    2337              :     }
    2338              : 
    2339           20 :     return rtcs;
    2340              : }
    2341              : 
    2342              : 
    2343              : /*
    2344              :  * Return whether userid has privileges to REPACK relid.  If not, this
    2345              :  * function emits a WARNING.
    2346              :  */
    2347              : static bool
    2348         3240 : repack_is_permitted_for_relation(RepackCommand cmd, Oid relid, Oid userid)
    2349              : {
    2350              :     Assert(cmd == REPACK_COMMAND_CLUSTER || cmd == REPACK_COMMAND_REPACK);
    2351              : 
    2352         3240 :     if (pg_class_aclcheck(relid, userid, ACL_MAINTAIN) == ACLCHECK_OK)
    2353          104 :         return true;
    2354              : 
    2355         3136 :     ereport(WARNING,
    2356              :             errmsg("permission denied to execute %s on \"%s\", skipping it",
    2357              :                    RepackCommandAsString(cmd),
    2358              :                    get_rel_name(relid)));
    2359              : 
    2360         3136 :     return false;
    2361              : }
    2362              : 
    2363              : 
    2364              : /*
    2365              :  * Given a RepackStmt with an indicated relation name, resolve the relation
    2366              :  * name, obtain lock on it, then determine what to do based on the relation
    2367              :  * type: if it's a table and not partitioned, repack it as indicated (using an
    2368              :  * existing clustered index, or following the given one), and return NULL.
    2369              :  *
    2370              :  * On the other hand, if the table is partitioned, do nothing further and
    2371              :  * instead return the opened and locked relcache entry, so that caller can
    2372              :  * process the partitions using the multiple-table handling code.  In this
    2373              :  * case, if an index name is given, it's up to the caller to resolve it.
    2374              :  */
    2375              : static Relation
    2376          217 : process_single_relation(RepackStmt *stmt, LOCKMODE lockmode, bool isTopLevel,
    2377              :                         ClusterParams *params)
    2378              : {
    2379              :     Relation    rel;
    2380              :     Oid         tableOid;
    2381              : 
    2382              :     Assert(stmt->relation != NULL);
    2383              :     Assert(stmt->command == REPACK_COMMAND_CLUSTER ||
    2384              :            stmt->command == REPACK_COMMAND_REPACK);
    2385              : 
    2386              :     /*
    2387              :      * Make sure ANALYZE is specified if a column list is present.
    2388              :      */
    2389          217 :     if ((params->options & CLUOPT_ANALYZE) == 0 && stmt->relation->va_cols != NIL)
    2390            4 :         ereport(ERROR,
    2391              :                 errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
    2392              :                 errmsg("ANALYZE option must be specified when a column list is provided"));
    2393              : 
    2394              :     /* Find, lock, and check permissions on the table. */
    2395          213 :     tableOid = RangeVarGetRelidExtended(stmt->relation->relation,
    2396              :                                         lockmode,
    2397              :                                         0,
    2398              :                                         RangeVarCallbackMaintainsTable,
    2399              :                                         NULL);
    2400          205 :     rel = table_open(tableOid, NoLock);
    2401              : 
    2402              :     /*
    2403              :      * Reject clustering a remote temp table ... their local buffer manager is
    2404              :      * not going to cope.
    2405              :      */
    2406          205 :     if (RELATION_IS_OTHER_TEMP(rel))
    2407            1 :         ereport(ERROR,
    2408              :                 errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
    2409              :         /*- translator: first %s is name of a SQL command, eg. REPACK */
    2410              :                 errmsg("cannot execute %s on temporary tables of other sessions",
    2411              :                        RepackCommandAsString(stmt->command)));
    2412              : 
    2413              :     /*
    2414              :      * For partitioned tables, let caller handle this.  Otherwise, process it
    2415              :      * here and we're done.
    2416              :      */
    2417          204 :     if (rel->rd_rel->relkind == RELKIND_PARTITIONED_TABLE)
    2418           33 :         return rel;
    2419              :     else
    2420              :     {
    2421          171 :         Oid         indexOid = InvalidOid;
    2422              : 
    2423          171 :         indexOid = determine_clustered_index(rel, stmt->usingindex,
    2424          171 :                                              stmt->indexname);
    2425          163 :         if (OidIsValid(indexOid))
    2426          140 :             check_index_is_clusterable(rel, indexOid, lockmode);
    2427              : 
    2428          151 :         cluster_rel(stmt->command, rel, indexOid, params, isTopLevel);
    2429              : 
    2430              :         /*
    2431              :          * Do an analyze, if requested.  We close the transaction and start a
    2432              :          * new one, so that we don't hold the stronger lock for longer than
    2433              :          * needed.
    2434              :          */
    2435          140 :         if (params->options & CLUOPT_ANALYZE)
    2436              :         {
    2437            8 :             VacuumParams vac_params = {0};
    2438              : 
    2439            8 :             PopActiveSnapshot();
    2440            8 :             CommitTransactionCommand();
    2441              : 
    2442            8 :             StartTransactionCommand();
    2443            8 :             PushActiveSnapshot(GetTransactionSnapshot());
    2444              : 
    2445            8 :             vac_params.options |= VACOPT_ANALYZE;
    2446            8 :             if (params->options & CLUOPT_VERBOSE)
    2447            0 :                 vac_params.options |= VACOPT_VERBOSE;
    2448            8 :             analyze_rel(tableOid, NULL, &vac_params,
    2449            8 :                         stmt->relation->va_cols, true, NULL);
    2450            8 :             PopActiveSnapshot();
    2451            8 :             CommandCounterIncrement();
    2452              :         }
    2453              : 
    2454          140 :         return NULL;
    2455              :     }
    2456              : }
    2457              : 
    2458              : /*
    2459              :  * Given a relation and the usingindex/indexname options in a
    2460              :  * REPACK USING INDEX or CLUSTER command, return the OID of the
    2461              :  * index to use for clustering the table.
    2462              :  *
    2463              :  * Caller must hold lock on the relation so that the set of indexes
    2464              :  * doesn't change, and must call check_index_is_clusterable.
    2465              :  */
    2466              : static Oid
    2467          191 : determine_clustered_index(Relation rel, bool usingindex, const char *indexname)
    2468              : {
    2469              :     Oid         indexOid;
    2470              : 
    2471          191 :     if (indexname == NULL && usingindex)
    2472              :     {
    2473              :         /*
    2474              :          * If USING INDEX with no name is given, find a clustered index, or
    2475              :          * error out if none.
    2476              :          */
    2477           21 :         indexOid = InvalidOid;
    2478           46 :         foreach_oid(idxoid, RelationGetIndexList(rel))
    2479              :         {
    2480           21 :             if (get_index_isclustered(idxoid))
    2481              :             {
    2482           17 :                 indexOid = idxoid;
    2483           17 :                 break;
    2484              :             }
    2485              :         }
    2486              : 
    2487           21 :         if (!OidIsValid(indexOid))
    2488            4 :             ereport(ERROR,
    2489              :                     errcode(ERRCODE_UNDEFINED_OBJECT),
    2490              :                     errmsg("there is no previously clustered index for table \"%s\"",
    2491              :                            RelationGetRelationName(rel)));
    2492              :     }
    2493          170 :     else if (indexname != NULL)
    2494              :     {
    2495              :         /* An index was specified; obtain its OID. */
    2496          147 :         indexOid = get_relname_relid(indexname, rel->rd_rel->relnamespace);
    2497          147 :         if (!OidIsValid(indexOid))
    2498            4 :             ereport(ERROR,
    2499              :                     errcode(ERRCODE_UNDEFINED_OBJECT),
    2500              :                     errmsg("index \"%s\" for table \"%s\" does not exist",
    2501              :                            indexname, RelationGetRelationName(rel)));
    2502              :     }
    2503              :     else
    2504           23 :         indexOid = InvalidOid;
    2505              : 
    2506          183 :     return indexOid;
    2507              : }
    2508              : 
    2509              : static const char *
    2510         3611 : RepackCommandAsString(RepackCommand cmd)
    2511              : {
    2512         3611 :     switch (cmd)
    2513              :     {
    2514         3199 :         case REPACK_COMMAND_REPACK:
    2515         3199 :             return "REPACK";
    2516          226 :         case REPACK_COMMAND_VACUUMFULL:
    2517          226 :             return "VACUUM";
    2518          186 :         case REPACK_COMMAND_CLUSTER:
    2519          186 :             return "CLUSTER";
    2520              :     }
    2521            0 :     return "???";             /* keep compiler quiet */
    2522              : }
    2523              : 
    2524              : /*
    2525              :  * Apply all the changes stored in 'file'.
    2526              :  */
    2527              : static void
    2528           14 : apply_concurrent_changes(BufFile *file, ChangeContext *chgcxt)
    2529              : {
    2530           14 :     ConcurrentChangeKind kind = '\0';
    2531           14 :     Relation    rel = chgcxt->cc_rel;
    2532              :     TupleTableSlot *spilled_tuple;
    2533              :     TupleTableSlot *old_update_tuple;
    2534              :     TupleTableSlot *ondisk_tuple;
    2535           14 :     bool        have_old_tuple = false;
    2536              :     MemoryContext oldcxt;
    2537              : 
    2538           14 :     spilled_tuple = MakeSingleTupleTableSlot(RelationGetDescr(rel),
    2539              :                                              &TTSOpsVirtual);
    2540           14 :     ondisk_tuple = MakeSingleTupleTableSlot(RelationGetDescr(rel),
    2541              :                                             table_slot_callbacks(rel));
    2542           14 :     old_update_tuple = MakeSingleTupleTableSlot(RelationGetDescr(rel),
    2543              :                                                 &TTSOpsVirtual);
    2544              : 
    2545           14 :     oldcxt = MemoryContextSwitchTo(GetPerTupleMemoryContext(chgcxt->cc_estate));
    2546              : 
    2547              :     while (true)
    2548           40 :     {
    2549              :         size_t      nread;
    2550           54 :         ConcurrentChangeKind prevkind = kind;
    2551              : 
    2552           54 :         CHECK_FOR_INTERRUPTS();
    2553              : 
    2554           54 :         nread = BufFileReadMaybeEOF(file, &kind, 1, true);
    2555           54 :         if (nread == 0)         /* done with the file? */
    2556           14 :             break;
    2557              : 
    2558              :         /*
    2559              :          * If this is the old tuple for an update, read it into the tuple slot
    2560              :          * and go to the next one.  The update itself will be executed on the
    2561              :          * next iteration, when we receive the NEW tuple.
    2562              :          */
    2563           40 :         if (kind == CHANGE_UPDATE_OLD)
    2564              :         {
    2565            8 :             restore_tuple(file, rel, old_update_tuple);
    2566            8 :             have_old_tuple = true;
    2567            8 :             continue;
    2568              :         }
    2569              : 
    2570              :         /*
    2571              :          * Just before an UPDATE or DELETE, we must update the command
    2572              :          * counter, because the change could refer to a tuple that we have
    2573              :          * just inserted; and before an INSERT, we have to do this also if the
    2574              :          * previous command was either update or delete.
    2575              :          *
    2576              :          * With this approach we don't spend so many CCIs for long strings of
    2577              :          * only INSERTs, which can't affect one another.
    2578              :          */
    2579           32 :         if (kind == CHANGE_UPDATE_NEW || kind == CHANGE_DELETE ||
    2580            7 :             (kind == CHANGE_INSERT && (prevkind == CHANGE_UPDATE_NEW ||
    2581              :                                        prevkind == CHANGE_DELETE)))
    2582              :         {
    2583           29 :             CommandCounterIncrement();
    2584           29 :             UpdateActiveSnapshotCommandId();
    2585              :         }
    2586              : 
    2587              :         /*
    2588              :          * Now restore the tuple into the slot and execute the change.
    2589              :          */
    2590           32 :         restore_tuple(file, rel, spilled_tuple);
    2591              : 
    2592           32 :         if (kind == CHANGE_INSERT)
    2593              :         {
    2594            7 :             apply_concurrent_insert(rel, spilled_tuple, chgcxt);
    2595              :         }
    2596           25 :         else if (kind == CHANGE_DELETE)
    2597              :         {
    2598              :             bool        found;
    2599              : 
    2600              :             /* Find the tuple to be deleted */
    2601            3 :             found = find_target_tuple(rel, chgcxt, spilled_tuple, ondisk_tuple);
    2602            3 :             if (!found)
    2603            0 :                 elog(ERROR, "could not find target tuple");
    2604            3 :             apply_concurrent_delete(rel, ondisk_tuple);
    2605              :         }
    2606           22 :         else if (kind == CHANGE_UPDATE_NEW)
    2607              :         {
    2608              :             TupleTableSlot *key;
    2609              :             bool        found;
    2610              : 
    2611           22 :             if (have_old_tuple)
    2612            8 :                 key = old_update_tuple;
    2613              :             else
    2614           14 :                 key = spilled_tuple;
    2615              : 
    2616              :             /* Find the tuple to be updated or deleted. */
    2617           22 :             found = find_target_tuple(rel, chgcxt, key, ondisk_tuple);
    2618           22 :             if (!found)
    2619            0 :                 elog(ERROR, "could not find target tuple");
    2620              : 
    2621              :             /*
    2622              :              * If the spilled tuple contains TOAST pointers, they point to the old
    2623              :              * relation's toast. Copy the corresponding TOAST pointers for the
    2624              :              * new relation from the existing tuple. (The fact that we
    2625              :              * received a TOAST pointer here implies that the attribute hasn't
    2626              :              * changed.)
    2627              :              */
    2628           22 :             adjust_toast_pointers(rel, spilled_tuple, ondisk_tuple);
    2629              : 
    2630           22 :             apply_concurrent_update(rel, spilled_tuple, ondisk_tuple, chgcxt);
    2631              : 
    2632           22 :             ExecClearTuple(old_update_tuple);
    2633           22 :             have_old_tuple = false;
    2634              :         }
    2635              :         else
    2636            0 :             elog(ERROR, "unrecognized kind of change: %d", kind);
    2637              : 
    2638           32 :         ResetPerTupleExprContext(chgcxt->cc_estate);
    2639              :     }
    2640              : 
    2641              :     /* Cleanup. */
    2642           14 :     ExecDropSingleTupleTableSlot(spilled_tuple);
    2643           14 :     ExecDropSingleTupleTableSlot(ondisk_tuple);
    2644           14 :     ExecDropSingleTupleTableSlot(old_update_tuple);
    2645              : 
    2646           14 :     MemoryContextSwitchTo(oldcxt);
    2647           14 : }
    2648              : 
    2649              : /*
    2650              :  * Apply an insert from the spill of concurrent changes to the new copy of the
    2651              :  * table.
    2652              :  */
    2653              : static void
    2654            7 : apply_concurrent_insert(Relation rel, TupleTableSlot *slot,
    2655              :                         ChangeContext *chgcxt)
    2656              : {
    2657              :     /* Put the tuple in the table, but make sure it won't be decoded */
    2658            7 :     table_tuple_insert(rel, slot, GetCurrentCommandId(true),
    2659              :                        TABLE_INSERT_NO_LOGICAL, NULL);
    2660              : 
    2661              :     /* Update indexes with this new tuple. */
    2662            7 :     ExecInsertIndexTuples(chgcxt->cc_rri,
    2663              :                           chgcxt->cc_estate,
    2664              :                           0,
    2665              :                           slot,
    2666              :                           NIL, NULL);
    2667            7 :     pgstat_progress_incr_param(PROGRESS_REPACK_HEAP_TUPLES_INSERTED, 1);
    2668            7 : }
    2669              : 
    2670              : /*
    2671              :  * Apply an update from the spill of concurrent changes to the new copy of the
    2672              :  * table.
    2673              :  */
    2674              : static void
    2675           22 : apply_concurrent_update(Relation rel, TupleTableSlot *spilled_tuple,
    2676              :                         TupleTableSlot *ondisk_tuple,
    2677              :                         ChangeContext *chgcxt)
    2678              : {
    2679              :     LockTupleMode lockmode;
    2680              :     TM_FailureData tmfd;
    2681              :     TU_UpdateIndexes update_indexes;
    2682              :     TM_Result   res;
    2683              : 
    2684              :     /*
    2685              :      * Carry out the update, skipping logical decoding for it.
    2686              :      */
    2687           22 :     res = table_tuple_update(rel, &(ondisk_tuple->tts_tid), spilled_tuple,
    2688              :                              GetCurrentCommandId(true),
    2689              :                              TABLE_UPDATE_NO_LOGICAL,
    2690              :                              InvalidSnapshot,
    2691              :                              InvalidSnapshot,
    2692              :                              false,
    2693              :                              &tmfd, &lockmode, &update_indexes);
    2694           22 :     if (res != TM_Ok)
    2695            0 :         ereport(ERROR,
    2696              :                 errcode(ERRCODE_T_R_SERIALIZATION_FAILURE),
    2697              :                 errmsg("could not apply concurrent %s on relation \"%s\"",
    2698              :                        "UPDATE", RelationGetRelationName(rel)));
    2699              : 
    2700           22 :     if (update_indexes != TU_None)
    2701              :     {
    2702            8 :         uint32      flags = EIIT_IS_UPDATE;
    2703              : 
    2704            8 :         if (update_indexes == TU_Summarizing)
    2705            0 :             flags |= EIIT_ONLY_SUMMARIZING;
    2706            8 :         ExecInsertIndexTuples(chgcxt->cc_rri,
    2707              :                               chgcxt->cc_estate,
    2708              :                               flags,
    2709              :                               spilled_tuple,
    2710              :                               NIL, NULL);
    2711              :     }
    2712              : 
    2713           22 :     pgstat_progress_incr_param(PROGRESS_REPACK_HEAP_TUPLES_UPDATED, 1);
    2714           22 : }
    2715              : 
    2716              : static void
    2717            3 : apply_concurrent_delete(Relation rel, TupleTableSlot *slot)
    2718              : {
    2719              :     TM_Result   res;
    2720              :     TM_FailureData tmfd;
    2721              : 
    2722              :     /*
    2723              :      * Delete tuple from the new heap, skipping logical decoding for it.
    2724              :      */
    2725            3 :     res = table_tuple_delete(rel, &(slot->tts_tid),
    2726              :                              GetCurrentCommandId(true),
    2727              :                              TABLE_DELETE_NO_LOGICAL,
    2728              :                              InvalidSnapshot, InvalidSnapshot,
    2729              :                              false,
    2730              :                              &tmfd);
    2731              : 
    2732            3 :     if (res != TM_Ok)
    2733            0 :         ereport(ERROR,
    2734              :                 errcode(ERRCODE_T_R_SERIALIZATION_FAILURE),
    2735              :                 errmsg("could not apply concurrent %s on relation \"%s\"",
    2736              :                        "DELETE", RelationGetRelationName(rel)));
    2737              : 
    2738            3 :     pgstat_progress_incr_param(PROGRESS_REPACK_HEAP_TUPLES_DELETED, 1);
    2739            3 : }
    2740              : 
    2741              : /*
    2742              :  * Read tuple from file and put it in the input slot.  All memory is allocated
    2743              :  * in the current memory context; caller is responsible for freeing it as
    2744              :  * appropriate.
    2745              :  *
    2746              :  * External attributes are stored in separate memory chunks, in order to avoid
    2747              :  * exceeding MaxAllocSize - that could happen if the individual attributes are
    2748              :  * smaller than MaxAllocSize but the whole tuple is bigger.
    2749              :  */
    2750              : static void
    2751           40 : restore_tuple(BufFile *file, Relation relation, TupleTableSlot *slot)
    2752              : {
    2753              :     uint32      t_len;
    2754              :     HeapTuple   tup;
    2755              :     int         natt_ext;
    2756              : 
    2757              :     /* Read the tuple. */
    2758           40 :     BufFileReadExact(file, &t_len, sizeof(t_len));
    2759           40 :     tup = (HeapTuple) palloc(HEAPTUPLESIZE + t_len);
    2760           40 :     tup->t_data = (HeapTupleHeader) ((char *) tup + HEAPTUPLESIZE);
    2761           40 :     BufFileReadExact(file, tup->t_data, t_len);
    2762           40 :     tup->t_len = t_len;
    2763           40 :     ItemPointerSetInvalid(&tup->t_self);
    2764           40 :     tup->t_tableOid = RelationGetRelid(relation);
    2765              : 
    2766              :     /*
    2767              :      * Put the tuple we read in a slot. This deforms it, so that we can hack
    2768              :      * the external attributes in place.
    2769              :      */
    2770           40 :     ExecForceStoreHeapTuple(tup, slot, false);
    2771              : 
    2772              :     /*
    2773              :      * Next, read any attributes we stored separately into the tts_values
    2774              :      * array elements expecting them, if any.  This matches
    2775              :      * repack_store_change.
    2776              :      */
    2777           40 :     BufFileReadExact(file, &natt_ext, sizeof(natt_ext));
    2778           40 :     if (natt_ext > 0)
    2779              :     {
    2780           11 :         TupleDesc   desc = slot->tts_tupleDescriptor;
    2781              : 
    2782           66 :         for (int i = 0; i < desc->natts; i++)
    2783              :         {
    2784           55 :             CompactAttribute *attr = TupleDescCompactAttr(desc, i);
    2785              :             varlena    *varlen;
    2786              :             uint64      chunk_header;
    2787              :             void       *value;
    2788              :             Size        varlensz;
    2789              : 
    2790           55 :             if (attr->attisdropped || attr->attlen != -1)
    2791           40 :                 continue;
    2792           22 :             if (slot_attisnull(slot, i + 1))
    2793            0 :                 continue;
    2794           22 :             varlen = (varlena *) DatumGetPointer(slot->tts_values[i]);
    2795           22 :             if (!VARATT_IS_EXTERNAL_INDIRECT(varlen))
    2796            7 :                 continue;
    2797           15 :             slot_getsomeattrs(slot, i + 1);
    2798              : 
    2799           15 :             BufFileReadExact(file, &chunk_header, VARHDRSZ);
    2800           15 :             varlensz = VARSIZE_ANY(&chunk_header);
    2801              : 
    2802           15 :             value = palloc(varlensz);
    2803           15 :             memcpy(value, &chunk_header, VARHDRSZ);
    2804           15 :             BufFileReadExact(file, (char *) value + VARHDRSZ, varlensz - VARHDRSZ);
    2805              : 
    2806           15 :             slot->tts_values[i] = PointerGetDatum(value);
    2807           15 :             natt_ext--;
    2808           15 :             if (natt_ext < 0)
    2809            0 :                 ereport(ERROR,
    2810              :                         errcode(ERRCODE_DATA_CORRUPTED),
    2811              :                         errmsg("insufficient number of attributes stored separately"));
    2812              :         }
    2813              :     }
    2814           40 : }
    2815              : 
    2816              : /*
    2817              :  * Adjust 'dest' replacing any EXTERNAL_ONDISK toast pointers with the
    2818              :  * corresponding ones from 'src'.
    2819              :  */
    2820              : static void
    2821           22 : adjust_toast_pointers(Relation relation, TupleTableSlot *dest, TupleTableSlot *src)
    2822              : {
    2823           22 :     TupleDesc   desc = dest->tts_tupleDescriptor;
    2824              : 
    2825          104 :     for (int i = 0; i < desc->natts; i++)
    2826              :     {
    2827           82 :         CompactAttribute *attr = TupleDescCompactAttr(desc, i);
    2828              :         varlena    *varlena_dst;
    2829              : 
    2830           82 :         if (attr->attisdropped)
    2831           24 :             continue;
    2832           58 :         if (attr->attlen != -1)
    2833           28 :             continue;
    2834           30 :         if (slot_attisnull(dest, i + 1))
    2835            0 :             continue;
    2836              : 
    2837           30 :         slot_getsomeattrs(dest, i + 1);
    2838              : 
    2839           30 :         varlena_dst = (varlena *) DatumGetPointer(dest->tts_values[i]);
    2840           30 :         if (!VARATT_IS_EXTERNAL_ONDISK(varlena_dst))
    2841           28 :             continue;
    2842            2 :         slot_getsomeattrs(src, i + 1);
    2843              : 
    2844            2 :         dest->tts_values[i] = src->tts_values[i];
    2845              :     }
    2846           22 : }
    2847              : 
    2848              : /*
    2849              :  * Find the tuple to be updated or deleted by the given data change, whose
    2850              :  * tuple has already been loaded into locator.
    2851              :  *
    2852              :  * If the tuple is found, put it in retrieved and return true.  If the tuple is
    2853              :  * not found, return false.
    2854              :  */
    2855              : static bool
    2856           25 : find_target_tuple(Relation rel, ChangeContext *chgcxt, TupleTableSlot *locator,
    2857              :                   TupleTableSlot *retrieved)
    2858              : {
    2859           25 :     Form_pg_index idx = chgcxt->cc_ident_index->rd_index;
    2860              :     IndexScanDesc scan;
    2861           25 :     bool        retval = false;
    2862              : 
    2863              :     /*
    2864              :      * Scan key is passed by caller, so it does not have to be constructed
    2865              :      * multiple times. Key entries have all fields initialized, except for
    2866              :      * sk_argument.
    2867              :      *
    2868              :      * Use the incoming tuple to finalize the scan key.
    2869              :      */
    2870           52 :     for (int i = 0; i < chgcxt->cc_ident_key_nentries; i++)
    2871              :     {
    2872           27 :         ScanKey     entry = &chgcxt->cc_ident_key[i];
    2873           27 :         AttrNumber  attno = idx->indkey.values[i];
    2874              : 
    2875           27 :         entry->sk_argument = locator->tts_values[attno - 1];
    2876              :         Assert(!locator->tts_isnull[attno - 1]);
    2877              :     }
    2878              : 
    2879              :     /* XXX no instrumentation for now */
    2880           25 :     scan = index_beginscan(rel, chgcxt->cc_ident_index, GetActiveSnapshot(),
    2881              :                            NULL, chgcxt->cc_ident_key_nentries, 0, 0);
    2882           25 :     index_rescan(scan, chgcxt->cc_ident_key, chgcxt->cc_ident_key_nentries, NULL, 0);
    2883           26 :     while (index_getnext_slot(scan, ForwardScanDirection, retrieved))
    2884              :     {
    2885              :         /* Be wary of temporal constraints */
    2886           26 :         if (scan->xs_recheck && !identity_key_equal(chgcxt, locator, retrieved))
    2887              :         {
    2888            1 :             CHECK_FOR_INTERRUPTS();
    2889            1 :             continue;
    2890              :         }
    2891              : 
    2892           25 :         retval = true;
    2893           25 :         break;
    2894              :     }
    2895           25 :     index_endscan(scan);
    2896              : 
    2897           25 :     return retval;
    2898              : }
    2899              : 
    2900              : /*
    2901              :  * Check whether the candidate tuple matches the locator tuple on all replica
    2902              :  * identity key columns, using the same equality operators as the identity
    2903              :  * index scan.  The locator tuple has already been loaded into cc_ident_key.
    2904              :  *
    2905              :  * This is needed to filter lossy index matches, such as GiST multirange scans
    2906              :  * used for temporal constraints.
    2907              :  */
    2908              : static bool
    2909            2 : identity_key_equal(ChangeContext *chgcxt, TupleTableSlot *locator,
    2910              :                    TupleTableSlot *candidate)
    2911              : {
    2912            2 :     slot_getsomeattrs(locator, chgcxt->cc_last_key_attno);
    2913            2 :     slot_getsomeattrs(candidate, chgcxt->cc_last_key_attno);
    2914              : 
    2915            4 :     for (int i = 0; i < chgcxt->cc_ident_key_nentries; i++)
    2916              :     {
    2917            3 :         ScanKey     entry = &chgcxt->cc_ident_key[i];
    2918            3 :         AttrNumber  attno = chgcxt->cc_ident_index->rd_index->indkey.values[i];
    2919              : 
    2920              :         Assert(attno > 0);
    2921              : 
    2922            3 :         if (locator->tts_isnull[attno - 1] != candidate->tts_isnull[attno - 1])
    2923            0 :             return false;
    2924              : 
    2925            3 :         if (locator->tts_isnull[attno - 1])
    2926            0 :             continue;
    2927              : 
    2928            3 :         if (!DatumGetBool(FunctionCall2Coll(&entry->sk_func,
    2929              :                                             entry->sk_collation,
    2930            3 :                                             candidate->tts_values[attno - 1],
    2931              :                                             entry->sk_argument)))
    2932            1 :             return false;
    2933              :     }
    2934              : 
    2935            1 :     return true;
    2936              : }
    2937              : 
    2938              : /*
    2939              :  * Decode and apply concurrent changes, up to (and including) the record whose
    2940              :  * LSN is 'end_of_wal'.
    2941              :  *
    2942              :  * XXX the names "process_concurrent_changes" and "apply_concurrent_changes"
    2943              :  * are far too similar to each other.
    2944              :  */
    2945              : static void
    2946           14 : process_concurrent_changes(XLogRecPtr end_of_wal, ChangeContext *chgcxt, bool done)
    2947              : {
    2948              :     DecodingWorkerShared *shared;
    2949              :     char        fname[MAXPGPATH];
    2950              :     BufFile    *file;
    2951              : 
    2952           14 :     pgstat_progress_update_param(PROGRESS_REPACK_PHASE,
    2953              :                                  PROGRESS_REPACK_PHASE_CATCH_UP);
    2954              : 
    2955              :     /* Ask the worker for the file. */
    2956           14 :     shared = (DecodingWorkerShared *) dsm_segment_address(decoding_worker->seg);
    2957           14 :     SpinLockAcquire(&shared->mutex);
    2958           14 :     shared->lsn_upto = end_of_wal;
    2959           14 :     shared->done = done;
    2960           14 :     SpinLockRelease(&shared->mutex);
    2961              : 
    2962              :     /*
    2963              :      * The worker needs to finish processing of the current WAL record. Even
    2964              :      * if it's idle, it'll need to close the output file. Thus we're likely to
    2965              :      * wait, so prepare for sleep.
    2966              :      */
    2967           14 :     ConditionVariablePrepareToSleep(&shared->cv);
    2968              :     for (;;)
    2969           14 :     {
    2970              :         int         last_exported;
    2971              : 
    2972           28 :         SpinLockAcquire(&shared->mutex);
    2973           28 :         last_exported = shared->last_exported;
    2974           28 :         SpinLockRelease(&shared->mutex);
    2975              : 
    2976              :         /*
    2977              :          * Has the worker exported the file we are waiting for?
    2978              :          */
    2979           28 :         if (last_exported == chgcxt->cc_file_seq)
    2980           14 :             break;
    2981              : 
    2982           14 :         ConditionVariableSleep(&shared->cv, WAIT_EVENT_REPACK_WORKER_EXPORT);
    2983              :     }
    2984           14 :     ConditionVariableCancelSleep();
    2985              : 
    2986              :     /* Open the file. */
    2987           14 :     DecodingWorkerFileName(fname, shared->relid, chgcxt->cc_file_seq);
    2988           14 :     file = BufFileOpenFileSet(&shared->sfs.fs, fname, O_RDONLY, false);
    2989           14 :     apply_concurrent_changes(file, chgcxt);
    2990              : 
    2991           14 :     BufFileClose(file);
    2992              : 
    2993              :     /* Get ready for the next file. */
    2994           14 :     chgcxt->cc_file_seq++;
    2995           14 : }
    2996              : 
    2997              : /*
    2998              :  * Initialize the ChangeContext struct for the given relation, with
    2999              :  * the given index as identity index.
    3000              :  */
    3001              : static void
    3002            7 : initialize_change_context(ChangeContext *chgcxt,
    3003              :                           Relation relation, Oid ident_index_id)
    3004              : {
    3005            7 :     chgcxt->cc_rel = relation;
    3006              : 
    3007              :     /* Only initialize fields needed by ExecInsertIndexTuples(). */
    3008            7 :     chgcxt->cc_estate = CreateExecutorState();
    3009              : 
    3010            7 :     chgcxt->cc_rri = (ResultRelInfo *) palloc(sizeof(ResultRelInfo));
    3011            7 :     InitResultRelInfo(chgcxt->cc_rri, relation, 0, 0, 0);
    3012            7 :     ExecOpenIndices(chgcxt->cc_rri, false);
    3013              : 
    3014              :     /*
    3015              :      * The table's relcache entry already has the relcache entry for the
    3016              :      * identity index; find that.
    3017              :      */
    3018            7 :     chgcxt->cc_ident_index = NULL;
    3019            7 :     for (int i = 0; i < chgcxt->cc_rri->ri_NumIndices; i++)
    3020              :     {
    3021              :         Relation    ind_rel;
    3022              : 
    3023            7 :         ind_rel = chgcxt->cc_rri->ri_IndexRelationDescs[i];
    3024            7 :         if (ind_rel->rd_id == ident_index_id)
    3025              :         {
    3026            7 :             chgcxt->cc_ident_index = ind_rel;
    3027            7 :             break;
    3028              :         }
    3029              :     }
    3030            7 :     if (chgcxt->cc_ident_index == NULL)
    3031            0 :         elog(ERROR, "could not find identity index");
    3032              : 
    3033              :     /* Set up for scanning said identity index */
    3034              :     {
    3035              :         Form_pg_index indexForm;
    3036              : 
    3037            7 :         indexForm = chgcxt->cc_ident_index->rd_index;
    3038            7 :         chgcxt->cc_ident_key_nentries = indexForm->indnkeyatts;
    3039            7 :         chgcxt->cc_ident_key = (ScanKey) palloc_array(ScanKeyData, indexForm->indnkeyatts);
    3040           16 :         for (int i = 0; i < indexForm->indnkeyatts; i++)
    3041              :         {
    3042              :             ScanKey     entry;
    3043              :             Oid         opfamily,
    3044              :                         opcintype,
    3045              :                         opno,
    3046              :                         opcode;
    3047              :             StrategyNumber eq_strategy;
    3048              : 
    3049            9 :             entry = &chgcxt->cc_ident_key[i];
    3050              : 
    3051            9 :             opfamily = chgcxt->cc_ident_index->rd_opfamily[i];
    3052            9 :             opcintype = chgcxt->cc_ident_index->rd_opcintype[i];
    3053            9 :             eq_strategy = IndexAmTranslateCompareType(COMPARE_EQ,
    3054            9 :                                                       chgcxt->cc_ident_index->rd_rel->relam,
    3055              :                                                       opfamily, false);
    3056            9 :             if (eq_strategy == InvalidStrategy)
    3057            0 :                 elog(ERROR, "could not find equality strategy for index operator family %u for type %u",
    3058              :                      opfamily, opcintype);
    3059            9 :             opno = get_opfamily_member(opfamily, opcintype, opcintype,
    3060              :                                        eq_strategy);
    3061            9 :             if (!OidIsValid(opno))
    3062            0 :                 elog(ERROR, "missing operator %d(%u,%u) in opfamily %u",
    3063              :                      eq_strategy, opcintype, opcintype, opfamily);
    3064            9 :             opcode = get_opcode(opno);
    3065            9 :             if (!OidIsValid(opcode))
    3066            0 :                 elog(ERROR, "missing oprcode for operator %u", opno);
    3067              : 
    3068              :             /* Initialize everything but argument. */
    3069            9 :             ScanKeyInit(entry,
    3070            9 :                         i + 1,
    3071              :                         eq_strategy, opcode,
    3072              :                         (Datum) 0);
    3073            9 :             entry->sk_collation = chgcxt->cc_ident_index->rd_indcollation[i];
    3074              :         }
    3075              :     }
    3076              : 
    3077              :     /* Determine the last column we must deform to read the identity */
    3078            7 :     chgcxt->cc_last_key_attno = InvalidAttrNumber;
    3079           16 :     for (int i = 0; i < chgcxt->cc_ident_key_nentries; i++)
    3080              :     {
    3081            9 :         AttrNumber  attno = chgcxt->cc_ident_index->rd_index->indkey.values[i];
    3082              : 
    3083              :         Assert(attno > 0);
    3084            9 :         chgcxt->cc_last_key_attno = Max(chgcxt->cc_last_key_attno, attno);
    3085              :     }
    3086              : 
    3087            7 :     chgcxt->cc_file_seq = WORKER_FILE_SNAPSHOT + 1;
    3088            7 : }
    3089              : 
    3090              : /*
    3091              :  * Free up resources taken by a ChangeContext.
    3092              :  */
    3093              : static void
    3094            7 : release_change_context(ChangeContext *chgcxt)
    3095              : {
    3096            7 :     ExecCloseIndices(chgcxt->cc_rri);
    3097            7 :     FreeExecutorState(chgcxt->cc_estate);
    3098              :     /* XXX are these pfrees necessary? */
    3099            7 :     pfree(chgcxt->cc_rri);
    3100            7 :     pfree(chgcxt->cc_ident_key);
    3101            7 : }
    3102              : 
    3103              : /*
    3104              :  * The final steps of rebuild_relation() for concurrent processing.
    3105              :  *
    3106              :  * On entry, NewHeap is locked in AccessExclusiveLock mode. OldHeap and its
    3107              :  * clustering index (if one is passed) are still locked in a mode that allows
    3108              :  * concurrent data changes. On exit, both tables and their indexes are closed,
    3109              :  * but locked in AccessExclusiveLock mode.
    3110              :  */
    3111              : static void
    3112            7 : rebuild_relation_finish_concurrent(Relation NewHeap, Relation OldHeap,
    3113              :                                    Oid identIdx, TransactionId frozenXid,
    3114              :                                    MultiXactId cutoffMulti)
    3115              : {
    3116              :     List       *ind_oids_new;
    3117            7 :     Oid         old_table_oid = RelationGetRelid(OldHeap);
    3118            7 :     Oid         new_table_oid = RelationGetRelid(NewHeap);
    3119            7 :     List       *ind_oids_old = RelationGetIndexList(OldHeap);
    3120              :     ListCell   *lc,
    3121              :                *lc2;
    3122              :     char        relpersistence;
    3123              :     bool        is_system_catalog;
    3124              :     Oid         ident_idx_new;
    3125              :     XLogRecPtr  end_of_wal;
    3126              :     List       *indexrels;
    3127              :     ChangeContext chgcxt;
    3128              : 
    3129              :     Assert(CheckRelationLockedByMe(OldHeap, ShareUpdateExclusiveLock, false));
    3130              :     Assert(CheckRelationLockedByMe(NewHeap, AccessExclusiveLock, false));
    3131              : 
    3132              :     /*
    3133              :      * Unlike the exclusive case, we build new indexes for the new relation
    3134              :      * rather than swapping the storage and reindexing the old relation. The
    3135              :      * point is that the index build can take some time, so we do it before we
    3136              :      * get AccessExclusiveLock on the old heap and therefore we cannot swap
    3137              :      * the heap storage yet.
    3138              :      *
    3139              :      * index_create() will lock the new indexes using AccessExclusiveLock - no
    3140              :      * need to change that. At the same time, we use ShareUpdateExclusiveLock
    3141              :      * to lock the existing indexes - that should be enough to prevent others
    3142              :      * from changing them while we're repacking the relation. The lock on the
    3143              :      * table should prevent others from changing the index column list, but
    3144              :      * might not be enough for commands like ALTER INDEX ... SET ... (Those
    3145              :      * are not necessarily dangerous, but can confuse users if the changes
    3146              :      * they make get lost due to REPACK.)
    3147              :      */
    3148            7 :     ind_oids_new = build_new_indexes(NewHeap, OldHeap, ind_oids_old);
    3149              : 
    3150              :     /*
    3151              :      * The identity index in the new relation appears in the same relative
    3152              :      * position as the corresponding index in the old relation.  Find it.
    3153              :      */
    3154            7 :     ident_idx_new = InvalidOid;
    3155           14 :     foreach_oid(ind_old, ind_oids_old)
    3156              :     {
    3157            7 :         if (identIdx == ind_old)
    3158              :         {
    3159            7 :             int         pos = foreach_current_index(ind_old);
    3160              : 
    3161            7 :             if (list_length(ind_oids_new) <= pos)
    3162            0 :                 elog(ERROR, "list of new indexes too short");
    3163            7 :             ident_idx_new = list_nth_oid(ind_oids_new, pos);
    3164            7 :             break;
    3165              :         }
    3166              :     }
    3167            7 :     if (!OidIsValid(ident_idx_new))
    3168            0 :         elog(ERROR, "could not find index matching \"%s\" at the new relation",
    3169              :              get_rel_name(identIdx));
    3170              : 
    3171              :     /* Gather information to apply concurrent changes. */
    3172            7 :     initialize_change_context(&chgcxt, NewHeap, ident_idx_new);
    3173              : 
    3174              :     /*
    3175              :      * During testing, wait for another backend to perform concurrent data
    3176              :      * changes which we will process below.
    3177              :      */
    3178            7 :     INJECTION_POINT("repack-concurrently-before-lock", NULL);
    3179              : 
    3180              :     /*
    3181              :      * Flush all WAL records inserted so far (possibly except for the last
    3182              :      * incomplete page; see GetInsertRecPtr), to minimize the amount of data
    3183              :      * we need to flush while holding exclusive lock on the source table.
    3184              :      */
    3185            7 :     XLogFlush(GetXLogInsertEndRecPtr());
    3186            7 :     end_of_wal = GetFlushRecPtr(NULL);
    3187              : 
    3188              :     /*
    3189              :      * Apply concurrent changes for the first time, to minimize the time we
    3190              :      * need to hold AccessExclusiveLock. (A considerable amount of WAL could
    3191              :      * have been written during the data copying and index creation.)
    3192              :      */
    3193            7 :     process_concurrent_changes(end_of_wal, &chgcxt, false);
    3194              : 
    3195              :     /*
    3196              :      * Acquire AccessExclusiveLock on the table, its TOAST relation (if there
    3197              :      * is one), and all its indexes, so that we can swap the files.
    3198              :      */
    3199            7 :     LockRelationOid(old_table_oid, AccessExclusiveLock);
    3200              : 
    3201              :     /*
    3202              :      * Lock all indexes now, not only the clustering one: all indexes need to
    3203              :      * have their files swapped. While doing that, store their relation
    3204              :      * references in a list, to handle predicate locks below.
    3205              :      */
    3206            7 :     indexrels = NIL;
    3207           22 :     foreach_oid(ind_oid, ind_oids_old)
    3208              :     {
    3209              :         Relation    index;
    3210              : 
    3211            8 :         index = index_open(ind_oid, AccessExclusiveLock);
    3212              : 
    3213              :         /*
    3214              :          * Some things about the index may have changed before we locked the
    3215              :          * index, such as ALTER INDEX RENAME.  We don't need to do anything
    3216              :          * here to absorb those changes in the new index.
    3217              :          */
    3218            8 :         indexrels = lappend(indexrels, index);
    3219              :     }
    3220              : 
    3221              :     /*
    3222              :      * Lock the OldHeap's TOAST relation exclusively - again, the lock is
    3223              :      * needed to swap the files.
    3224              :      */
    3225            7 :     if (OidIsValid(OldHeap->rd_rel->reltoastrelid))
    3226            3 :         LockRelationOid(OldHeap->rd_rel->reltoastrelid, AccessExclusiveLock);
    3227              : 
    3228              :     /*
    3229              :      * Tuples and pages of the old heap will be gone, but the heap will stay.
    3230              :      */
    3231            7 :     TransferPredicateLocksToHeapRelation(OldHeap);
    3232           22 :     foreach_ptr(RelationData, index, indexrels)
    3233              :     {
    3234            8 :         TransferPredicateLocksToHeapRelation(index);
    3235            8 :         index_close(index, NoLock);
    3236              :     }
    3237            7 :     list_free(indexrels);
    3238              : 
    3239              :     /*
    3240              :      * Flush WAL again, to make sure that all changes committed while we were
    3241              :      * waiting for the exclusive lock are available for decoding.
    3242              :      */
    3243            7 :     XLogFlush(GetXLogInsertEndRecPtr());
    3244            7 :     end_of_wal = GetFlushRecPtr(NULL);
    3245              : 
    3246              :     /*
    3247              :      * Apply the concurrent changes again. Indicate that the decoding worker
    3248              :      * won't be needed anymore.
    3249              :      */
    3250            7 :     process_concurrent_changes(end_of_wal, &chgcxt, true);
    3251              : 
    3252              :     /* Remember info about rel before closing OldHeap */
    3253            7 :     relpersistence = OldHeap->rd_rel->relpersistence;
    3254            7 :     is_system_catalog = IsSystemRelation(OldHeap);
    3255              : 
    3256            7 :     pgstat_progress_update_param(PROGRESS_REPACK_PHASE,
    3257              :                                  PROGRESS_REPACK_PHASE_SWAP_REL_FILES);
    3258              : 
    3259              :     /*
    3260              :      * Even ShareUpdateExclusiveLock should have prevented others from
    3261              :      * creating / dropping indexes (even using the CONCURRENTLY option), so we
    3262              :      * do not need to check whether the lists match.
    3263              :      */
    3264           15 :     forboth(lc, ind_oids_old, lc2, ind_oids_new)
    3265              :     {
    3266            8 :         Oid         ind_old = lfirst_oid(lc);
    3267            8 :         Oid         ind_new = lfirst_oid(lc2);
    3268            8 :         Oid         mapped_tables[4] = {0};
    3269              : 
    3270            8 :         swap_relation_files(ind_old, ind_new,
    3271              :                             (old_table_oid == RelationRelationId),
    3272              :                             false,  /* swap_toast_by_content */
    3273              :                             true,
    3274              :                             InvalidTransactionId,
    3275              :                             InvalidMultiXactId,
    3276              :                             mapped_tables);
    3277              : 
    3278              : #ifdef USE_ASSERT_CHECKING
    3279              : 
    3280              :         /*
    3281              :          * Concurrent processing is not supported for system relations, so
    3282              :          * there should be no mapped tables.
    3283              :          */
    3284              :         for (int i = 0; i < 4; i++)
    3285              :             Assert(!OidIsValid(mapped_tables[i]));
    3286              : #endif
    3287              :     }
    3288              : 
    3289              :     /* The new indexes must be visible for deletion. */
    3290            7 :     CommandCounterIncrement();
    3291              : 
    3292              :     /* Close the old heap but keep lock until transaction commit. */
    3293            7 :     table_close(OldHeap, NoLock);
    3294              :     /* Close the new heap. (We didn't have to open its indexes). */
    3295            7 :     table_close(NewHeap, NoLock);
    3296              : 
    3297              :     /* Cleanup what we don't need anymore. (And close the identity index.) */
    3298            7 :     release_change_context(&chgcxt);
    3299              : 
    3300              :     /*
    3301              :      * Swap the relations and their TOAST relations and TOAST indexes. This
    3302              :      * also drops the new relation and its indexes.
    3303              :      *
    3304              :      * (System catalogs are currently not supported.)
    3305              :      */
    3306              :     Assert(!is_system_catalog);
    3307            7 :     finish_heap_swap(old_table_oid, new_table_oid,
    3308              :                      is_system_catalog,
    3309              :                      false,     /* swap_toast_by_content */
    3310              :                      false,
    3311              :                      true,
    3312              :                      false,     /* reindex */
    3313              :                      frozenXid, cutoffMulti,
    3314              :                      relpersistence);
    3315            7 : }
    3316              : 
    3317              : /*
    3318              :  * Build indexes on NewHeap according to those on OldHeap.
    3319              :  *
    3320              :  * OldIndexes is the list of index OIDs on OldHeap. The contained indexes end
    3321              :  * up locked using ShareUpdateExclusiveLock.
    3322              :  *
    3323              :  * A list of OIDs of the corresponding indexes created on NewHeap is
    3324              :  * returned. The order of items matches OldIndexes, so the two lists can be
    3325              :  * walked in parallel to swap index storage.
    3326              :  */
    3327              : static List *
    3328            7 : build_new_indexes(Relation NewHeap, Relation OldHeap, List *OldIndexes)
    3329              : {
    3330            7 :     List       *result = NIL;
    3331              : 
    3332            7 :     pgstat_progress_update_param(PROGRESS_REPACK_PHASE,
    3333              :                                  PROGRESS_REPACK_PHASE_REBUILD_INDEX);
    3334              : 
    3335           22 :     foreach_oid(oldindex, OldIndexes)
    3336              :     {
    3337              :         Oid         newindex;
    3338              :         char       *newName;
    3339              :         Relation    ind;
    3340              : 
    3341            8 :         ind = index_open(oldindex, ShareUpdateExclusiveLock);
    3342              : 
    3343            8 :         newName = ChooseRelationName(get_rel_name(oldindex),
    3344              :                                      NULL,
    3345              :                                      "repacknew",
    3346            8 :                                      get_rel_namespace(ind->rd_index->indrelid),
    3347              :                                      false);
    3348            8 :         newindex = index_create_copy(NewHeap, INDEX_CREATE_SUPPRESS_PROGRESS,
    3349            8 :                                      oldindex, ind->rd_rel->reltablespace,
    3350              :                                      newName);
    3351            8 :         copy_index_constraints(ind, newindex, RelationGetRelid(NewHeap));
    3352            8 :         result = lappend_oid(result, newindex);
    3353              : 
    3354            8 :         index_close(ind, NoLock);
    3355              :     }
    3356              : 
    3357            7 :     return result;
    3358              : }
    3359              : 
    3360              : /*
    3361              :  * Create a transient copy of a constraint -- supported by a transient
    3362              :  * copy of the index that supports the original constraint.
    3363              :  *
    3364              :  * When repacking a table that contains exclusion constraints, the executor
    3365              :  * relies on these constraints being properly catalogued.  These copies are
    3366              :  * to support that.
    3367              :  *
    3368              :  * We don't need the constraints for anything else (the original constraints
    3369              :  * will be there once repack completes), so we add pg_depend entries so that
    3370              :  * they are dropped when the transient table is dropped.
    3371              :  */
    3372              : static void
    3373            8 : copy_index_constraints(Relation old_index, Oid new_index_id, Oid new_heap_id)
    3374              : {
    3375              :     ScanKeyData skey;
    3376              :     Relation    rel;
    3377              :     TupleDesc   desc;
    3378              :     SysScanDesc scan;
    3379              :     HeapTuple   tup;
    3380              :     ObjectAddress objrel;
    3381              : 
    3382            8 :     rel = table_open(ConstraintRelationId, RowExclusiveLock);
    3383            8 :     ObjectAddressSet(objrel, RelationRelationId, new_heap_id);
    3384              : 
    3385              :     /*
    3386              :      * Retrieve the constraints supported by the old index and create an
    3387              :      * identical one that points to the new index.
    3388              :      */
    3389            8 :     ScanKeyInit(&skey,
    3390              :                 Anum_pg_constraint_conrelid,
    3391              :                 BTEqualStrategyNumber, F_OIDEQ,
    3392            8 :                 ObjectIdGetDatum(old_index->rd_index->indrelid));
    3393            8 :     scan = systable_beginscan(rel, ConstraintRelidTypidNameIndexId, true,
    3394              :                               NULL, 1, &skey);
    3395            8 :     desc = RelationGetDescr(rel);
    3396           26 :     while (HeapTupleIsValid(tup = systable_getnext(scan)))
    3397              :     {
    3398           18 :         Form_pg_constraint conform = (Form_pg_constraint) GETSTRUCT(tup);
    3399              :         Oid         oid;
    3400           18 :         Datum       values[Natts_pg_constraint] = {0};
    3401           18 :         bool        nulls[Natts_pg_constraint] = {0};
    3402           18 :         bool        replaces[Natts_pg_constraint] = {0};
    3403              :         HeapTuple   new_tup;
    3404              :         ObjectAddress objcon;
    3405              : 
    3406           18 :         if (conform->conindid != RelationGetRelid(old_index))
    3407           11 :             continue;
    3408              : 
    3409            7 :         oid = GetNewOidWithIndex(rel, ConstraintOidIndexId,
    3410              :                                  Anum_pg_constraint_oid);
    3411            7 :         values[Anum_pg_constraint_oid - 1] = ObjectIdGetDatum(oid);
    3412            7 :         replaces[Anum_pg_constraint_oid - 1] = true;
    3413            7 :         values[Anum_pg_constraint_conrelid - 1] = ObjectIdGetDatum(new_heap_id);
    3414            7 :         replaces[Anum_pg_constraint_conrelid - 1] = true;
    3415            7 :         values[Anum_pg_constraint_conindid - 1] = ObjectIdGetDatum(new_index_id);
    3416            7 :         replaces[Anum_pg_constraint_conindid - 1] = true;
    3417              : 
    3418            7 :         new_tup = heap_modify_tuple(tup, desc, values, nulls, replaces);
    3419              : 
    3420              :         /* Insert it into the catalog. */
    3421            7 :         CatalogTupleInsert(rel, new_tup);
    3422              : 
    3423              :         /* Create a dependency so it's removed when we drop the new heap. */
    3424            7 :         ObjectAddressSet(objcon, ConstraintRelationId, oid);
    3425            7 :         recordDependencyOn(&objcon, &objrel, DEPENDENCY_AUTO);
    3426              :     }
    3427            8 :     systable_endscan(scan);
    3428              : 
    3429            8 :     table_close(rel, RowExclusiveLock);
    3430              : 
    3431            8 :     CommandCounterIncrement();
    3432            8 : }
    3433              : 
    3434              : /*
    3435              :  * Try to start a background worker to perform logical decoding of data
    3436              :  * changes applied to relation while REPACK CONCURRENTLY is copying its
    3437              :  * contents to a new table.
    3438              :  */
    3439              : static void
    3440            7 : start_repack_decoding_worker(Oid relid)
    3441              : {
    3442              :     Size        size;
    3443              :     DecodingWorkerShared *shared;
    3444              :     shm_mq     *mq;
    3445              :     BackgroundWorker bgw;
    3446              : 
    3447            7 :     decoding_worker = palloc0_object(DecodingWorker);
    3448              : 
    3449              :     /* Setup shared memory. */
    3450            7 :     size = BUFFERALIGN(offsetof(DecodingWorkerShared, error_queue)) +
    3451              :         BUFFERALIGN(REPACK_ERROR_QUEUE_SIZE);
    3452            7 :     decoding_worker->seg = dsm_create(size, 0);
    3453              : 
    3454            7 :     shared = (DecodingWorkerShared *) dsm_segment_address(decoding_worker->seg);
    3455            7 :     shared->initialized = false;
    3456            7 :     shared->lsn_upto = InvalidXLogRecPtr;
    3457            7 :     shared->done = false;
    3458            7 :     SharedFileSetInit(&shared->sfs, decoding_worker->seg);
    3459            7 :     shared->last_exported = -1;
    3460            7 :     SpinLockInit(&shared->mutex);
    3461            7 :     shared->dbid = MyDatabaseId;
    3462              : 
    3463              :     /*
    3464              :      * This is the UserId set in cluster_rel(). Security context shouldn't be
    3465              :      * needed for the decoding worker.
    3466              :      */
    3467            7 :     shared->roleid = GetUserId();
    3468            7 :     shared->relid = relid;
    3469            7 :     ConditionVariableInit(&shared->cv);
    3470            7 :     shared->backend_proc = MyProc;
    3471            7 :     shared->backend_pid = MyProcPid;
    3472            7 :     shared->backend_proc_number = MyProcNumber;
    3473              : 
    3474            7 :     mq = shm_mq_create((char *) BUFFERALIGN(shared->error_queue),
    3475              :                        REPACK_ERROR_QUEUE_SIZE);
    3476            7 :     shm_mq_set_receiver(mq, MyProc);
    3477              : 
    3478            7 :     decoding_worker->error_mqh = shm_mq_attach(mq, decoding_worker->seg, NULL);
    3479              : 
    3480            7 :     memset(&bgw, 0, sizeof(bgw));
    3481            7 :     snprintf(bgw.bgw_name, BGW_MAXLEN,
    3482              :              "REPACK decoding worker for relation \"%s\"",
    3483              :              get_rel_name(relid));
    3484            7 :     snprintf(bgw.bgw_type, BGW_MAXLEN, "REPACK decoding worker");
    3485            7 :     bgw.bgw_flags = BGWORKER_SHMEM_ACCESS |
    3486              :         BGWORKER_BACKEND_DATABASE_CONNECTION;
    3487            7 :     bgw.bgw_start_time = BgWorkerStart_RecoveryFinished;
    3488            7 :     bgw.bgw_restart_time = BGW_NEVER_RESTART;
    3489            7 :     snprintf(bgw.bgw_library_name, MAXPGPATH, "postgres");
    3490            7 :     snprintf(bgw.bgw_function_name, BGW_MAXLEN, "RepackWorkerMain");
    3491            7 :     bgw.bgw_main_arg = UInt32GetDatum(dsm_segment_handle(decoding_worker->seg));
    3492            7 :     bgw.bgw_notify_pid = MyProcPid;
    3493              : 
    3494            7 :     if (!RegisterDynamicBackgroundWorker(&bgw, &decoding_worker->handle))
    3495            0 :         ereport(ERROR,
    3496              :                 errcode(ERRCODE_CONFIGURATION_LIMIT_EXCEEDED),
    3497              :                 errmsg("out of background worker slots"),
    3498              :                 errhint("You might need to increase \"%s\".", "max_worker_processes"));
    3499              : 
    3500              :     /*
    3501              :      * The decoding setup must be done before the caller can have XID assigned
    3502              :      * for any reason, otherwise the worker might end up in a deadlock,
    3503              :      * waiting for the caller's transaction to end. Therefore wait here until
    3504              :      * the worker indicates that it has the logical decoding initialized.
    3505              :      */
    3506            7 :     ConditionVariablePrepareToSleep(&shared->cv);
    3507              :     for (;;)
    3508           17 :     {
    3509              :         bool        initialized;
    3510              : 
    3511           24 :         SpinLockAcquire(&shared->mutex);
    3512           24 :         initialized = shared->initialized;
    3513           24 :         SpinLockRelease(&shared->mutex);
    3514              : 
    3515           24 :         if (initialized)
    3516            7 :             break;
    3517              : 
    3518           17 :         ConditionVariableSleep(&shared->cv, WAIT_EVENT_REPACK_WORKER_EXPORT);
    3519              :     }
    3520            7 :     ConditionVariableCancelSleep();
    3521            7 : }
    3522              : 
    3523              : /*
    3524              :  * Stop the decoding worker and clean up the related resources.
    3525              :  *
    3526              :  * The worker stops on its own when it knows there is no more work to do, but
    3527              :  * we need to stop it explicitly at least on ERROR in the launching backend.
    3528              :  */
    3529              : static void
    3530            7 : stop_repack_decoding_worker(void)
    3531              : {
    3532              :     /* Nothing to do if no worker was set up. */
    3533            7 :     if (decoding_worker == NULL)
    3534            0 :         return;
    3535              : 
    3536              :     /* Terminate the worker process, if one is running. */
    3537            7 :     if (decoding_worker->handle != NULL)
    3538              :     {
    3539              :         BgwHandleStatus status;
    3540              : 
    3541            7 :         TerminateBackgroundWorker(decoding_worker->handle);
    3542              :         /* The worker should really exit before the REPACK command does. */
    3543            7 :         HOLD_INTERRUPTS();
    3544            7 :         status = WaitForBackgroundWorkerShutdown(decoding_worker->handle);
    3545            7 :         RESUME_INTERRUPTS();
    3546              : 
    3547            7 :         if (status == BGWH_POSTMASTER_DIED)
    3548            0 :             ereport(FATAL,
    3549              :                     errcode(ERRCODE_ADMIN_SHUTDOWN),
    3550              :                     errmsg("postmaster exited during REPACK command"));
    3551              :     }
    3552              : 
    3553              :     /*
    3554              :      * Now detach from our shared memory segment.  In error cases there might
    3555              :      * still be messages from the worker in the queue, which ProcessInterrupts
    3556              :      * would try to read; this is pointless (and causes an assertion failure),
    3557              :      * so set the global pointer to NULL to have ProcessRepackMessages ignore
    3558              :      * them.
    3559              :      *
    3560              :      * We must also cancel the current sleep, if one is still set up.  This is
    3561              :      * critical because the CV lives in the DSM that we're about to detach, so
    3562              :      * if we omit it, later automatic cleanup tries to clear freed memory.
    3563              :      */
    3564            7 :     if (decoding_worker->error_mqh != NULL)
    3565            7 :         shm_mq_detach(decoding_worker->error_mqh);
    3566            7 :     ConditionVariableCancelSleep();
    3567            7 :     if (decoding_worker->seg != NULL)
    3568            7 :         dsm_detach(decoding_worker->seg);
    3569            7 :     pfree(decoding_worker);
    3570            7 :     decoding_worker = NULL;
    3571              : }
    3572              : 
    3573              : /* stop_repack_decoding_worker, wrapped as a before_shmem_exit callback */
    3574              : static void
    3575            0 : stop_repack_decoding_worker_cb(int code, Datum arg)
    3576              : {
    3577            0 :     stop_repack_decoding_worker();
    3578            0 : }
    3579              : 
    3580              : /*
    3581              :  * Get the initial snapshot from the decoding worker.
    3582              :  */
    3583              : static Snapshot
    3584            7 : get_initial_snapshot(DecodingWorker *worker)
    3585              : {
    3586              :     DecodingWorkerShared *shared;
    3587              :     char        fname[MAXPGPATH];
    3588              :     BufFile    *file;
    3589              :     Size        snap_size;
    3590              :     char       *snap_space;
    3591              :     Snapshot    snapshot;
    3592              : 
    3593            7 :     shared = (DecodingWorkerShared *) dsm_segment_address(worker->seg);
    3594              : 
    3595              :     /*
    3596              :      * The worker needs to initialize the logical decoding, which usually
    3597              :      * takes some time. Therefore it makes sense to prepare for the sleep
    3598              :      * first.
    3599              :      */
    3600            7 :     ConditionVariablePrepareToSleep(&shared->cv);
    3601              :     for (;;)
    3602            5 :     {
    3603              :         int         last_exported;
    3604              : 
    3605           12 :         SpinLockAcquire(&shared->mutex);
    3606           12 :         last_exported = shared->last_exported;
    3607           12 :         SpinLockRelease(&shared->mutex);
    3608              : 
    3609              :         /*
    3610              :          * Has the worker exported the file we are waiting for?
    3611              :          */
    3612           12 :         if (last_exported == WORKER_FILE_SNAPSHOT)
    3613            7 :             break;
    3614              : 
    3615            5 :         ConditionVariableSleep(&shared->cv, WAIT_EVENT_REPACK_WORKER_EXPORT);
    3616              :     }
    3617            7 :     ConditionVariableCancelSleep();
    3618              : 
    3619              :     /* Read the snapshot from a file. */
    3620            7 :     DecodingWorkerFileName(fname, shared->relid, WORKER_FILE_SNAPSHOT);
    3621            7 :     file = BufFileOpenFileSet(&shared->sfs.fs, fname, O_RDONLY, false);
    3622            7 :     BufFileReadExact(file, &snap_size, sizeof(snap_size));
    3623            7 :     snap_space = (char *) palloc(snap_size);
    3624            7 :     BufFileReadExact(file, snap_space, snap_size);
    3625            7 :     BufFileClose(file);
    3626              : 
    3627              :     /* Restore it. */
    3628            7 :     snapshot = RestoreSnapshot(snap_space);
    3629            7 :     pfree(snap_space);
    3630              : 
    3631            7 :     return snapshot;
    3632              : }
    3633              : 
    3634              : /*
    3635              :  * Generate worker's file name into 'fname', which must be of size MAXPGPATH.
    3636              :  * If relations of the same 'relid' happen to be processed at the same time,
    3637              :  * they must be from different databases and therefore different backends must
    3638              :  * be involved.
    3639              :  */
    3640              : void
    3641           42 : DecodingWorkerFileName(char *fname, Oid relid, uint32 seq)
    3642              : {
    3643              :     /* The PID is already present in the fileset name, so we needn't add it */
    3644           42 :     snprintf(fname, MAXPGPATH, "%u-%u", relid, seq);
    3645           42 : }
    3646              : 
    3647              : /*
    3648              :  * Handle receipt of an interrupt indicating a repack worker message.
    3649              :  *
    3650              :  * Note: this is called within a signal handler!  All we can do is set
    3651              :  * a flag that will cause the next CHECK_FOR_INTERRUPTS() to invoke
    3652              :  * ProcessRepackMessages().
    3653              :  */
    3654              : void
    3655            7 : HandleRepackMessageInterrupt(void)
    3656              : {
    3657            7 :     InterruptPending = true;
    3658            7 :     RepackMessagePending = true;
    3659            7 :     SetLatch(MyLatch);
    3660            7 : }
    3661              : 
    3662              : /*
    3663              :  * Process all protocol messages currently queued by the repack decoding worker.
    3664              :  */
    3665              : void
    3666            7 : ProcessRepackMessages(void)
    3667              : {
    3668              :     MemoryContext oldcontext;
    3669              :     static MemoryContext hpm_context = NULL;    /* private work context, kept across calls */
    3670              : 
    3671              :     /*
    3672              :      * Nothing to do if we haven't launched the worker yet or have already
    3673              :      * terminated it.
    3674              :      */
    3675            7 :     if (decoding_worker == NULL)
    3676            0 :         return;    /* NOTE(review): RepackMessagePending is not reset on this path -- confirm intended */
    3677              : 
    3678              :     /*
    3679              :      * This is invoked from ProcessInterrupts(), and since some of the
    3680              :      * functions it calls contain CHECK_FOR_INTERRUPTS(), there is a potential
    3681              :      * for recursive calls if more signals are received while this runs.  It's
    3682              :      * unclear that recursive entry would be safe, and it doesn't seem useful
    3683              :      * even if it is safe, so let's block interrupts until done.
    3684              :      */
    3685            7 :     HOLD_INTERRUPTS();
    3686              : 
    3687              :     /*
    3688              :      * Moreover, CurrentMemoryContext might be pointing almost anywhere.  We
    3689              :      * don't want to risk leaking data into long-lived contexts, so let's do
    3690              :      * our work here in a private context that we can reset on each use.
    3691              :      */
    3692            7 :     if (hpm_context == NULL)    /* first time through? */
    3693            6 :         hpm_context = AllocSetContextCreate(TopMemoryContext,
    3694              :                                             "ProcessRepackMessages",
    3695              :                                             ALLOCSET_DEFAULT_SIZES);
    3696              :     else
    3697            1 :         MemoryContextReset(hpm_context);
    3698              : 
    3699            7 :     oldcontext = MemoryContextSwitchTo(hpm_context);
    3700              : 
    3701              :     /* OK to process messages.  Reset the flag saying there are more to do. */
    3702            7 :     RepackMessagePending = false;
    3703              : 
    3704              :     /*
    3705              :      * Read as many messages as we can from the worker, but stop when no more
    3706              :      * messages can be read from the worker without blocking.
    3707              :      */
    3708              :     while (true)
    3709            0 :     {
    3710              :         shm_mq_result res;
    3711              :         Size        nbytes;
    3712              :         void       *data;
    3713              : 
    3714            7 :         res = shm_mq_receive(decoding_worker->error_mqh, &nbytes,
    3715              :                              &data, true);    /* true: nowait, i.e. don't block */
    3716            7 :         if (res == SHM_MQ_WOULD_BLOCK)
    3717            0 :             break;
    3718            7 :         else if (res == SHM_MQ_SUCCESS)
    3719              :         {
    3720              :             StringInfoData msg;
    3721              : 
    3722            0 :             initStringInfo(&msg);
    3723            0 :             appendBinaryStringInfo(&msg, data, nbytes);
    3724            0 :             ProcessRepackMessage(&msg);
    3725            0 :             pfree(msg.data);
    3726              :         }
    3727              :         else
    3728              :         {
    3729              :             /*
    3730              :              * The decoding worker is special in that it exits as soon as it
    3731              :              * has its work done. Thus the DETACHED result code is fine; stop reading.
    3732              :              */
    3733              :             Assert(res == SHM_MQ_DETACHED);
    3734              : 
    3735            7 :             break;
    3736              :         }
    3737              :     }
    3738              : 
    3739            7 :     MemoryContextSwitchTo(oldcontext);
    3740              : 
    3741              :     /* Might as well clear the context on our way out */
    3742            7 :     MemoryContextReset(hpm_context);
    3743              : 
    3744            7 :     RESUME_INTERRUPTS();
    3745              : }
    3746              : 
    3747              : /*
    3748              :  * Process a single protocol message received from the REPACK decoding worker.
    3749              :  */
    3750              : static void
    3751            0 : ProcessRepackMessage(StringInfo msg)
    3752              : {
    3753              :     char        msgtype;
    3754              : 
    3755            0 :     msgtype = pq_getmsgbyte(msg);
    3756              : 
    3757            0 :     switch (msgtype)
    3758              :     {
    3759            0 :         case PqMsg_ErrorResponse:
    3760              :         case PqMsg_NoticeResponse:
    3761              :             {
    3762              :                 ErrorData   edata;
    3763              : 
    3764              :                 /* Parse ErrorResponse or NoticeResponse. */
    3765            0 :                 pq_parse_errornotice(msg, &edata);
    3766              : 
    3767              :                 /* Cap the level at ERROR: death of a worker isn't enough justification for suicide. */
    3768            0 :                 edata.elevel = Min(edata.elevel, ERROR);
    3769              : 
    3770              :                 /*
    3771              :                  * Add a context line to show that this is a message
    3772              :                  * propagated from the worker.  Otherwise, it can sometimes be
    3773              :                  * confusing to understand what actually happened.
    3774              :                  */
    3775            0 :                 if (edata.context)
    3776            0 :                     edata.context = psprintf("%s\n%s", edata.context,
    3777              :                                              _("REPACK decoding worker"));
    3778              :                 else
    3779            0 :                     edata.context = pstrdup(_("REPACK decoding worker"));
    3780              : 
    3781              :                 /* Rethrow error or print notice. */
    3782            0 :                 ThrowErrorData(&edata);
    3783              : 
    3784            0 :                 break;
    3785              :             }
    3786              : 
    3787            0 :         default:
    3788              :             {
    3789            0 :                 elog(ERROR, "unrecognized message type received from decoding worker: %c (message length %d bytes)",
    3790              :                      msgtype, msg->len);
    3791              :             }
    3792              :     }
    3793            0 : }
        

Generated by: LCOV version 2.0-1