LCOV - code coverage report
Current view: top level - src/backend/commands - vacuumparallel.c (source / functions) Coverage Total Hit
Test: PostgreSQL 19devel Lines: 95.1 % 324 308
Test Date: 2026-04-06 10:16:19 Functions: 92.9 % 14 13
Legend: Lines:     hit not hit

            Line data    Source code
       1              : /*-------------------------------------------------------------------------
       2              :  *
       3              :  * vacuumparallel.c
       4              :  *    Support routines for parallel vacuum execution.
       5              :  *
       6              :  * This file contains routines that are intended to support setting up, using,
       7              :  * and tearing down a ParallelVacuumState.
       8              :  *
       9              :  * In a parallel vacuum, we perform both index bulk deletion and index cleanup
      10              :  * with parallel worker processes.  Individual indexes are processed by one
      11              :  * vacuum process.  ParallelVacuumState contains shared information as well as
      12              :  * the memory space for storing dead items allocated in the DSA area.  We
      13              :  * launch parallel worker processes at the start of parallel index
      14              :  * bulk-deletion and index cleanup and once all indexes are processed, the
      15              :  * parallel worker processes exit.  Each time we process indexes in parallel,
      16              :  * the parallel context is re-initialized so that the same DSM can be used for
      17              :  * multiple passes of index bulk-deletion and index cleanup.
      18              :  *
      19              :  * Portions Copyright (c) 1996-2026, PostgreSQL Global Development Group
      20              :  * Portions Copyright (c) 1994, Regents of the University of California
      21              :  *
      22              :  * IDENTIFICATION
      23              :  *    src/backend/commands/vacuumparallel.c
      24              :  *
      25              :  *-------------------------------------------------------------------------
      26              :  */
      27              : #include "postgres.h"
      28              : 
      29              : #include "access/amapi.h"
      30              : #include "access/table.h"
      31              : #include "access/xact.h"
      32              : #include "commands/progress.h"
      33              : #include "commands/vacuum.h"
      34              : #include "executor/instrument.h"
      35              : #include "optimizer/paths.h"
      36              : #include "pgstat.h"
      37              : #include "storage/bufmgr.h"
      38              : #include "storage/proc.h"
      39              : #include "tcop/tcopprot.h"
      40              : #include "utils/lsyscache.h"
      41              : #include "utils/rel.h"
      42              : 
      43              : /*
      44              :  * DSM keys for parallel vacuum.  Unlike other parallel execution code, since
      45              :  * we don't need to worry about DSM keys conflicting with plan_node_id we can
      46              :  * use small integers.
      47              :  */
      48              : #define PARALLEL_VACUUM_KEY_SHARED          1
      49              : #define PARALLEL_VACUUM_KEY_QUERY_TEXT      2
      50              : #define PARALLEL_VACUUM_KEY_BUFFER_USAGE    3
      51              : #define PARALLEL_VACUUM_KEY_WAL_USAGE       4
      52              : #define PARALLEL_VACUUM_KEY_INDEX_STATS     5
      53              : 
/*
 * Shared information among parallel workers.  So this is allocated in the DSM
 * segment.
 */
typedef struct PVShared
{
    /*
     * Target table relid, log level (for messages about parallel workers
     * launched during VACUUM VERBOSE) and query ID.  These fields are not
     * modified during the parallel vacuum.
     */
    Oid         relid;
    int         elevel;
    int64       queryid;

    /*
     * Fields for both index vacuum and cleanup.
     *
     * reltuples is the total number of input heap tuples.  We set either old
     * live tuples in the index vacuum case or the new live tuples in the
     * index cleanup case.
     *
     * estimated_count is true if reltuples is an estimated value.  (Note that
     * reltuples could be -1 in this case, indicating we have no idea.)
     */
    double      reltuples;
    bool        estimated_count;

    /*
     * In single process vacuum we could consume more memory during index
     * vacuuming or cleanup apart from the memory for heap scanning.  In
     * parallel vacuum, since individual vacuum workers can consume memory
     * equal to maintenance_work_mem, the new maintenance_work_mem for each
     * worker is set such that the parallel operation doesn't consume more
     * memory than single process vacuum.
     */
    int         maintenance_work_mem_worker;

    /*
     * The number of buffers each worker's Buffer Access Strategy ring should
     * contain.
     */
    int         ring_nbuffers;

    /*
     * Shared vacuum cost balance.  During parallel vacuum,
     * VacuumSharedCostBalance points to this value and it accumulates the
     * balance of each parallel vacuum worker.
     */
    pg_atomic_uint32 cost_balance;

    /*
     * Number of active parallel workers.  This is used for computing the
     * minimum threshold of the vacuum cost balance before a worker sleeps for
     * cost-based delay.
     */
    pg_atomic_uint32 active_nworkers;

    /*
     * Counter for vacuuming and cleanup.  Presumably advanced atomically by
     * processes claiming the next index to work on; the claiming code is
     * outside this chunk.
     */
    pg_atomic_uint32 idx;

    /* DSA handle where the TidStore lives */
    dsa_handle  dead_items_dsa_handle;

    /* DSA pointer to the shared TidStore */
    dsa_pointer dead_items_handle;

    /* Statistics of shared dead items (max_bytes budget, num_items count) */
    VacDeadItemsInfo dead_items_info;
} PVShared;
     124              : 
/* Status used during parallel index vacuum or cleanup */
typedef enum PVIndVacStatus
{
    PARALLEL_INDVAC_STATUS_INITIAL = 0, /* not yet scheduled for processing */
    PARALLEL_INDVAC_STATUS_NEED_BULKDELETE, /* index bulk-deletion requested */
    PARALLEL_INDVAC_STATUS_NEED_CLEANUP,    /* index cleanup requested */
    PARALLEL_INDVAC_STATUS_COMPLETED,   /* processing of this index finished */
} PVIndVacStatus;
     133              : 
/*
 * Struct for index vacuum statistics of an index that is used for parallel vacuum.
 * This includes the status of parallel index vacuum as well as index statistics.
 * One array element per target index lives in the DSM segment.
 */
typedef struct PVIndStats
{
    /*
     * The following two fields are set by leader process before executing
     * parallel index vacuum or parallel index cleanup.  These fields are not
     * fixed for the entire VACUUM operation.  They are only fixed for an
     * individual parallel index vacuum and cleanup.
     *
     * parallel_workers_can_process is true if both leader and worker can
     * process the index, otherwise only leader can process it.
     */
    PVIndVacStatus status;
    bool        parallel_workers_can_process;

    /*
     * Individual worker or leader stores the result of index vacuum or
     * cleanup.
     */
    bool        istat_updated;  /* are the stats updated? */
    IndexBulkDeleteResult istat;    /* valid only when istat_updated is true */
} PVIndStats;
     159              : 
/*
 * Struct for maintaining a parallel vacuum state. typedef appears in vacuum.h.
 * Allocated in the leader's local memory; DSM-resident pieces are reached via
 * the pointer fields below.
 */
struct ParallelVacuumState
{
    /* NULL for worker processes */
    ParallelContext *pcxt;

    /* Parent Heap Relation */
    Relation    heaprel;

    /* Target indexes */
    Relation   *indrels;
    int         nindexes;

    /* Shared information among parallel vacuum workers */
    PVShared   *shared;

    /*
     * Shared index statistics among parallel vacuum workers. The array
     * element is allocated for every index, even those indexes where parallel
     * index vacuuming is unsafe or not worthwhile (e.g.,
     * will_parallel_vacuum[] is false).  During parallel vacuum,
     * IndexBulkDeleteResult of each index is kept in DSM and is copied into
     * local memory at the end of parallel vacuum.
     */
    PVIndStats *indstats;

    /* Shared dead items space among parallel vacuum workers */
    TidStore   *dead_items;

    /* Points to buffer usage area in DSM */
    BufferUsage *buffer_usage;

    /* Points to WAL usage area in DSM */
    WalUsage   *wal_usage;

    /*
     * False if the index is totally unsuitable target for all parallel
     * processing. For example, the index could be <
     * min_parallel_index_scan_size cutoff.
     */
    bool       *will_parallel_vacuum;

    /*
     * The number of indexes that support parallel index bulk-deletion and
     * parallel index cleanup respectively.
     */
    int         nindexes_parallel_bulkdel;
    int         nindexes_parallel_cleanup;
    int         nindexes_parallel_condcleanup;

    /* Buffer access strategy used by leader process */
    BufferAccessStrategy bstrategy;

    /*
     * Error reporting state.  The error callback is set only for workers
     * processes during parallel index vacuum.
     */
    char       *relnamespace;
    char       *relname;
    char       *indname;
    PVIndVacStatus status;
};
     224              : 
     225              : static int  parallel_vacuum_compute_workers(Relation *indrels, int nindexes, int nrequested,
     226              :                                             bool *will_parallel_vacuum);
     227              : static void parallel_vacuum_process_all_indexes(ParallelVacuumState *pvs, int num_index_scans,
     228              :                                                 bool vacuum, PVWorkerStats *wstats);
     229              : static void parallel_vacuum_process_safe_indexes(ParallelVacuumState *pvs);
     230              : static void parallel_vacuum_process_unsafe_indexes(ParallelVacuumState *pvs);
     231              : static void parallel_vacuum_process_one_index(ParallelVacuumState *pvs, Relation indrel,
     232              :                                               PVIndStats *indstats);
     233              : static bool parallel_vacuum_index_is_parallel_safe(Relation indrel, int num_index_scans,
     234              :                                                    bool vacuum);
     235              : static void parallel_vacuum_error_callback(void *arg);
     236              : 
/*
 * Try to enter parallel mode and create a parallel context.  Then initialize
 * shared memory state.
 *
 * rel is the target heap relation; indrels/nindexes its indexes (nindexes
 * must be > 0).  nrequested_workers is the user-requested worker count (0
 * means "compute automatically").  vac_work_mem is the dead-items memory
 * budget (in kB, given the * 1024 scaling below -- confirm against caller).
 * elevel is the log level for worker-launch messages and bstrategy the
 * buffer access strategy shared with workers.
 *
 * On success, return parallel vacuum state.  Otherwise return NULL.
 */
ParallelVacuumState *
parallel_vacuum_init(Relation rel, Relation *indrels, int nindexes,
                     int nrequested_workers, int vac_work_mem,
                     int elevel, BufferAccessStrategy bstrategy)
{
    ParallelVacuumState *pvs;
    ParallelContext *pcxt;
    PVShared   *shared;
    TidStore   *dead_items;
    PVIndStats *indstats;
    BufferUsage *buffer_usage;
    WalUsage   *wal_usage;
    bool       *will_parallel_vacuum;
    Size        est_indstats_len;
    Size        est_shared_len;
    int         nindexes_mwm = 0;
    int         parallel_workers = 0;
    int         querylen;

    /*
     * A parallel vacuum must be requested and there must be indexes on the
     * relation
     */
    Assert(nrequested_workers >= 0);
    Assert(nindexes > 0);

    /*
     * Compute the number of parallel vacuum workers to launch
     */
    will_parallel_vacuum = palloc0_array(bool, nindexes);
    parallel_workers = parallel_vacuum_compute_workers(indrels, nindexes,
                                                       nrequested_workers,
                                                       will_parallel_vacuum);
    if (parallel_workers <= 0)
    {
        /* Can't perform vacuum in parallel -- return NULL */
        pfree(will_parallel_vacuum);
        return NULL;
    }

    pvs = palloc0_object(ParallelVacuumState);
    pvs->indrels = indrels;
    pvs->nindexes = nindexes;
    pvs->will_parallel_vacuum = will_parallel_vacuum;
    pvs->bstrategy = bstrategy;
    pvs->heaprel = rel;

    EnterParallelMode();
    pcxt = CreateParallelContext("postgres", "parallel_vacuum_main",
                                 parallel_workers);
    Assert(pcxt->nworkers > 0);
    pvs->pcxt = pcxt;

    /*
     * Shared-memory sizing.  Every shm_toc_estimate_chunk here must be
     * matched by an equally-sized shm_toc_allocate after
     * InitializeParallelDSM below.
     */

    /* Estimate size for index vacuum stats -- PARALLEL_VACUUM_KEY_INDEX_STATS */
    est_indstats_len = mul_size(sizeof(PVIndStats), nindexes);
    shm_toc_estimate_chunk(&pcxt->estimator, est_indstats_len);
    shm_toc_estimate_keys(&pcxt->estimator, 1);

    /* Estimate size for shared information -- PARALLEL_VACUUM_KEY_SHARED */
    est_shared_len = sizeof(PVShared);
    shm_toc_estimate_chunk(&pcxt->estimator, est_shared_len);
    shm_toc_estimate_keys(&pcxt->estimator, 1);

    /*
     * Estimate space for BufferUsage and WalUsage --
     * PARALLEL_VACUUM_KEY_BUFFER_USAGE and PARALLEL_VACUUM_KEY_WAL_USAGE.
     *
     * If there are no extensions loaded that care, we could skip this.  We
     * have no way of knowing whether anyone's looking at pgBufferUsage or
     * pgWalUsage, so do it unconditionally.
     */
    shm_toc_estimate_chunk(&pcxt->estimator,
                           mul_size(sizeof(BufferUsage), pcxt->nworkers));
    shm_toc_estimate_keys(&pcxt->estimator, 1);
    shm_toc_estimate_chunk(&pcxt->estimator,
                           mul_size(sizeof(WalUsage), pcxt->nworkers));
    shm_toc_estimate_keys(&pcxt->estimator, 1);

    /* Finally, estimate PARALLEL_VACUUM_KEY_QUERY_TEXT space */
    if (debug_query_string)
    {
        querylen = strlen(debug_query_string);
        shm_toc_estimate_chunk(&pcxt->estimator, querylen + 1);
        shm_toc_estimate_keys(&pcxt->estimator, 1);
    }
    else
        querylen = 0;           /* keep compiler quiet */

    InitializeParallelDSM(pcxt);

    /* Prepare index vacuum stats */
    indstats = (PVIndStats *) shm_toc_allocate(pcxt->toc, est_indstats_len);
    MemSet(indstats, 0, est_indstats_len);
    for (int i = 0; i < nindexes; i++)
    {
        Relation    indrel = indrels[i];
        uint8       vacoptions = indrel->rd_indam->amparallelvacuumoptions;

        /*
         * Cleanup option should be either disabled, always performing in
         * parallel or conditionally performing in parallel.
         */
        Assert(((vacoptions & VACUUM_OPTION_PARALLEL_CLEANUP) == 0) ||
               ((vacoptions & VACUUM_OPTION_PARALLEL_COND_CLEANUP) == 0));
        Assert(vacoptions <= VACUUM_OPTION_MAX_VALID_VALUE);

        /* Skip indexes unsuitable for any parallel processing */
        if (!will_parallel_vacuum[i])
            continue;

        if (indrel->rd_indam->amusemaintenanceworkmem)
            nindexes_mwm++;

        /*
         * Remember the number of indexes that support parallel operation for
         * each phase.
         */
        if ((vacoptions & VACUUM_OPTION_PARALLEL_BULKDEL) != 0)
            pvs->nindexes_parallel_bulkdel++;
        if ((vacoptions & VACUUM_OPTION_PARALLEL_CLEANUP) != 0)
            pvs->nindexes_parallel_cleanup++;
        if ((vacoptions & VACUUM_OPTION_PARALLEL_COND_CLEANUP) != 0)
            pvs->nindexes_parallel_condcleanup++;
    }
    shm_toc_insert(pcxt->toc, PARALLEL_VACUUM_KEY_INDEX_STATS, indstats);
    pvs->indstats = indstats;

    /* Prepare shared information */
    shared = (PVShared *) shm_toc_allocate(pcxt->toc, est_shared_len);
    MemSet(shared, 0, est_shared_len);
    shared->relid = RelationGetRelid(rel);
    shared->elevel = elevel;
    shared->queryid = pgstat_get_my_query_id();
    /*
     * Divide maintenance_work_mem among the workers that can actually use
     * it; with no such indexes each worker keeps the full budget.
     */
    shared->maintenance_work_mem_worker =
        (nindexes_mwm > 0) ?
        maintenance_work_mem / Min(parallel_workers, nindexes_mwm) :
        maintenance_work_mem;
    shared->dead_items_info.max_bytes = vac_work_mem * (size_t) 1024;

    /* Prepare DSA space for dead items */
    dead_items = TidStoreCreateShared(shared->dead_items_info.max_bytes,
                                      LWTRANCHE_PARALLEL_VACUUM_DSA);
    pvs->dead_items = dead_items;
    shared->dead_items_handle = TidStoreGetHandle(dead_items);
    shared->dead_items_dsa_handle = dsa_get_handle(TidStoreGetDSA(dead_items));

    /* Use the same buffer size for all workers */
    shared->ring_nbuffers = GetAccessStrategyBufferCount(bstrategy);

    pg_atomic_init_u32(&(shared->cost_balance), 0);
    pg_atomic_init_u32(&(shared->active_nworkers), 0);
    pg_atomic_init_u32(&(shared->idx), 0);

    shm_toc_insert(pcxt->toc, PARALLEL_VACUUM_KEY_SHARED, shared);
    pvs->shared = shared;

    /*
     * Allocate space for each worker's BufferUsage and WalUsage; no need to
     * initialize
     */
    buffer_usage = shm_toc_allocate(pcxt->toc,
                                    mul_size(sizeof(BufferUsage), pcxt->nworkers));
    shm_toc_insert(pcxt->toc, PARALLEL_VACUUM_KEY_BUFFER_USAGE, buffer_usage);
    pvs->buffer_usage = buffer_usage;
    wal_usage = shm_toc_allocate(pcxt->toc,
                                 mul_size(sizeof(WalUsage), pcxt->nworkers));
    shm_toc_insert(pcxt->toc, PARALLEL_VACUUM_KEY_WAL_USAGE, wal_usage);
    pvs->wal_usage = wal_usage;

    /* Store query string for workers */
    if (debug_query_string)
    {
        char       *sharedquery;

        sharedquery = (char *) shm_toc_allocate(pcxt->toc, querylen + 1);
        memcpy(sharedquery, debug_query_string, querylen + 1);
        /* memcpy already copied the NUL; this is belt-and-suspenders */
        sharedquery[querylen] = '\0';
        shm_toc_insert(pcxt->toc,
                       PARALLEL_VACUUM_KEY_QUERY_TEXT, sharedquery);
    }

    /* Success -- return parallel vacuum state */
    return pvs;
}
     426              : 
     427              : /*
     428              :  * Destroy the parallel context, and end parallel mode.
     429              :  *
     430              :  * Since writes are not allowed during parallel mode, copy the
     431              :  * updated index statistics from DSM into local memory and then later use that
     432              :  * to update the index statistics.  One might think that we can exit from
     433              :  * parallel mode, update the index statistics and then destroy parallel
     434              :  * context, but that won't be safe (see ExitParallelMode).
     435              :  */
     436              : void
     437           23 : parallel_vacuum_end(ParallelVacuumState *pvs, IndexBulkDeleteResult **istats)
     438              : {
     439              :     Assert(!IsParallelWorker());
     440              : 
     441              :     /* Copy the updated statistics */
     442          105 :     for (int i = 0; i < pvs->nindexes; i++)
     443              :     {
     444           82 :         PVIndStats *indstats = &(pvs->indstats[i]);
     445              : 
     446           82 :         if (indstats->istat_updated)
     447              :         {
     448           49 :             istats[i] = palloc0_object(IndexBulkDeleteResult);
     449           49 :             memcpy(istats[i], &indstats->istat, sizeof(IndexBulkDeleteResult));
     450              :         }
     451              :         else
     452           33 :             istats[i] = NULL;
     453              :     }
     454              : 
     455           23 :     TidStoreDestroy(pvs->dead_items);
     456              : 
     457           23 :     DestroyParallelContext(pvs->pcxt);
     458           23 :     ExitParallelMode();
     459              : 
     460           23 :     pfree(pvs->will_parallel_vacuum);
     461           23 :     pfree(pvs);
     462           23 : }
     463              : 
     464              : /*
     465              :  * Returns the dead items space and dead items information.
     466              :  */
     467              : TidStore *
     468           38 : parallel_vacuum_get_dead_items(ParallelVacuumState *pvs, VacDeadItemsInfo **dead_items_info_p)
     469              : {
     470           38 :     *dead_items_info_p = &(pvs->shared->dead_items_info);
     471           38 :     return pvs->dead_items;
     472              : }
     473              : 
     474              : /* Forget all items in dead_items */
     475              : void
     476           15 : parallel_vacuum_reset_dead_items(ParallelVacuumState *pvs)
     477              : {
     478           15 :     VacDeadItemsInfo *dead_items_info = &(pvs->shared->dead_items_info);
     479              : 
     480              :     /*
     481              :      * Free the current tidstore and return allocated DSA segments to the
     482              :      * operating system. Then we recreate the tidstore with the same max_bytes
     483              :      * limitation we just used.
     484              :      */
     485           15 :     TidStoreDestroy(pvs->dead_items);
     486           15 :     pvs->dead_items = TidStoreCreateShared(dead_items_info->max_bytes,
     487              :                                            LWTRANCHE_PARALLEL_VACUUM_DSA);
     488              : 
     489              :     /* Update the DSA pointer for dead_items to the new one */
     490           15 :     pvs->shared->dead_items_dsa_handle = dsa_get_handle(TidStoreGetDSA(pvs->dead_items));
     491           15 :     pvs->shared->dead_items_handle = TidStoreGetHandle(pvs->dead_items);
     492              : 
     493              :     /* Reset the counter */
     494           15 :     dead_items_info->num_items = 0;
     495           15 : }
     496              : 
     497              : /*
     498              :  * Do parallel index bulk-deletion with parallel workers.
     499              :  */
     500              : void
     501           15 : parallel_vacuum_bulkdel_all_indexes(ParallelVacuumState *pvs, long num_table_tuples,
     502              :                                     int num_index_scans, PVWorkerStats *wstats)
     503              : {
     504              :     Assert(!IsParallelWorker());
     505              : 
     506              :     /*
     507              :      * We can only provide an approximate value of num_heap_tuples, at least
     508              :      * for now.
     509              :      */
     510           15 :     pvs->shared->reltuples = num_table_tuples;
     511           15 :     pvs->shared->estimated_count = true;
     512              : 
     513           15 :     parallel_vacuum_process_all_indexes(pvs, num_index_scans, true, wstats);
     514           15 : }
     515              : 
     516              : /*
     517              :  * Do parallel index cleanup with parallel workers.
     518              :  */
     519              : void
     520           23 : parallel_vacuum_cleanup_all_indexes(ParallelVacuumState *pvs, long num_table_tuples,
     521              :                                     int num_index_scans, bool estimated_count,
     522              :                                     PVWorkerStats *wstats)
     523              : {
     524              :     Assert(!IsParallelWorker());
     525              : 
     526              :     /*
     527              :      * We can provide a better estimate of total number of surviving tuples
     528              :      * (we assume indexes are more interested in that than in the number of
     529              :      * nominally live tuples).
     530              :      */
     531           23 :     pvs->shared->reltuples = num_table_tuples;
     532           23 :     pvs->shared->estimated_count = estimated_count;
     533              : 
     534           23 :     parallel_vacuum_process_all_indexes(pvs, num_index_scans, false, wstats);
     535           23 : }
     536              : 
     537              : /*
     538              :  * Compute the number of parallel worker processes to request.  Both index
     539              :  * vacuum and index cleanup can be executed with parallel workers.
     540              :  * The index is eligible for parallel vacuum iff its size is greater than
     541              :  * min_parallel_index_scan_size as invoking workers for very small indexes
     542              :  * can hurt performance.
     543              :  *
     544              :  * nrequested is the number of parallel workers that user requested.  If
     545              :  * nrequested is 0, we compute the parallel degree based on nindexes, that is
     546              :  * the number of indexes that support parallel vacuum.  This function also
     547              :  * sets will_parallel_vacuum to remember indexes that participate in parallel
     548              :  * vacuum.
     549              :  */
     550              : static int
     551         6709 : parallel_vacuum_compute_workers(Relation *indrels, int nindexes, int nrequested,
     552              :                                 bool *will_parallel_vacuum)
     553              : {
     554         6709 :     int         nindexes_parallel = 0;
     555         6709 :     int         nindexes_parallel_bulkdel = 0;
     556         6709 :     int         nindexes_parallel_cleanup = 0;
     557              :     int         parallel_workers;
     558              : 
     559              :     /*
     560              :      * We don't allow performing parallel operation in standalone backend or
     561              :      * when parallelism is disabled.
     562              :      */
     563         6709 :     if (!IsUnderPostmaster || max_parallel_maintenance_workers == 0)
     564         2970 :         return 0;
     565              : 
     566              :     /*
     567              :      * Compute the number of indexes that can participate in parallel vacuum.
     568              :      */
     569        12178 :     for (int i = 0; i < nindexes; i++)
     570              :     {
     571         8439 :         Relation    indrel = indrels[i];
     572         8439 :         uint8       vacoptions = indrel->rd_indam->amparallelvacuumoptions;
     573              : 
     574              :         /* Skip index that is not a suitable target for parallel index vacuum */
     575         8439 :         if (vacoptions == VACUUM_OPTION_NO_PARALLEL ||
     576         8439 :             RelationGetNumberOfBlocks(indrel) < min_parallel_index_scan_size)
     577         8349 :             continue;
     578              : 
     579           90 :         will_parallel_vacuum[i] = true;
     580              : 
     581           90 :         if ((vacoptions & VACUUM_OPTION_PARALLEL_BULKDEL) != 0)
     582           82 :             nindexes_parallel_bulkdel++;
     583           90 :         if (((vacoptions & VACUUM_OPTION_PARALLEL_CLEANUP) != 0) ||
     584           74 :             ((vacoptions & VACUUM_OPTION_PARALLEL_COND_CLEANUP) != 0))
     585           82 :             nindexes_parallel_cleanup++;
     586              :     }
     587              : 
     588         3739 :     nindexes_parallel = Max(nindexes_parallel_bulkdel,
     589              :                             nindexes_parallel_cleanup);
     590              : 
     591              :     /* The leader process takes one index */
     592         3739 :     nindexes_parallel--;
     593              : 
     594              :     /* No index supports parallel vacuum */
     595         3739 :     if (nindexes_parallel <= 0)
     596         3716 :         return 0;
     597              : 
     598              :     /* Compute the parallel degree */
     599           23 :     parallel_workers = (nrequested > 0) ?
     600           23 :         Min(nrequested, nindexes_parallel) : nindexes_parallel;
     601              : 
     602              :     /* Cap by max_parallel_maintenance_workers */
     603           23 :     parallel_workers = Min(parallel_workers, max_parallel_maintenance_workers);
     604              : 
     605           23 :     return parallel_workers;
     606              : }
     607              : 
     608              : /*
     609              :  * Perform index vacuum or index cleanup with parallel workers.  This function
     610              :  * must be used by the parallel vacuum leader process.
     611              :  *
     612              :  * If wstats is not NULL, the parallel worker statistics are updated.
     613              :  */
     614              : static void
     615           38 : parallel_vacuum_process_all_indexes(ParallelVacuumState *pvs, int num_index_scans,
     616              :                                     bool vacuum, PVWorkerStats *wstats)
     617              : {
     618              :     int         nworkers;
     619              :     PVIndVacStatus new_status;
     620              : 
     621              :     Assert(!IsParallelWorker());
     622              : 
     623           38 :     if (vacuum)
     624              :     {
     625           15 :         new_status = PARALLEL_INDVAC_STATUS_NEED_BULKDELETE;
     626              : 
     627              :         /* Determine the number of parallel workers to launch */
     628           15 :         nworkers = pvs->nindexes_parallel_bulkdel;
     629              :     }
     630              :     else
     631              :     {
     632           23 :         new_status = PARALLEL_INDVAC_STATUS_NEED_CLEANUP;
     633              : 
     634              :         /* Determine the number of parallel workers to launch */
     635           23 :         nworkers = pvs->nindexes_parallel_cleanup;
     636              : 
     637              :         /* Add conditionally parallel-aware indexes if in the first time call */
     638           23 :         if (num_index_scans == 0)
     639           16 :             nworkers += pvs->nindexes_parallel_condcleanup;
     640              :     }
     641              : 
     642              :     /* The leader process will participate */
     643           38 :     nworkers--;
     644              : 
     645              :     /*
     646              :      * It is possible that parallel context is initialized with fewer workers
     647              :      * than the number of indexes that need a separate worker in the current
     648              :      * phase, so we need to consider it.  See
     649              :      * parallel_vacuum_compute_workers().
     650              :      */
     651           38 :     nworkers = Min(nworkers, pvs->pcxt->nworkers);
     652              : 
     653              :     /* Update the statistics, if we asked to */
     654           38 :     if (wstats != NULL && nworkers > 0)
     655           31 :         wstats->nplanned += nworkers;
     656              : 
     657              :     /*
     658              :      * Set index vacuum status and mark whether parallel vacuum worker can
     659              :      * process it.
     660              :      */
     661          153 :     for (int i = 0; i < pvs->nindexes; i++)
     662              :     {
     663          115 :         PVIndStats *indstats = &(pvs->indstats[i]);
     664              : 
     665              :         Assert(indstats->status == PARALLEL_INDVAC_STATUS_INITIAL);
     666          115 :         indstats->status = new_status;
     667          115 :         indstats->parallel_workers_can_process =
     668          223 :             (pvs->will_parallel_vacuum[i] &&
     669          108 :              parallel_vacuum_index_is_parallel_safe(pvs->indrels[i],
     670              :                                                     num_index_scans,
     671          115 :                                                     vacuum));
     672              :     }
     673              : 
     674              :     /* Reset the parallel index processing and progress counters */
     675           38 :     pg_atomic_write_u32(&(pvs->shared->idx), 0);
     676              : 
     677              :     /* Setup the shared cost-based vacuum delay and launch workers */
     678           38 :     if (nworkers > 0)
     679              :     {
     680              :         /* Reinitialize parallel context to relaunch parallel workers */
     681           31 :         if (num_index_scans > 0)
     682            8 :             ReinitializeParallelDSM(pvs->pcxt);
     683              : 
     684              :         /*
     685              :          * Set up shared cost balance and the number of active workers for
     686              :          * vacuum delay.  We need to do this before launching workers as
     687              :          * otherwise, they might not see the updated values for these
     688              :          * parameters.
     689              :          */
     690           31 :         pg_atomic_write_u32(&(pvs->shared->cost_balance), VacuumCostBalance);
     691           31 :         pg_atomic_write_u32(&(pvs->shared->active_nworkers), 0);
     692              : 
     693              :         /*
     694              :          * The number of workers can vary between bulkdelete and cleanup
     695              :          * phase.
     696              :          */
     697           31 :         ReinitializeParallelWorkers(pvs->pcxt, nworkers);
     698              : 
     699           31 :         LaunchParallelWorkers(pvs->pcxt);
     700              : 
     701           31 :         if (pvs->pcxt->nworkers_launched > 0)
     702              :         {
     703              :             /*
     704              :              * Reset the local cost values for leader backend as we have
     705              :              * already accumulated the remaining balance of heap.
     706              :              */
     707           31 :             VacuumCostBalance = 0;
     708           31 :             VacuumCostBalanceLocal = 0;
     709              : 
     710              :             /* Enable shared cost balance for leader backend */
     711           31 :             VacuumSharedCostBalance = &(pvs->shared->cost_balance);
     712           31 :             VacuumActiveNWorkers = &(pvs->shared->active_nworkers);
     713              : 
     714              :             /* Update the statistics, if we asked to */
     715           31 :             if (wstats != NULL)
     716           31 :                 wstats->nlaunched += pvs->pcxt->nworkers_launched;
     717              :         }
     718              : 
     719           31 :         if (vacuum)
     720           15 :             ereport(pvs->shared->elevel,
     721              :                     (errmsg(ngettext("launched %d parallel vacuum worker for index vacuuming (planned: %d)",
     722              :                                      "launched %d parallel vacuum workers for index vacuuming (planned: %d)",
     723              :                                      pvs->pcxt->nworkers_launched),
     724              :                             pvs->pcxt->nworkers_launched, nworkers)));
     725              :         else
     726           16 :             ereport(pvs->shared->elevel,
     727              :                     (errmsg(ngettext("launched %d parallel vacuum worker for index cleanup (planned: %d)",
     728              :                                      "launched %d parallel vacuum workers for index cleanup (planned: %d)",
     729              :                                      pvs->pcxt->nworkers_launched),
     730              :                             pvs->pcxt->nworkers_launched, nworkers)));
     731              :     }
     732              : 
     733              :     /* Vacuum the indexes that can be processed by only leader process */
     734           38 :     parallel_vacuum_process_unsafe_indexes(pvs);
     735              : 
     736              :     /*
     737              :      * Join as a parallel worker.  The leader vacuums alone processes all
     738              :      * parallel-safe indexes in the case where no workers are launched.
     739              :      */
     740           38 :     parallel_vacuum_process_safe_indexes(pvs);
     741              : 
     742              :     /*
     743              :      * Next, accumulate buffer and WAL usage.  (This must wait for the workers
     744              :      * to finish, or we might get incomplete data.)
     745              :      */
     746           38 :     if (nworkers > 0)
     747              :     {
     748              :         /* Wait for all vacuum workers to finish */
     749           31 :         WaitForParallelWorkersToFinish(pvs->pcxt);
     750              : 
     751           70 :         for (int i = 0; i < pvs->pcxt->nworkers_launched; i++)
     752           39 :             InstrAccumParallelQuery(&pvs->buffer_usage[i], &pvs->wal_usage[i]);
     753              :     }
     754              : 
     755              :     /*
     756              :      * Reset all index status back to initial (while checking that we have
     757              :      * vacuumed all indexes).
     758              :      */
     759          153 :     for (int i = 0; i < pvs->nindexes; i++)
     760              :     {
     761          115 :         PVIndStats *indstats = &(pvs->indstats[i]);
     762              : 
     763          115 :         if (indstats->status != PARALLEL_INDVAC_STATUS_COMPLETED)
     764            0 :             elog(ERROR, "parallel index vacuum on index \"%s\" is not completed",
     765              :                  RelationGetRelationName(pvs->indrels[i]));
     766              : 
     767          115 :         indstats->status = PARALLEL_INDVAC_STATUS_INITIAL;
     768              :     }
     769              : 
     770              :     /*
     771              :      * Carry the shared balance value to heap scan and disable shared costing
     772              :      */
     773           38 :     if (VacuumSharedCostBalance)
     774              :     {
     775           31 :         VacuumCostBalance = pg_atomic_read_u32(VacuumSharedCostBalance);
     776           31 :         VacuumSharedCostBalance = NULL;
     777           31 :         VacuumActiveNWorkers = NULL;
     778              :     }
     779           38 : }
     780              : 
     781              : /*
     782              :  * Index vacuum/cleanup routine used by the leader process and parallel
     783              :  * vacuum worker processes to vacuum the indexes in parallel.
     784              :  */
     785              : static void
     786           77 : parallel_vacuum_process_safe_indexes(ParallelVacuumState *pvs)
     787              : {
     788              :     /*
     789              :      * Increment the active worker count if we are able to launch any worker.
     790              :      */
     791           77 :     if (VacuumActiveNWorkers)
     792           70 :         pg_atomic_add_fetch_u32(VacuumActiveNWorkers, 1);
     793              : 
     794              :     /* Loop until all indexes are vacuumed */
     795              :     for (;;)
     796          115 :     {
     797              :         int         idx;
     798              :         PVIndStats *indstats;
     799              : 
     800              :         /* Get an index number to process */
     801          192 :         idx = pg_atomic_fetch_add_u32(&(pvs->shared->idx), 1);
     802              : 
     803              :         /* Done for all indexes? */
     804          192 :         if (idx >= pvs->nindexes)
     805           77 :             break;
     806              : 
     807          115 :         indstats = &(pvs->indstats[idx]);
     808              : 
     809              :         /*
     810              :          * Skip vacuuming index that is unsafe for workers or has an
     811              :          * unsuitable target for parallel index vacuum (this is vacuumed in
     812              :          * parallel_vacuum_process_unsafe_indexes() by the leader).
     813              :          */
     814          115 :         if (!indstats->parallel_workers_can_process)
     815           29 :             continue;
     816              : 
     817              :         /* Do vacuum or cleanup of the index */
     818           86 :         parallel_vacuum_process_one_index(pvs, pvs->indrels[idx], indstats);
     819              :     }
     820              : 
     821              :     /*
     822              :      * We have completed the index vacuum so decrement the active worker
     823              :      * count.
     824              :      */
     825           77 :     if (VacuumActiveNWorkers)
     826           70 :         pg_atomic_sub_fetch_u32(VacuumActiveNWorkers, 1);
     827           77 : }
     828              : 
     829              : /*
     830              :  * Perform parallel vacuuming of indexes in leader process.
     831              :  *
     832              :  * Handles index vacuuming (or index cleanup) for indexes that are not
     833              :  * parallel safe.  It's possible that this will vary for a given index, based
     834              :  * on details like whether we're performing index cleanup right now.
     835              :  *
     836              :  * Also performs vacuuming of smaller indexes that fell under the size cutoff
     837              :  * enforced by parallel_vacuum_compute_workers().
     838              :  */
     839              : static void
     840           38 : parallel_vacuum_process_unsafe_indexes(ParallelVacuumState *pvs)
     841              : {
     842              :     Assert(!IsParallelWorker());
     843              : 
     844              :     /*
     845              :      * Increment the active worker count if we are able to launch any worker.
     846              :      */
     847           38 :     if (VacuumActiveNWorkers)
     848           31 :         pg_atomic_add_fetch_u32(VacuumActiveNWorkers, 1);
     849              : 
     850          153 :     for (int i = 0; i < pvs->nindexes; i++)
     851              :     {
     852          115 :         PVIndStats *indstats = &(pvs->indstats[i]);
     853              : 
     854              :         /* Skip, indexes that are safe for workers */
     855          115 :         if (indstats->parallel_workers_can_process)
     856           86 :             continue;
     857              : 
     858              :         /* Do vacuum or cleanup of the index */
     859           29 :         parallel_vacuum_process_one_index(pvs, pvs->indrels[i], indstats);
     860              :     }
     861              : 
     862              :     /*
     863              :      * We have completed the index vacuum so decrement the active worker
     864              :      * count.
     865              :      */
     866           38 :     if (VacuumActiveNWorkers)
     867           31 :         pg_atomic_sub_fetch_u32(VacuumActiveNWorkers, 1);
     868           38 : }
     869              : 
     870              : /*
     871              :  * Vacuum or cleanup index either by leader process or by one of the worker
     872              :  * process.  After vacuuming the index this function copies the index
     873              :  * statistics returned from ambulkdelete and amvacuumcleanup to the DSM
     874              :  * segment.
     875              :  */
     876              : static void
     877          115 : parallel_vacuum_process_one_index(ParallelVacuumState *pvs, Relation indrel,
     878              :                                   PVIndStats *indstats)
     879              : {
     880          115 :     IndexBulkDeleteResult *istat = NULL;
     881              :     IndexBulkDeleteResult *istat_res;
     882              :     IndexVacuumInfo ivinfo;
     883              : 
     884              :     /*
     885              :      * Update the pointer to the corresponding bulk-deletion result if someone
     886              :      * has already updated it
     887              :      */
     888          115 :     if (indstats->istat_updated)
     889           33 :         istat = &(indstats->istat);
     890              : 
     891          115 :     ivinfo.index = indrel;
     892          115 :     ivinfo.heaprel = pvs->heaprel;
     893          115 :     ivinfo.analyze_only = false;
     894          115 :     ivinfo.report_progress = false;
     895          115 :     ivinfo.message_level = DEBUG2;
     896          115 :     ivinfo.estimated_count = pvs->shared->estimated_count;
     897          115 :     ivinfo.num_heap_tuples = pvs->shared->reltuples;
     898          115 :     ivinfo.strategy = pvs->bstrategy;
     899              : 
     900              :     /* Update error traceback information */
     901          115 :     pvs->indname = pstrdup(RelationGetRelationName(indrel));
     902          115 :     pvs->status = indstats->status;
     903              : 
     904          115 :     switch (indstats->status)
     905              :     {
     906           33 :         case PARALLEL_INDVAC_STATUS_NEED_BULKDELETE:
     907           33 :             istat_res = vac_bulkdel_one_index(&ivinfo, istat, pvs->dead_items,
     908           33 :                                               &pvs->shared->dead_items_info);
     909           33 :             break;
     910           82 :         case PARALLEL_INDVAC_STATUS_NEED_CLEANUP:
     911           82 :             istat_res = vac_cleanup_one_index(&ivinfo, istat);
     912           82 :             break;
     913            0 :         default:
     914            0 :             elog(ERROR, "unexpected parallel vacuum index status %d for index \"%s\"",
     915              :                  indstats->status,
     916              :                  RelationGetRelationName(indrel));
     917              :     }
     918              : 
     919              :     /*
     920              :      * Copy the index bulk-deletion result returned from ambulkdelete and
     921              :      * amvacuumcleanup to the DSM segment if it's the first cycle because they
     922              :      * allocate locally and it's possible that an index will be vacuumed by a
     923              :      * different vacuum process the next cycle.  Copying the result normally
     924              :      * happens only the first time an index is vacuumed.  For any additional
     925              :      * vacuum pass, we directly point to the result on the DSM segment and
     926              :      * pass it to vacuum index APIs so that workers can update it directly.
     927              :      *
     928              :      * Since all vacuum workers write the bulk-deletion result at different
     929              :      * slots we can write them without locking.
     930              :      */
     931          115 :     if (!indstats->istat_updated && istat_res != NULL)
     932              :     {
     933           49 :         memcpy(&(indstats->istat), istat_res, sizeof(IndexBulkDeleteResult));
     934           49 :         indstats->istat_updated = true;
     935              : 
     936              :         /* Free the locally-allocated bulk-deletion result */
     937           49 :         pfree(istat_res);
     938              :     }
     939              : 
     940              :     /*
     941              :      * Update the status to completed. No need to lock here since each worker
     942              :      * touches different indexes.
     943              :      */
     944          115 :     indstats->status = PARALLEL_INDVAC_STATUS_COMPLETED;
     945              : 
     946              :     /* Reset error traceback information */
     947          115 :     pvs->status = PARALLEL_INDVAC_STATUS_COMPLETED;
     948          115 :     pfree(pvs->indname);
     949          115 :     pvs->indname = NULL;
     950              : 
     951              :     /*
     952              :      * Call the parallel variant of pgstat_progress_incr_param so workers can
     953              :      * report progress of index vacuum to the leader.
     954              :      */
     955          115 :     pgstat_progress_parallel_incr_param(PROGRESS_VACUUM_INDEXES_PROCESSED, 1);
     956          115 : }
     957              : 
     958              : /*
     959              :  * Returns false, if the given index can't participate in the next execution of
     960              :  * parallel index vacuum or parallel index cleanup.
     961              :  */
     962              : static bool
     963          108 : parallel_vacuum_index_is_parallel_safe(Relation indrel, int num_index_scans,
     964              :                                        bool vacuum)
     965              : {
     966              :     uint8       vacoptions;
     967              : 
     968          108 :     vacoptions = indrel->rd_indam->amparallelvacuumoptions;
     969              : 
     970              :     /* In parallel vacuum case, check if it supports parallel bulk-deletion */
     971          108 :     if (vacuum)
     972           30 :         return ((vacoptions & VACUUM_OPTION_PARALLEL_BULKDEL) != 0);
     973              : 
     974              :     /* Not safe, if the index does not support parallel cleanup */
     975           78 :     if (((vacoptions & VACUUM_OPTION_PARALLEL_CLEANUP) == 0) &&
     976           62 :         ((vacoptions & VACUUM_OPTION_PARALLEL_COND_CLEANUP) == 0))
     977            8 :         return false;
     978              : 
     979              :     /*
     980              :      * Not safe, if the index supports parallel cleanup conditionally, but we
     981              :      * have already processed the index (for bulkdelete).  We do this to avoid
     982              :      * the need to invoke workers when parallel index cleanup doesn't need to
     983              :      * scan the index.  See the comments for option
     984              :      * VACUUM_OPTION_PARALLEL_COND_CLEANUP to know when indexes support
     985              :      * parallel cleanup conditionally.
     986              :      */
     987           70 :     if (num_index_scans > 0 &&
     988           14 :         ((vacoptions & VACUUM_OPTION_PARALLEL_COND_CLEANUP) != 0))
     989           14 :         return false;
     990              : 
     991           56 :     return true;
     992              : }
     993              : 
     994              : /*
     995              :  * Perform work within a launched parallel process.
     996              :  *
     997              :  * Since parallel vacuum workers perform only index vacuum or index cleanup,
     998              :  * we don't need to report progress information.
     999              :  */
    1000              : void
    1001           39 : parallel_vacuum_main(dsm_segment *seg, shm_toc *toc)
    1002              : {
    1003              :     ParallelVacuumState pvs;
    1004              :     Relation    rel;
    1005              :     Relation   *indrels;
    1006              :     PVIndStats *indstats;
    1007              :     PVShared   *shared;
    1008              :     TidStore   *dead_items;
    1009              :     BufferUsage *buffer_usage;
    1010              :     WalUsage   *wal_usage;
    1011              :     int         nindexes;
    1012              :     char       *sharedquery;
    1013              :     ErrorContextCallback errcallback;
    1014              : 
    1015              :     /*
    1016              :      * A parallel vacuum worker must have only PROC_IN_VACUUM flag since we
    1017              :      * don't support parallel vacuum for autovacuum as of now.
    1018              :      */
    1019              :     Assert(MyProc->statusFlags == PROC_IN_VACUUM);
    1020              : 
    1021           39 :     elog(DEBUG1, "starting parallel vacuum worker");
    1022              : 
    1023           39 :     shared = (PVShared *) shm_toc_lookup(toc, PARALLEL_VACUUM_KEY_SHARED, false);
    1024              : 
    1025              :     /* Set debug_query_string for individual workers */
    1026           39 :     sharedquery = shm_toc_lookup(toc, PARALLEL_VACUUM_KEY_QUERY_TEXT, true);
    1027           39 :     debug_query_string = sharedquery;
    1028           39 :     pgstat_report_activity(STATE_RUNNING, debug_query_string);
    1029              : 
    1030              :     /* Track query ID */
    1031           39 :     pgstat_report_query_id(shared->queryid, false);
    1032              : 
    1033              :     /*
    1034              :      * Open table.  The lock mode is the same as the leader process.  It's
    1035              :      * okay because the lock mode does not conflict among the parallel
    1036              :      * workers.
    1037              :      */
    1038           39 :     rel = table_open(shared->relid, ShareUpdateExclusiveLock);
    1039              : 
    1040              :     /*
    1041              :      * Open all indexes. indrels are sorted in order by OID, which should be
    1042              :      * matched to the leader's one.
    1043              :      */
    1044           39 :     vac_open_indexes(rel, RowExclusiveLock, &nindexes, &indrels);
    1045              :     Assert(nindexes > 0);
    1046              : 
    1047              :     /*
    1048              :      * Apply the desired value of maintenance_work_mem within this process.
    1049              :      * Really we should use SetConfigOption() to change a GUC, but since we're
    1050              :      * already in parallel mode guc.c would complain about that.  Fortunately,
    1051              :      * by the same token guc.c will not let any user-defined code change it.
    1052              :      * So just avert your eyes while we do this:
    1053              :      */
    1054           39 :     if (shared->maintenance_work_mem_worker > 0)
    1055           39 :         maintenance_work_mem = shared->maintenance_work_mem_worker;
    1056              : 
    1057              :     /* Set index statistics */
    1058           39 :     indstats = (PVIndStats *) shm_toc_lookup(toc,
    1059              :                                              PARALLEL_VACUUM_KEY_INDEX_STATS,
    1060              :                                              false);
    1061              : 
    1062              :     /* Find dead_items in shared memory */
    1063           39 :     dead_items = TidStoreAttach(shared->dead_items_dsa_handle,
    1064              :                                 shared->dead_items_handle);
    1065              : 
    1066              :     /* Set cost-based vacuum delay */
    1067           39 :     VacuumUpdateCosts();
    1068           39 :     VacuumCostBalance = 0;
    1069           39 :     VacuumCostBalanceLocal = 0;
    1070           39 :     VacuumSharedCostBalance = &(shared->cost_balance);
    1071           39 :     VacuumActiveNWorkers = &(shared->active_nworkers);
    1072              : 
    1073              :     /* Set parallel vacuum state */
    1074           39 :     pvs.indrels = indrels;
    1075           39 :     pvs.nindexes = nindexes;
    1076           39 :     pvs.indstats = indstats;
    1077           39 :     pvs.shared = shared;
    1078           39 :     pvs.dead_items = dead_items;
    1079           39 :     pvs.relnamespace = get_namespace_name(RelationGetNamespace(rel));
    1080           39 :     pvs.relname = pstrdup(RelationGetRelationName(rel));
    1081           39 :     pvs.heaprel = rel;
    1082              : 
    1083              :     /* These fields will be filled during index vacuum or cleanup */
    1084           39 :     pvs.indname = NULL;
    1085           39 :     pvs.status = PARALLEL_INDVAC_STATUS_INITIAL;
    1086              : 
    1087              :     /* Each parallel VACUUM worker gets its own access strategy. */
    1088           78 :     pvs.bstrategy = GetAccessStrategyWithSize(BAS_VACUUM,
    1089           39 :                                               shared->ring_nbuffers * (BLCKSZ / 1024));
    1090              : 
    1091              :     /* Setup error traceback support for ereport() */
    1092           39 :     errcallback.callback = parallel_vacuum_error_callback;
    1093           39 :     errcallback.arg = &pvs;
    1094           39 :     errcallback.previous = error_context_stack;
    1095           39 :     error_context_stack = &errcallback;
    1096              : 
    1097              :     /* Prepare to track buffer usage during parallel execution */
    1098           39 :     InstrStartParallelQuery();
    1099              : 
    1100              :     /* Process indexes to perform vacuum/cleanup */
    1101           39 :     parallel_vacuum_process_safe_indexes(&pvs);
    1102              : 
    1103              :     /* Report buffer/WAL usage during parallel execution */
    1104           39 :     buffer_usage = shm_toc_lookup(toc, PARALLEL_VACUUM_KEY_BUFFER_USAGE, false);
    1105           39 :     wal_usage = shm_toc_lookup(toc, PARALLEL_VACUUM_KEY_WAL_USAGE, false);
    1106           39 :     InstrEndParallelQuery(&buffer_usage[ParallelWorkerNumber],
    1107           39 :                           &wal_usage[ParallelWorkerNumber]);
    1108              : 
    1109              :     /* Report any remaining cost-based vacuum delay time */
    1110           39 :     if (track_cost_delay_timing)
    1111            0 :         pgstat_progress_parallel_incr_param(PROGRESS_VACUUM_DELAY_TIME,
    1112              :                                             parallel_vacuum_worker_delay_ns);
    1113              : 
    1114           39 :     TidStoreDetach(dead_items);
    1115              : 
    1116              :     /* Pop the error context stack */
    1117           39 :     error_context_stack = errcallback.previous;
    1118              : 
    1119           39 :     vac_close_indexes(nindexes, indrels, RowExclusiveLock);
    1120           39 :     table_close(rel, ShareUpdateExclusiveLock);
    1121           39 :     FreeAccessStrategy(pvs.bstrategy);
    1122           39 : }
    1123              : 
    1124              : /*
    1125              :  * Error context callback for errors occurring during parallel index vacuum.
    1126              :  * The error context messages should match the messages set in the lazy vacuum
    1127              :  * error context.  If you change this function, change vacuum_error_callback()
    1128              :  * as well.
    1129              :  */
    1130              : static void
    1131            0 : parallel_vacuum_error_callback(void *arg)
    1132              : {
    1133            0 :     ParallelVacuumState *errinfo = arg;
    1134              : 
    1135            0 :     switch (errinfo->status)
    1136              :     {
    1137            0 :         case PARALLEL_INDVAC_STATUS_NEED_BULKDELETE:
    1138            0 :             errcontext("while vacuuming index \"%s\" of relation \"%s.%s\"",
    1139              :                        errinfo->indname,
    1140              :                        errinfo->relnamespace,
    1141              :                        errinfo->relname);
    1142            0 :             break;
    1143            0 :         case PARALLEL_INDVAC_STATUS_NEED_CLEANUP:
    1144            0 :             errcontext("while cleaning up index \"%s\" of relation \"%s.%s\"",
    1145              :                        errinfo->indname,
    1146              :                        errinfo->relnamespace,
    1147              :                        errinfo->relname);
    1148            0 :             break;
    1149            0 :         case PARALLEL_INDVAC_STATUS_INITIAL:
    1150              :         case PARALLEL_INDVAC_STATUS_COMPLETED:
    1151              :         default:
    1152            0 :             return;
    1153              :     }
    1154              : }
        

Generated by: LCOV version 2.0-1