LCOV - code coverage report
Current view: top level - src/backend/commands - vacuumparallel.c (source / functions) Hit Total Coverage
Test: PostgreSQL 18devel Lines: 303 318 95.3 %
Date: 2024-11-21 08:14:44 Functions: 13 14 92.9 %
Legend: Lines: hit not hit

          Line data    Source code
       1             : /*-------------------------------------------------------------------------
       2             :  *
       3             :  * vacuumparallel.c
       4             :  *    Support routines for parallel vacuum execution.
       5             :  *
       6             :  * This file contains routines that are intended to support setting up, using,
       7             :  * and tearing down a ParallelVacuumState.
       8             :  *
       9             :  * In a parallel vacuum, we perform both index bulk deletion and index cleanup
      10             :  * with parallel worker processes.  Individual indexes are processed by one
      11             :  * vacuum process.  ParallelVacuumState contains shared information as well as
      12             :  * the memory space for storing dead items allocated in the DSA area.  We
      13             :  * launch parallel worker processes at the start of parallel index
      14             :  * bulk-deletion and index cleanup and once all indexes are processed, the
      15             :  * parallel worker processes exit.  Each time we process indexes in parallel,
      16             :  * the parallel context is re-initialized so that the same DSM can be used for
      17             :  * multiple passes of index bulk-deletion and index cleanup.
      18             :  *
      19             :  * Portions Copyright (c) 1996-2024, PostgreSQL Global Development Group
      20             :  * Portions Copyright (c) 1994, Regents of the University of California
      21             :  *
      22             :  * IDENTIFICATION
      23             :  *    src/backend/commands/vacuumparallel.c
      24             :  *
      25             :  *-------------------------------------------------------------------------
      26             :  */
      27             : #include "postgres.h"
      28             : 
      29             : #include "access/amapi.h"
      30             : #include "access/table.h"
      31             : #include "access/xact.h"
      32             : #include "commands/progress.h"
      33             : #include "commands/vacuum.h"
      34             : #include "executor/instrument.h"
      35             : #include "optimizer/paths.h"
      36             : #include "pgstat.h"
      37             : #include "storage/bufmgr.h"
      38             : #include "tcop/tcopprot.h"
      39             : #include "utils/lsyscache.h"
      40             : #include "utils/rel.h"
      41             : 
      42             : /*
      43             :  * DSM keys for parallel vacuum.  Unlike other parallel execution code, since
      44             :  * we don't need to worry about DSM keys conflicting with plan_node_id we can
      45             :  * use small integers.
      46             :  */
      47             : #define PARALLEL_VACUUM_KEY_SHARED          1
      48             : #define PARALLEL_VACUUM_KEY_QUERY_TEXT      2
      49             : #define PARALLEL_VACUUM_KEY_BUFFER_USAGE    3
      50             : #define PARALLEL_VACUUM_KEY_WAL_USAGE       4
      51             : #define PARALLEL_VACUUM_KEY_INDEX_STATS     5
      52             : 
      53             : /*
      54             :  * Information shared among the leader and parallel workers; it is allocated
      55             :  * in the DSM segment.
      56             :  */
      57             : typedef struct PVShared
      58             : {
      59             :     /*
      60             :      * Target table relid, log level (for messages about parallel workers
      61             :      * launched during VACUUM VERBOSE) and query ID.  These fields are not
      62             :      * modified during the parallel vacuum.
      63             :      */
      64             :     Oid         relid;
      65             :     int         elevel;
      66             :     uint64      queryid;
      67             : 
      68             :     /*
      69             :      * Fields for both index vacuum and cleanup.
      70             :      *
      71             :      * reltuples is the total number of input heap tuples.  We set either old
      72             :      * live tuples in the index vacuum case or the new live tuples in the
      73             :      * index cleanup case.
      74             :      *
      75             :      * estimated_count is true if reltuples is an estimated value.  (Note that
      76             :      * reltuples could be -1 in this case, indicating we have no idea.)
      77             :      */
      78             :     double      reltuples;
      79             :     bool        estimated_count;
      80             : 
      81             :     /*
      82             :      * In single process vacuum we could consume more memory during index
      83             :      * vacuuming or cleanup apart from the memory for heap scanning.  In
      84             :      * parallel vacuum, since individual vacuum workers can consume memory
      85             :      * equal to maintenance_work_mem, the new maintenance_work_mem for each
      86             :      * worker is set such that the parallel operation doesn't consume more
      87             :      * memory than single process vacuum.
      88             :      */
      89             :     int         maintenance_work_mem_worker;
      90             : 
      91             :     /*
      92             :      * The number of buffers each worker's Buffer Access Strategy ring should
      93             :      * contain.
      94             :      */
      95             :     int         ring_nbuffers;
      96             : 
      97             :     /*
      98             :      * Shared vacuum cost balance.  During parallel vacuum,
      99             :      * VacuumSharedCostBalance points to this value and it accumulates the
     100             :      * balance of each parallel vacuum worker.
     101             :      */
     102             :     pg_atomic_uint32 cost_balance;
     103             : 
     104             :     /*
     105             :      * Number of active parallel workers.  This is used for computing the
     106             :      * minimum threshold of the vacuum cost balance before a worker sleeps for
     107             :      * cost-based delay.
     108             :      */
     109             :     pg_atomic_uint32 active_nworkers;
     110             : 
     111             :     /* Counter for vacuuming and cleanup */
     112             :     pg_atomic_uint32 idx;
     113             : 
     114             :     /* DSA handle where the TidStore lives */
     115             :     dsa_handle  dead_items_dsa_handle;
     116             : 
     117             :     /* DSA pointer to the shared TidStore */
     118             :     dsa_pointer dead_items_handle;
     119             : 
     120             :     /* Statistics of shared dead items */
     121             :     VacDeadItemsInfo dead_items_info;
     122             : } PVShared;
     123             : 
     124             : /* Status used during parallel index vacuum or cleanup */
     125             : typedef enum PVIndVacStatus
     126             : {
     127             :     PARALLEL_INDVAC_STATUS_INITIAL = 0, /* value of a zero-filled PVIndStats */
     128             :     PARALLEL_INDVAC_STATUS_NEED_BULKDELETE,
     129             :     PARALLEL_INDVAC_STATUS_NEED_CLEANUP,
     130             :     PARALLEL_INDVAC_STATUS_COMPLETED,
     131             : } PVIndVacStatus;
     132             : 
     133             : /*
     134             :  * Struct for index vacuum statistics of an index that is used for parallel vacuum.
     135             :  * This includes the status of parallel index vacuum as well as index statistics.
     136             :  */
     137             : typedef struct PVIndStats
     138             : {
     139             :     /*
     140             :      * The following two fields are set by leader process before executing
     141             :      * parallel index vacuum or parallel index cleanup.  These fields are not
     142             :      * fixed for the entire VACUUM operation.  They are only fixed for an
     143             :      * individual parallel index vacuum and cleanup.
     144             :      *
     145             :      * parallel_workers_can_process is true if both leader and worker can
     146             :      * process the index, otherwise only leader can process it.
     147             :      */
     148             :     PVIndVacStatus status;
     149             :     bool        parallel_workers_can_process;
     150             : 
     151             :     /*
     152             :      * Individual worker or leader stores the result of index vacuum or
     153             :      * cleanup.
     154             :      */
     155             :     bool        istat_updated;  /* is istat valid?  Checked by parallel_vacuum_end() */
     156             :     IndexBulkDeleteResult istat;    /* copied to local memory by parallel_vacuum_end() */
     157             : } PVIndStats;
     158             : 
     159             : /*
     160             :  * Struct for maintaining a parallel vacuum state. typedef appears in vacuum.h.
     161             :  */
     162             : struct ParallelVacuumState
     163             : {
     164             :     /* NULL for worker processes */
     165             :     ParallelContext *pcxt;
     166             : 
     167             :     /* Parent Heap Relation */
     168             :     Relation    heaprel;
     169             : 
     170             :     /* Target indexes */
     171             :     Relation   *indrels;
     172             :     int         nindexes;
     173             : 
     174             :     /* Shared information among parallel vacuum workers */
     175             :     PVShared   *shared;
     176             : 
     177             :     /*
     178             :      * Shared index statistics among parallel vacuum workers. The array
     179             :      * element is allocated for every index, even those indexes where parallel
     180             :      * index vacuuming is unsafe or not worthwhile (e.g.,
     181             :      * will_parallel_vacuum[] is false).  During parallel vacuum,
     182             :      * IndexBulkDeleteResult of each index is kept in DSM and is copied into
     183             :      * local memory at the end of parallel vacuum.
     184             :      */
     185             :     PVIndStats *indstats;
     186             : 
     187             :     /* Shared dead items space among parallel vacuum workers */
     188             :     TidStore   *dead_items;
     189             : 
     190             :     /* Points to buffer usage area in DSM */
     191             :     BufferUsage *buffer_usage;
     192             : 
     193             :     /* Points to WAL usage area in DSM */
     194             :     WalUsage   *wal_usage;
     195             : 
     196             :     /*
     197             :      * False if the index is totally unsuitable target for all parallel
     198             :      * processing. For example, the index could be <
     199             :      * min_parallel_index_scan_size cutoff.
     200             :      */
     201             :     bool       *will_parallel_vacuum;
     202             : 
     203             :     /*
     204             :      * The number of indexes that support parallel bulk-deletion, parallel
     205             :      * cleanup and conditional parallel cleanup, respectively.
     206             :      */
     207             :     int         nindexes_parallel_bulkdel;
     208             :     int         nindexes_parallel_cleanup;
     209             :     int         nindexes_parallel_condcleanup;
     210             : 
     211             :     /* Buffer access strategy used by leader process */
     212             :     BufferAccessStrategy bstrategy;
     213             : 
     214             :     /*
     215             :      * Error reporting state.  The error callback is set only for worker
     216             :      * processes during parallel index vacuum.
     217             :      */
     218             :     char       *relnamespace;
     219             :     char       *relname;
     220             :     char       *indname;
     221             :     PVIndVacStatus status;
     222             : };
     223             : 
     224             : static int  parallel_vacuum_compute_workers(Relation *indrels, int nindexes, int nrequested,
     225             :                                             bool *will_parallel_vacuum);
     226             : static void parallel_vacuum_process_all_indexes(ParallelVacuumState *pvs, int num_index_scans,
     227             :                                                 bool vacuum);
     228             : static void parallel_vacuum_process_safe_indexes(ParallelVacuumState *pvs);
     229             : static void parallel_vacuum_process_unsafe_indexes(ParallelVacuumState *pvs);
     230             : static void parallel_vacuum_process_one_index(ParallelVacuumState *pvs, Relation indrel,
     231             :                                               PVIndStats *indstats);
     232             : static bool parallel_vacuum_index_is_parallel_safe(Relation indrel, int num_index_scans,
     233             :                                                    bool vacuum);
     234             : static void parallel_vacuum_error_callback(void *arg);
     235             : 
     236             : /*
     237             :  * Decide how many workers to use; if any, enter parallel mode, create a
     238             :  * parallel context and initialize the shared memory state.
     239             :  *
     240             :  * Return the parallel vacuum state, or NULL if no worker can be used.
     241             :  */
     242             : ParallelVacuumState *
     243        7934 : parallel_vacuum_init(Relation rel, Relation *indrels, int nindexes,
     244             :                      int nrequested_workers, int vac_work_mem,
     245             :                      int elevel, BufferAccessStrategy bstrategy)
     246             : {
     247             :     ParallelVacuumState *pvs;
     248             :     ParallelContext *pcxt;
     249             :     PVShared   *shared;
     250             :     TidStore   *dead_items;
     251             :     PVIndStats *indstats;
     252             :     BufferUsage *buffer_usage;
     253             :     WalUsage   *wal_usage;
     254             :     bool       *will_parallel_vacuum;
     255             :     Size        est_indstats_len;
     256             :     Size        est_shared_len;
     257        7934 :     int         nindexes_mwm = 0;
     258        7934 :     int         parallel_workers = 0;
     259             :     int         querylen;
     260             : 
     261             :     /*
     262             :      * A parallel vacuum must be requested and there must be indexes on the
     263             :      * relation
     264             :      */
     265             :     Assert(nrequested_workers >= 0);
     266             :     Assert(nindexes > 0);
     267             : 
     268             :     /*
     269             :      * Compute the number of parallel vacuum workers to launch
     270             :      */
     271        7934 :     will_parallel_vacuum = (bool *) palloc0(sizeof(bool) * nindexes);
     272        7934 :     parallel_workers = parallel_vacuum_compute_workers(indrels, nindexes,
     273             :                                                        nrequested_workers,
     274             :                                                        will_parallel_vacuum);
     275        7934 :     if (parallel_workers <= 0)
     276             :     {
     277             :         /* Can't perform vacuum in parallel -- return NULL */
     278        7912 :         pfree(will_parallel_vacuum);
     279        7912 :         return NULL;
     280             :     }
     281             : 
     282          22 :     pvs = (ParallelVacuumState *) palloc0(sizeof(ParallelVacuumState));
     283          22 :     pvs->indrels = indrels;
     284          22 :     pvs->nindexes = nindexes;
     285          22 :     pvs->will_parallel_vacuum = will_parallel_vacuum;
     286          22 :     pvs->bstrategy = bstrategy;
     287          22 :     pvs->heaprel = rel;
     288             : 
     289          22 :     EnterParallelMode();
     290          22 :     pcxt = CreateParallelContext("postgres", "parallel_vacuum_main",
     291             :                                  parallel_workers);
     292             :     Assert(pcxt->nworkers > 0);
     293          22 :     pvs->pcxt = pcxt;
     294             : 
     295             :     /* Estimate size for index vacuum stats -- PARALLEL_VACUUM_KEY_INDEX_STATS */
     296          22 :     est_indstats_len = mul_size(sizeof(PVIndStats), nindexes);
     297          22 :     shm_toc_estimate_chunk(&pcxt->estimator, est_indstats_len);
     298          22 :     shm_toc_estimate_keys(&pcxt->estimator, 1);
     299             : 
     300             :     /* Estimate size for shared information -- PARALLEL_VACUUM_KEY_SHARED */
     301          22 :     est_shared_len = sizeof(PVShared);
     302          22 :     shm_toc_estimate_chunk(&pcxt->estimator, est_shared_len);
     303          22 :     shm_toc_estimate_keys(&pcxt->estimator, 1);
     304             : 
     305             :     /*
     306             :      * Estimate space for BufferUsage and WalUsage --
     307             :      * PARALLEL_VACUUM_KEY_BUFFER_USAGE and PARALLEL_VACUUM_KEY_WAL_USAGE.
     308             :      *
     309             :      * If there are no extensions loaded that care, we could skip this.  We
     310             :      * have no way of knowing whether anyone's looking at pgBufferUsage or
     311             :      * pgWalUsage, so do it unconditionally.
     312             :      */
     313          22 :     shm_toc_estimate_chunk(&pcxt->estimator,
     314             :                            mul_size(sizeof(BufferUsage), pcxt->nworkers));
     315          22 :     shm_toc_estimate_keys(&pcxt->estimator, 1);
     316          22 :     shm_toc_estimate_chunk(&pcxt->estimator,
     317             :                            mul_size(sizeof(WalUsage), pcxt->nworkers));
     318          22 :     shm_toc_estimate_keys(&pcxt->estimator, 1);
     319             : 
     320             :     /* Finally, estimate PARALLEL_VACUUM_KEY_QUERY_TEXT space */
     321          22 :     if (debug_query_string)
     322             :     {
     323          22 :         querylen = strlen(debug_query_string);
     324          22 :         shm_toc_estimate_chunk(&pcxt->estimator, querylen + 1);
     325          22 :         shm_toc_estimate_keys(&pcxt->estimator, 1);
     326             :     }
     327             :     else
     328           0 :         querylen = 0;           /* keep compiler quiet */
     329             : 
     330          22 :     InitializeParallelDSM(pcxt);
     331             : 
     332             :     /* Prepare index vacuum stats */
     333          22 :     indstats = (PVIndStats *) shm_toc_allocate(pcxt->toc, est_indstats_len);
     334         610 :     MemSet(indstats, 0, est_indstats_len);
     335         120 :     for (int i = 0; i < nindexes; i++)
     336             :     {
     337          98 :         Relation    indrel = indrels[i];
     338          98 :         uint8       vacoptions = indrel->rd_indam->amparallelvacuumoptions;
     339             : 
     340             :         /*
     341             :          * Cleanup option should be either disabled, always performing in
     342             :          * parallel or conditionally performing in parallel.
     343             :          */
     344             :         Assert(((vacoptions & VACUUM_OPTION_PARALLEL_CLEANUP) == 0) ||
     345             :                ((vacoptions & VACUUM_OPTION_PARALLEL_COND_CLEANUP) == 0));
     346             :         Assert(vacoptions <= VACUUM_OPTION_MAX_VALID_VALUE);
     347             : 
     348          98 :         if (!will_parallel_vacuum[i])
     349           6 :             continue;
     350             : 
     351          92 :         if (indrel->rd_indam->amusemaintenanceworkmem)
     352          12 :             nindexes_mwm++;
     353             : 
     354             :         /*
     355             :          * Remember the number of indexes that support parallel operation for
     356             :          * each phase.
     357             :          */
     358          92 :         if ((vacoptions & VACUUM_OPTION_PARALLEL_BULKDEL) != 0)
     359          80 :             pvs->nindexes_parallel_bulkdel++;
     360          92 :         if ((vacoptions & VACUUM_OPTION_PARALLEL_CLEANUP) != 0)
     361          24 :             pvs->nindexes_parallel_cleanup++;
     362          92 :         if ((vacoptions & VACUUM_OPTION_PARALLEL_COND_CLEANUP) != 0)
     363          56 :             pvs->nindexes_parallel_condcleanup++;
     364             :     }
     365          22 :     shm_toc_insert(pcxt->toc, PARALLEL_VACUUM_KEY_INDEX_STATS, indstats);
     366          22 :     pvs->indstats = indstats;
     367             : 
     368             :     /* Prepare shared information */
     369          22 :     shared = (PVShared *) shm_toc_allocate(pcxt->toc, est_shared_len);
     370         242 :     MemSet(shared, 0, est_shared_len);
     371          22 :     shared->relid = RelationGetRelid(rel);
     372          22 :     shared->elevel = elevel;
     373          22 :     shared->queryid = pgstat_get_my_query_id();
     374          22 :     shared->maintenance_work_mem_worker =
     375             :         (nindexes_mwm > 0) ?
     376          22 :         maintenance_work_mem / Min(parallel_workers, nindexes_mwm) :
     377             :         maintenance_work_mem;
     378          22 :     shared->dead_items_info.max_bytes = vac_work_mem * 1024L;
     379             : 
     380             :     /* Prepare DSA space for dead items */
     381          22 :     dead_items = TidStoreCreateShared(shared->dead_items_info.max_bytes,
     382             :                                       LWTRANCHE_PARALLEL_VACUUM_DSA);
     383          22 :     pvs->dead_items = dead_items;
     384          22 :     shared->dead_items_handle = TidStoreGetHandle(dead_items);
     385          22 :     shared->dead_items_dsa_handle = dsa_get_handle(TidStoreGetDSA(dead_items));
     386             : 
     387             :     /* Use the same buffer size for all workers */
     388          22 :     shared->ring_nbuffers = GetAccessStrategyBufferCount(bstrategy);
     389             : 
     390          22 :     pg_atomic_init_u32(&(shared->cost_balance), 0);
     391          22 :     pg_atomic_init_u32(&(shared->active_nworkers), 0);
     392          22 :     pg_atomic_init_u32(&(shared->idx), 0);
     393             : 
     394          22 :     shm_toc_insert(pcxt->toc, PARALLEL_VACUUM_KEY_SHARED, shared);
     395          22 :     pvs->shared = shared;
     396             : 
     397             :     /*
     398             :      * Allocate space for each worker's BufferUsage and WalUsage; no need to
     399             :      * initialize
     400             :      */
     401          22 :     buffer_usage = shm_toc_allocate(pcxt->toc,
     402          22 :                                     mul_size(sizeof(BufferUsage), pcxt->nworkers));
     403          22 :     shm_toc_insert(pcxt->toc, PARALLEL_VACUUM_KEY_BUFFER_USAGE, buffer_usage);
     404          22 :     pvs->buffer_usage = buffer_usage;
     405          22 :     wal_usage = shm_toc_allocate(pcxt->toc,
     406          22 :                                  mul_size(sizeof(WalUsage), pcxt->nworkers));
     407          22 :     shm_toc_insert(pcxt->toc, PARALLEL_VACUUM_KEY_WAL_USAGE, wal_usage);
     408          22 :     pvs->wal_usage = wal_usage;
     409             : 
     410             :     /* Store query string for workers */
     411          22 :     if (debug_query_string)
     412             :     {
     413             :         char       *sharedquery;
     414             : 
     415          22 :         sharedquery = (char *) shm_toc_allocate(pcxt->toc, querylen + 1);
     416          22 :         memcpy(sharedquery, debug_query_string, querylen + 1);
     417          22 :         sharedquery[querylen] = '\0';   /* redundant: memcpy above copied the terminator */
     418          22 :         shm_toc_insert(pcxt->toc,
     419             :                        PARALLEL_VACUUM_KEY_QUERY_TEXT, sharedquery);
     420             :     }
     421             : 
     422             :     /* Success -- return parallel vacuum state */
     423          22 :     return pvs;
     424             : }
     425             : 
     426             : /*
     427             :  * Destroy the dead-items store and parallel context, and end parallel mode.
     428             :  *
     429             :  * Since writes are not allowed during parallel mode, copy the
     430             :  * updated index statistics from DSM into local memory and then later use that
     431             :  * to update the index statistics.  One might think that we can exit from
     432             :  * parallel mode, update the index statistics and then destroy parallel
     433             :  * context, but that won't be safe (see ExitParallelMode).
     434             :  */
     435             : void
     436          22 : parallel_vacuum_end(ParallelVacuumState *pvs, IndexBulkDeleteResult **istats)
     437             : {
     438             :     Assert(!IsParallelWorker());
     439             : 
     440             :     /* Copy the updated statistics; istats[] must hold pvs->nindexes entries */
     441         120 :     for (int i = 0; i < pvs->nindexes; i++)
     442             :     {
     443          98 :         PVIndStats *indstats = &(pvs->indstats[i]);
     444             : 
     445          98 :         if (indstats->istat_updated)
     446             :         {
     447          82 :             istats[i] = (IndexBulkDeleteResult *) palloc0(sizeof(IndexBulkDeleteResult));
     448          82 :             memcpy(istats[i], &indstats->istat, sizeof(IndexBulkDeleteResult));
     449             :         }
     450             :         else
     451          16 :             istats[i] = NULL;   /* no statistics were stored for this index */
     452             :     }
     453             : 
     454          22 :     TidStoreDestroy(pvs->dead_items);
     455             : 
     456          22 :     DestroyParallelContext(pvs->pcxt);
     457          22 :     ExitParallelMode();
     458             : 
     459          22 :     pfree(pvs->will_parallel_vacuum);   /* allocated in parallel_vacuum_init() */
     460          22 :     pfree(pvs);
     461          22 : }
     462             : 
     463             : /*
     464             :  * Returns the dead items space; sets *dead_items_info_p to its shared info.
     465             :  */
     466             : TidStore *
     467          22 : parallel_vacuum_get_dead_items(ParallelVacuumState *pvs, VacDeadItemsInfo **dead_items_info_p)
     468             : {
     469          22 :     *dead_items_info_p = &(pvs->shared->dead_items_info);
     470          22 :     return pvs->dead_items;
     471             : }
     472             : 
     473             : /* Forget all items in dead_items by replacing the shared TidStore */
     474             : void
     475          14 : parallel_vacuum_reset_dead_items(ParallelVacuumState *pvs)
     476             : {
     477          14 :     TidStore   *old_dead_items = pvs->dead_items;
     478          14 :     VacDeadItemsInfo *dead_items_info = &(pvs->shared->dead_items_info);
     479             : 
     480             :     /*
     481             :      * Free the current tidstore and return allocated DSA segments to the
     482             :      * operating system, then recreate it with the same max_bytes limit.
     483             :      * old_dead_items dangles after this; use only pvs->dead_items below.
     484             :      */
     485          14 :     TidStoreDestroy(old_dead_items);
     486          14 :     pvs->dead_items = TidStoreCreateShared(dead_items_info->max_bytes,
     487             :                                            LWTRANCHE_PARALLEL_VACUUM_DSA);
     488             : 
     489             :     /* Publish the new store's DSA handle and pointer in the shared state */
     490          14 :     pvs->shared->dead_items_dsa_handle = dsa_get_handle(TidStoreGetDSA(pvs->dead_items));
     491          14 :     pvs->shared->dead_items_handle = TidStoreGetHandle(pvs->dead_items);
     492             : 
     493             :     /* Reset the counter */
     494          14 :     dead_items_info->num_items = 0;
     495          14 : }
     496             : 
     497             : /*
     498             :  * Do parallel index bulk-deletion with parallel workers (leader only).
     499             :  */
     500             : void
     501          14 : parallel_vacuum_bulkdel_all_indexes(ParallelVacuumState *pvs, long num_table_tuples,
     502             :                                     int num_index_scans)
     503             : {
     504             :     Assert(!IsParallelWorker());
     505             : 
     506             :     /*
     507             :      * We can only provide an approximate tuple count (num_table_tuples), at
     508             :      * least for now, hence estimated_count is always true here.
     509             :      */
     510          14 :     pvs->shared->reltuples = num_table_tuples;
     511          14 :     pvs->shared->estimated_count = true;
     512             : 
     513          14 :     parallel_vacuum_process_all_indexes(pvs, num_index_scans, true);
     514          14 : }
     515             : 
     516             : /*
     517             :  * Do parallel index cleanup with parallel workers (leader only).
     518             :  */
     519             : void
     520          22 : parallel_vacuum_cleanup_all_indexes(ParallelVacuumState *pvs, long num_table_tuples,
     521             :                                     int num_index_scans, bool estimated_count)
     522             : {
     523             :     Assert(!IsParallelWorker());
     524             : 
     525             :     /*
     526             :      * We can provide a better estimate of total number of surviving tuples
     527             :      * (we assume indexes are more interested in that than in the number of
     528             :      * nominally live tuples); the caller says whether it is still estimated.
     529             :      */
     530          22 :     pvs->shared->reltuples = num_table_tuples;
     531          22 :     pvs->shared->estimated_count = estimated_count;
     532             : 
     533          22 :     parallel_vacuum_process_all_indexes(pvs, num_index_scans, false);
     534          22 : }
     535             : 
     536             : /*
     537             :  * Compute the number of parallel worker processes to request.  Both index
     538             :  * vacuum and index cleanup can be executed with parallel workers.
     539             :  * An index is eligible for parallel vacuum only if its size is at least
     540             :  * min_parallel_index_scan_size, as invoking workers for very small indexes
     541             :  * can hurt performance.
     542             :  *
     543             :  * nrequested is the number of parallel workers that user requested.  If
     544             :  * nrequested is 0, we compute the parallel degree based on nindexes, that is
     545             :  * the number of indexes that support parallel vacuum.  This function also
     546             :  * sets will_parallel_vacuum to remember indexes that participate in parallel
     547             :  * vacuum.
     548             :  */
     549             : static int
     550        7934 : parallel_vacuum_compute_workers(Relation *indrels, int nindexes, int nrequested,
     551             :                                 bool *will_parallel_vacuum)
     552             : {
     553        7934 :     int         nindexes_parallel = 0;
     554        7934 :     int         nindexes_parallel_bulkdel = 0;
     555        7934 :     int         nindexes_parallel_cleanup = 0;
     556             :     int         parallel_workers;
     557             : 
     558             :     /*
     559             :      * We don't allow performing parallel operation in standalone backend or
     560             :      * when parallelism is disabled.
     561             :      */
     562        7934 :     if (!IsUnderPostmaster || max_parallel_maintenance_workers == 0)
     563        4214 :         return 0;
     564             : 
     565             :     /*
     566             :      * Compute the number of indexes that can participate in parallel vacuum.
     567             :      */
     568       12160 :     for (int i = 0; i < nindexes; i++)
     569             :     {
     570        8440 :         Relation    indrel = indrels[i];
     571        8440 :         uint8       vacoptions = indrel->rd_indam->amparallelvacuumoptions;
     572             : 
     573             :         /* Skip index that is not a suitable target for parallel index vacuum */
     574        8440 :         if (vacoptions == VACUUM_OPTION_NO_PARALLEL ||
     575        8440 :             RelationGetNumberOfBlocks(indrel) < min_parallel_index_scan_size)
     576        8334 :             continue;
     577             : 
     578         106 :         will_parallel_vacuum[i] = true;
     579             : 
     580         106 :         if ((vacoptions & VACUUM_OPTION_PARALLEL_BULKDEL) != 0)
     581          94 :             nindexes_parallel_bulkdel++;
     582         106 :         if (((vacoptions & VACUUM_OPTION_PARALLEL_CLEANUP) != 0) ||
     583          82 :             ((vacoptions & VACUUM_OPTION_PARALLEL_COND_CLEANUP) != 0))
     584          94 :             nindexes_parallel_cleanup++;
     585             :     }
     586             : 
     587        3720 :     nindexes_parallel = Max(nindexes_parallel_bulkdel,
     588             :                             nindexes_parallel_cleanup);
     589             : 
     590             :     /* The leader process takes one index */
     591        3720 :     nindexes_parallel--;
     592             : 
     593             :     /* No index supports parallel vacuum */
     594        3720 :     if (nindexes_parallel <= 0)
     595        3698 :         return 0;
     596             : 
     597             :     /* Compute the parallel degree */
     598          22 :     parallel_workers = (nrequested > 0) ?
     599          22 :         Min(nrequested, nindexes_parallel) : nindexes_parallel;
     600             : 
     601             :     /* Cap by max_parallel_maintenance_workers */
     602          22 :     parallel_workers = Min(parallel_workers, max_parallel_maintenance_workers);
     603             : 
     604          22 :     return parallel_workers;
     605             : }
     606             : 
     607             : /*
     608             :  * Perform index vacuum (if 'vacuum') or index cleanup (otherwise) with parallel
     609             :  * workers.  This function must be used by the parallel vacuum leader process.
     610             :  */
     611             : static void
     612          36 : parallel_vacuum_process_all_indexes(ParallelVacuumState *pvs, int num_index_scans,
     613             :                                     bool vacuum)
     614             : {
     615             :     int         nworkers;
     616             :     PVIndVacStatus new_status;
     617             : 
     618             :     Assert(!IsParallelWorker());
     619             : 
     620          36 :     if (vacuum)
     621             :     {
     622          14 :         new_status = PARALLEL_INDVAC_STATUS_NEED_BULKDELETE;
     623             : 
     624             :         /* Determine the number of parallel workers to launch */
     625          14 :         nworkers = pvs->nindexes_parallel_bulkdel;
     626             :     }
     627             :     else
     628             :     {
     629          22 :         new_status = PARALLEL_INDVAC_STATUS_NEED_CLEANUP;
     630             : 
     631             :         /* Determine the number of parallel workers to launch */
     632          22 :         nworkers = pvs->nindexes_parallel_cleanup;
     633             : 
     634             :         /* Add conditionally parallel-aware indexes if no index scan ran yet */
     635          22 :         if (num_index_scans == 0)
     636           8 :             nworkers += pvs->nindexes_parallel_condcleanup;
     637             :     }
     638             : 
     639             :     /* The leader process will participate */
     640          36 :     nworkers--;
     641             : 
     642             :     /*
     643             :      * It is possible that parallel context is initialized with fewer workers
     644             :      * than the number of indexes that need a separate worker in the current
     645             :      * phase, so we need to consider it.  See
     646             :      * parallel_vacuum_compute_workers().
     647             :      */
     648          36 :     nworkers = Min(nworkers, pvs->pcxt->nworkers);
     649             : 
     650             :     /*
     651             :      * Set index vacuum status and mark whether parallel vacuum worker can
     652             :      * process it.
     653             :      */
     654         184 :     for (int i = 0; i < pvs->nindexes; i++)
     655             :     {
     656         148 :         PVIndStats *indstats = &(pvs->indstats[i]);
     657             : 
     658             :         Assert(indstats->status == PARALLEL_INDVAC_STATUS_INITIAL);
     659         148 :         indstats->status = new_status;
     660         148 :         indstats->parallel_workers_can_process =
     661         284 :             (pvs->will_parallel_vacuum[i] &&
     662         136 :              parallel_vacuum_index_is_parallel_safe(pvs->indrels[i],
     663             :                                                     num_index_scans,
     664             :                                                     vacuum));
     665             :     }
     666             : 
     667             :     /* Reset the shared counter that hands out index numbers to processes */
     668          36 :     pg_atomic_write_u32(&(pvs->shared->idx), 0);
     669             : 
     670             :     /* Setup the shared cost-based vacuum delay and launch workers */
     671          36 :     if (nworkers > 0)
     672             :     {
     673             :         /* Reinitialize parallel context to relaunch parallel workers */
     674          26 :         if (num_index_scans > 0)
     675           4 :             ReinitializeParallelDSM(pvs->pcxt);
     676             : 
     677             :         /*
     678             :          * Set up shared cost balance and the number of active workers for
     679             :          * vacuum delay.  We need to do this before launching workers as
     680             :          * otherwise, they might not see the updated values for these
     681             :          * parameters.
     682             :          */
     683          26 :         pg_atomic_write_u32(&(pvs->shared->cost_balance), VacuumCostBalance);
     684          26 :         pg_atomic_write_u32(&(pvs->shared->active_nworkers), 0);
     685             : 
     686             :         /*
     687             :          * The number of workers can vary between bulkdelete and cleanup
     688             :          * phase.
     689             :          */
     690          26 :         ReinitializeParallelWorkers(pvs->pcxt, nworkers);
     691             : 
     692          26 :         LaunchParallelWorkers(pvs->pcxt);
     693             : 
     694          26 :         if (pvs->pcxt->nworkers_launched > 0)
     695             :         {
     696             :             /*
     697             :              * Reset the local cost values for leader backend as we have
     698             :              * already accumulated the remaining balance of heap.
     699             :              */
     700          26 :             VacuumCostBalance = 0;
     701          26 :             VacuumCostBalanceLocal = 0;
     702             : 
     703             :             /* Enable shared cost balance for leader backend */
     704          26 :             VacuumSharedCostBalance = &(pvs->shared->cost_balance);
     705          26 :             VacuumActiveNWorkers = &(pvs->shared->active_nworkers);
     706             :         }
     707             : 
     708          26 :         if (vacuum)
     709          14 :             ereport(pvs->shared->elevel,
     710             :                     (errmsg(ngettext("launched %d parallel vacuum worker for index vacuuming (planned: %d)",
     711             :                                      "launched %d parallel vacuum workers for index vacuuming (planned: %d)",
     712             :                                      pvs->pcxt->nworkers_launched),
     713             :                             pvs->pcxt->nworkers_launched, nworkers)));
     714             :         else
     715          12 :             ereport(pvs->shared->elevel,
     716             :                     (errmsg(ngettext("launched %d parallel vacuum worker for index cleanup (planned: %d)",
     717             :                                      "launched %d parallel vacuum workers for index cleanup (planned: %d)",
     718             :                                      pvs->pcxt->nworkers_launched),
     719             :                             pvs->pcxt->nworkers_launched, nworkers)));
     720             :     }
     721             : 
     722             :     /* Vacuum the indexes that can be processed by only leader process */
     723          36 :     parallel_vacuum_process_unsafe_indexes(pvs);
     724             : 
     725             :     /*
     726             :      * Join as a parallel worker.  The leader alone processes all
     727             :      * parallel-safe indexes in the case where no workers are launched.
     728             :      */
     729          36 :     parallel_vacuum_process_safe_indexes(pvs);
     730             : 
     731             :     /*
     732             :      * Next, accumulate buffer and WAL usage.  (This must wait for the workers
     733             :      * to finish, or we might get incomplete data.)
     734             :      */
     735          36 :     if (nworkers > 0)
     736             :     {
     737             :         /* Wait for all vacuum workers to finish */
     738          26 :         WaitForParallelWorkersToFinish(pvs->pcxt);
     739             : 
     740          64 :         for (int i = 0; i < pvs->pcxt->nworkers_launched; i++)
     741          38 :             InstrAccumParallelQuery(&pvs->buffer_usage[i], &pvs->wal_usage[i]);
     742             :     }
     743             : 
     744             :     /*
     745             :      * Reset all index status back to initial (while checking that we have
     746             :      * vacuumed all indexes).
     747             :      */
     748         184 :     for (int i = 0; i < pvs->nindexes; i++)
     749             :     {
     750         148 :         PVIndStats *indstats = &(pvs->indstats[i]);
     751             : 
     752         148 :         if (indstats->status != PARALLEL_INDVAC_STATUS_COMPLETED)
     753           0 :             elog(ERROR, "parallel index vacuum on index \"%s\" is not completed",
     754             :                  RelationGetRelationName(pvs->indrels[i]));
     755             : 
     756         148 :         indstats->status = PARALLEL_INDVAC_STATUS_INITIAL;
     757             :     }
     758             : 
     759             :     /*
     760             :      * Carry the shared balance value to heap scan and disable shared costing
     761             :      */
     762          36 :     if (VacuumSharedCostBalance)
     763             :     {
     764          26 :         VacuumCostBalance = pg_atomic_read_u32(VacuumSharedCostBalance);
     765          26 :         VacuumSharedCostBalance = NULL;
     766          26 :         VacuumActiveNWorkers = NULL;
     767             :     }
     768          36 : }
     769             : 
     770             : /*
     771             :  * Index vacuum/cleanup routine used by the leader process and parallel
     772             :  * vacuum worker processes to vacuum the indexes in parallel.
     773             :  */
     774             : static void
     775          74 : parallel_vacuum_process_safe_indexes(ParallelVacuumState *pvs)
     776             : {
     777             :     /*
     778             :      * Increment the active worker count if we are able to launch any worker.
     779             :      */
     780          74 :     if (VacuumActiveNWorkers)
     781          64 :         pg_atomic_add_fetch_u32(VacuumActiveNWorkers, 1);
     782             : 
     783             :     /* Loop until all indexes are vacuumed */
     784             :     for (;;)
     785         148 :     {
     786             :         int         idx;
     787             :         PVIndStats *indstats;
     788             : 
     789             :         /* Atomically claim the next index number to process */
     790         222 :         idx = pg_atomic_fetch_add_u32(&(pvs->shared->idx), 1);
     791             : 
     792             :         /* Done for all indexes? */
     793         222 :         if (idx >= pvs->nindexes)
     794          74 :             break;
     795             : 
     796         148 :         indstats = &(pvs->indstats[idx]);
     797             : 
     798             :         /*
     799             :          * Skip an index that is unsafe for workers or is an unsuitable
     800             :          * target for parallel index vacuum (such an index is vacuumed by the
     801             :          * leader in parallel_vacuum_process_unsafe_indexes()).
     802             :          */
     803         148 :         if (!indstats->parallel_workers_can_process)
     804          60 :             continue;
     805             : 
     806             :         /* Do vacuum or cleanup of the index */
     807          88 :         parallel_vacuum_process_one_index(pvs, pvs->indrels[idx], indstats);
     808             :     }
     809             : 
     810             :     /*
     811             :      * We have completed the index vacuum so decrement the active worker
     812             :      * count.
     813             :      */
     814          74 :     if (VacuumActiveNWorkers)
     815          64 :         pg_atomic_sub_fetch_u32(VacuumActiveNWorkers, 1);
     816          74 : }
     817             : 
     818             : /*
     819             :  * Vacuum, in the leader process, the indexes that workers cannot process.
     820             :  *
     821             :  * Handles index vacuuming (or index cleanup) for indexes that are not
     822             :  * parallel safe.  It's possible that this will vary for a given index, based
     823             :  * on details like whether we're performing index cleanup right now.
     824             :  *
     825             :  * Also performs vacuuming of smaller indexes that fell under the size cutoff
     826             :  * enforced by parallel_vacuum_compute_workers().
     827             :  */
     828             : static void
     829          36 : parallel_vacuum_process_unsafe_indexes(ParallelVacuumState *pvs)
     830             : {
     831             :     Assert(!IsParallelWorker());
     832             : 
     833             :     /*
     834             :      * Increment the active worker count if we are able to launch any worker.
     835             :      */
     836          36 :     if (VacuumActiveNWorkers)
     837          26 :         pg_atomic_add_fetch_u32(VacuumActiveNWorkers, 1);
     838             : 
     839         184 :     for (int i = 0; i < pvs->nindexes; i++)
     840             :     {
     841         148 :         PVIndStats *indstats = &(pvs->indstats[i]);
     842             : 
     843             :         /* Skip indexes that are safe for workers */
     844         148 :         if (indstats->parallel_workers_can_process)
     845          88 :             continue;
     846             : 
     847             :         /* Do vacuum or cleanup of the index */
     848          60 :         parallel_vacuum_process_one_index(pvs, pvs->indrels[i], indstats);
     849             :     }
     850             : 
     851             :     /*
     852             :      * We have completed the index vacuum so decrement the active worker
     853             :      * count.
     854             :      */
     855          36 :     if (VacuumActiveNWorkers)
     856          26 :         pg_atomic_sub_fetch_u32(VacuumActiveNWorkers, 1);
     857          36 : }
     858             : 
     859             : /*
     860             :  * Vacuum or clean up an index, either in the leader process or in one of
     861             :  * the worker processes.  After vacuuming the index this function copies the
     862             :  * index statistics returned from ambulkdelete and amvacuumcleanup to the DSM
     863             :  * segment.
     864             :  */
     865             : static void
     866         148 : parallel_vacuum_process_one_index(ParallelVacuumState *pvs, Relation indrel,
     867             :                                   PVIndStats *indstats)
     868             : {
     869         148 :     IndexBulkDeleteResult *istat = NULL;
     870             :     IndexBulkDeleteResult *istat_res;
     871             :     IndexVacuumInfo ivinfo;
     872             : 
     873             :     /*
     874             :      * Update the pointer to the corresponding bulk-deletion result if someone
     875             :      * has already updated it.
     876             :      */
     877         148 :     if (indstats->istat_updated)
     878          50 :         istat = &(indstats->istat);
     879             : 
     880         148 :     ivinfo.index = indrel;
     881         148 :     ivinfo.heaprel = pvs->heaprel;
     882         148 :     ivinfo.analyze_only = false;
     883         148 :     ivinfo.report_progress = false;
     884         148 :     ivinfo.message_level = DEBUG2;
     885         148 :     ivinfo.estimated_count = pvs->shared->estimated_count;
     886         148 :     ivinfo.num_heap_tuples = pvs->shared->reltuples;
     887         148 :     ivinfo.strategy = pvs->bstrategy;
     888             : 
     889             :     /* Update error traceback information */
     890         148 :     pvs->indname = pstrdup(RelationGetRelationName(indrel));
     891         148 :     pvs->status = indstats->status;
     892             : 
     893         148 :     switch (indstats->status)
     894             :     {
     895          50 :         case PARALLEL_INDVAC_STATUS_NEED_BULKDELETE:
     896          50 :             istat_res = vac_bulkdel_one_index(&ivinfo, istat, pvs->dead_items,
     897          50 :                                               &pvs->shared->dead_items_info);
     898          50 :             break;
     899          98 :         case PARALLEL_INDVAC_STATUS_NEED_CLEANUP:
     900          98 :             istat_res = vac_cleanup_one_index(&ivinfo, istat);
     901          98 :             break;
     902           0 :         default:
     903           0 :             elog(ERROR, "unexpected parallel vacuum index status %d for index \"%s\"",
     904             :                  indstats->status,
     905             :                  RelationGetRelationName(indrel));
     906             :     }
     907             : 
     908             :     /*
     909             :      * Copy the index bulk-deletion result returned from ambulkdelete and
     910             :      * amvacuumcleanup to the DSM segment if it's the first cycle because they
     911             :      * allocate locally and it's possible that an index will be vacuumed by a
     912             :      * different vacuum process the next cycle.  Copying the result normally
     913             :      * happens only the first time an index is vacuumed.  For any additional
     914             :      * vacuum pass, we directly point to the result on the DSM segment and
     915             :      * pass it to vacuum index APIs so that workers can update it directly.
     916             :      *
     917             :      * Since all vacuum workers write the bulk-deletion result at different
     918             :      * slots we can write them without locking.
     919             :      */
     920         148 :     if (!indstats->istat_updated && istat_res != NULL)
     921             :     {
     922          82 :         memcpy(&(indstats->istat), istat_res, sizeof(IndexBulkDeleteResult));
     923          82 :         indstats->istat_updated = true;
     924             : 
     925             :         /* Free the locally-allocated bulk-deletion result */
     926          82 :         pfree(istat_res);
     927             :     }
     928             : 
     929             :     /*
     930             :      * Update the status to completed.  No need to lock here since each
     931             :      * worker touches different indexes.
     932             :      */
     933         148 :     indstats->status = PARALLEL_INDVAC_STATUS_COMPLETED;
     934             : 
     935             :     /* Reset error traceback information */
     936         148 :     pvs->status = PARALLEL_INDVAC_STATUS_COMPLETED;
     937         148 :     pfree(pvs->indname);
     938         148 :     pvs->indname = NULL;
     939             : 
     940             :     /*
     941             :      * Call the parallel variant of pgstat_progress_incr_param so workers can
     942             :      * report progress of index vacuum to the leader.
     943             :      */
     944         148 :     pgstat_progress_parallel_incr_param(PROGRESS_VACUUM_INDEXES_PROCESSED, 1);
     945         148 : }
     946             : 
     947             : /*
     948             :  * Returns false if the given index can't participate in the next execution of
     949             :  * parallel index vacuum or parallel index cleanup.
     950             :  */
     951             : static bool
     952         136 : parallel_vacuum_index_is_parallel_safe(Relation indrel, int num_index_scans,
     953             :                                        bool vacuum)
     954             : {
     955             :     uint8       vacoptions;
     956             : 
     957         136 :     vacoptions = indrel->rd_indam->amparallelvacuumoptions;
     958             : 
     959             :     /* In parallel vacuum case, check if it supports parallel bulk-deletion */
     960         136 :     if (vacuum)
     961          44 :         return ((vacoptions & VACUUM_OPTION_PARALLEL_BULKDEL) != 0);
     962             : 
     963             :     /* Not safe if the index does not support parallel cleanup */
     964          92 :     if (((vacoptions & VACUUM_OPTION_PARALLEL_CLEANUP) == 0) &&
     965          68 :         ((vacoptions & VACUUM_OPTION_PARALLEL_COND_CLEANUP) == 0))
     966          12 :         return false;
     967             : 
     968             :     /*
     969             :      * Not safe if the index supports parallel cleanup conditionally, but we
     970             :      * have already processed the index (for bulkdelete).  We do this to avoid
     971             :      * the need to invoke workers when parallel index cleanup doesn't need to
     972             :      * scan the index.  See the comments for option
     973             :      * VACUUM_OPTION_PARALLEL_COND_CLEANUP to know when indexes support
     974             :      * parallel cleanup conditionally.
     975             :      */
     976          80 :     if (num_index_scans > 0 &&
     977          40 :         ((vacoptions & VACUUM_OPTION_PARALLEL_COND_CLEANUP) != 0))
     978          32 :         return false;
     979             : 
     980          48 :     return true;
     981             : }
     982             : 
     983             : /*
     984             :  * Perform work within a launched parallel process.
     985             :  *
     986             :  * Since parallel vacuum workers perform only index vacuum or index cleanup,
     987             :  * we don't need to report progress information.
     988             :  */
     989             : void
     990          38 : parallel_vacuum_main(dsm_segment *seg, shm_toc *toc)
     991             : {
     992             :     ParallelVacuumState pvs;
     993             :     Relation    rel;
     994             :     Relation   *indrels;
     995             :     PVIndStats *indstats;
     996             :     PVShared   *shared;
     997             :     TidStore   *dead_items;
     998             :     BufferUsage *buffer_usage;
     999             :     WalUsage   *wal_usage;
    1000             :     int         nindexes;
    1001             :     char       *sharedquery;
    1002             :     ErrorContextCallback errcallback;
    1003             : 
    1004             :     /*
    1005             :      * A parallel vacuum worker must have only the PROC_IN_VACUUM flag set,
    1006             :      * since we don't support parallel vacuum for autovacuum as of now.
    1007             :      */
    1008             :     Assert(MyProc->statusFlags == PROC_IN_VACUUM);
    1009             : 
    1010          38 :     elog(DEBUG1, "starting parallel vacuum worker");
    1011             : 
    1012          38 :     shared = (PVShared *) shm_toc_lookup(toc, PARALLEL_VACUUM_KEY_SHARED, false);
    1013             : 
    1014             :     /* Set debug_query_string for individual workers */
    1015          38 :     sharedquery = shm_toc_lookup(toc, PARALLEL_VACUUM_KEY_QUERY_TEXT, true);
    1016          38 :     debug_query_string = sharedquery;
    1017          38 :     pgstat_report_activity(STATE_RUNNING, debug_query_string);
    1018             : 
    1019             :     /* Track query ID */
    1020          38 :     pgstat_report_query_id(shared->queryid, false);
    1021             : 
    1022             :     /*
    1023             :      * Open table.  The lock mode is the same as the leader process.  It's
    1024             :      * okay because the lock mode does not conflict among the parallel
    1025             :      * workers.
    1026             :      */
    1027          38 :     rel = table_open(shared->relid, ShareUpdateExclusiveLock);
    1028             : 
    1029             :     /*
    1030             :      * Open all indexes.  indrels are sorted in order by OID, which should
    1031             :      * match the leader's ordering.
    1032             :      */
    1033          38 :     vac_open_indexes(rel, RowExclusiveLock, &nindexes, &indrels);
    1034             :     Assert(nindexes > 0);
    1035             : 
    1036          38 :     if (shared->maintenance_work_mem_worker > 0)
    1037          38 :         maintenance_work_mem = shared->maintenance_work_mem_worker;
    1038             : 
    1039             :     /* Look up the index statistics via the shared table of contents */
    1040          38 :     indstats = (PVIndStats *) shm_toc_lookup(toc,
    1041             :                                              PARALLEL_VACUUM_KEY_INDEX_STATS,
    1042             :                                              false);
    1043             : 
    1044             :     /* Find dead_items in shared memory */
    1045          38 :     dead_items = TidStoreAttach(shared->dead_items_dsa_handle,
    1046             :                                 shared->dead_items_handle);
    1047             : 
    1048             :     /* Set cost-based vacuum delay */
    1049          38 :     VacuumUpdateCosts();
    1050          38 :     VacuumCostBalance = 0;
    1051          38 :     VacuumCostBalanceLocal = 0;
    1052          38 :     VacuumSharedCostBalance = &(shared->cost_balance);
    1053          38 :     VacuumActiveNWorkers = &(shared->active_nworkers);
    1054             : 
    1055             :     /* Set parallel vacuum state */
    1056          38 :     pvs.indrels = indrels;
    1057          38 :     pvs.nindexes = nindexes;
    1058          38 :     pvs.indstats = indstats;
    1059          38 :     pvs.shared = shared;
    1060          38 :     pvs.dead_items = dead_items;
    1061          38 :     pvs.relnamespace = get_namespace_name(RelationGetNamespace(rel));
    1062          38 :     pvs.relname = pstrdup(RelationGetRelationName(rel));
    1063          38 :     pvs.heaprel = rel;
    1064             : 
    1065             :     /* These fields will be filled during index vacuum or cleanup */
    1066          38 :     pvs.indname = NULL;
    1067          38 :     pvs.status = PARALLEL_INDVAC_STATUS_INITIAL;
    1068             : 
    1069             :     /* Each parallel VACUUM worker gets its own access strategy. */
    1070          76 :     pvs.bstrategy = GetAccessStrategyWithSize(BAS_VACUUM,
    1071          38 :                                               shared->ring_nbuffers * (BLCKSZ / 1024));
    1072             : 
    1073             :     /* Setup error traceback support for ereport() */
    1074          38 :     errcallback.callback = parallel_vacuum_error_callback;
    1075          38 :     errcallback.arg = &pvs;
    1076          38 :     errcallback.previous = error_context_stack;
    1077          38 :     error_context_stack = &errcallback;
    1078             : 
    1079             :     /* Prepare to track buffer usage during parallel execution */
    1080          38 :     InstrStartParallelQuery();
    1081             : 
    1082             :     /* Process indexes to perform vacuum/cleanup */
    1083          38 :     parallel_vacuum_process_safe_indexes(&pvs);
    1084             : 
    1085             :     /* Report buffer/WAL usage during parallel execution */
    1086          38 :     buffer_usage = shm_toc_lookup(toc, PARALLEL_VACUUM_KEY_BUFFER_USAGE, false);
    1087          38 :     wal_usage = shm_toc_lookup(toc, PARALLEL_VACUUM_KEY_WAL_USAGE, false);
    1088          38 :     InstrEndParallelQuery(&buffer_usage[ParallelWorkerNumber],
    1089          38 :                           &wal_usage[ParallelWorkerNumber]);
    1090             : 
    1091          38 :     TidStoreDetach(dead_items);
    1092             : 
    1093             :     /* Pop the error context stack */
    1094          38 :     error_context_stack = errcallback.previous;
    1095             : 
    1096          38 :     vac_close_indexes(nindexes, indrels, RowExclusiveLock);
    1097          38 :     table_close(rel, ShareUpdateExclusiveLock);
    1098          38 :     FreeAccessStrategy(pvs.bstrategy);
    1099          38 : }
    1100             : 
    1101             : /*
    1102             :  * Error context callback for errors occurring during parallel index vacuum.
    1103             :  * The error context messages should match the messages set in the lazy vacuum
    1104             :  * error context.  If you change this function, change vacuum_error_callback()
    1105             :  * as well.  'arg' is read as a ParallelVacuumState; a context line is added only for the NEED_BULKDELETE and NEED_CLEANUP statuses.
    1106             :  */
    1107             : static void
    1108           0 : parallel_vacuum_error_callback(void *arg)
    1109             : {
    1110           0 :     ParallelVacuumState *errinfo = arg; /* the worker installs this callback with arg = &pvs */
    1111             : 
    1112           0 :     switch (errinfo->status) /* status selects which message, if any, is attached */
    1113             :     {
    1114           0 :         case PARALLEL_INDVAC_STATUS_NEED_BULKDELETE:
    1115           0 :             errcontext("while vacuuming index \"%s\" of relation \"%s.%s\"",
    1116             :                        errinfo->indname, /* starts out NULL in the worker; presumably set before status leaves INITIAL -- TODO confirm */
    1117             :                        errinfo->relnamespace,
    1118             :                        errinfo->relname);
    1119           0 :             break;
    1120           0 :         case PARALLEL_INDVAC_STATUS_NEED_CLEANUP:
    1121           0 :             errcontext("while cleaning up index \"%s\" of relation \"%s.%s\"",
    1122             :                        errinfo->indname,
    1123             :                        errinfo->relnamespace,
    1124             :                        errinfo->relname);
    1125           0 :             break;
    1126           0 :         case PARALLEL_INDVAC_STATUS_INITIAL:
    1127             :         case PARALLEL_INDVAC_STATUS_COMPLETED:
    1128             :         default: /* INITIAL, COMPLETED and any unrecognized status add no context line */
    1129           0 :             return;
    1130             :     }
    1131             : }

Generated by: LCOV version 1.14