LCOV - code coverage report
Current view: top level - src/backend/commands - vacuumparallel.c (source / functions) Hit Total Coverage
Test: PostgreSQL 16beta1 Lines: 294 309 95.1 %
Date: 2023-05-30 18:12:27 Functions: 12 13 92.3 %
Legend: Lines: hit not hit

          Line data    Source code
       1             : /*-------------------------------------------------------------------------
       2             :  *
       3             :  * vacuumparallel.c
       4             :  *    Support routines for parallel vacuum execution.
       5             :  *
       6             :  * This file contains routines that are intended to support setting up, using,
       7             :  * and tearing down a ParallelVacuumState.
       8             :  *
       9             :  * In a parallel vacuum, we perform both index bulk deletion and index cleanup
      10             :  * with parallel worker processes.  Individual indexes are processed by one
      11             :  * vacuum process.  ParallelVacuumState contains shared information as well as
      12             :  * the memory space for storing dead items allocated in the DSM segment.  We
      13             :  * launch parallel worker processes at the start of parallel index
      14             :  * bulk-deletion and index cleanup and once all indexes are processed, the
      15             :  * parallel worker processes exit.  Each time we process indexes in parallel,
      16             :  * the parallel context is re-initialized so that the same DSM can be used for
      17             :  * multiple passes of index bulk-deletion and index cleanup.
      18             :  *
      19             :  * Portions Copyright (c) 1996-2023, PostgreSQL Global Development Group
      20             :  * Portions Copyright (c) 1994, Regents of the University of California
      21             :  *
      22             :  * IDENTIFICATION
      23             :  *    src/backend/commands/vacuumparallel.c
      24             :  *
      25             :  *-------------------------------------------------------------------------
      26             :  */
      27             : #include "postgres.h"
      28             : 
      29             : #include "access/amapi.h"
      30             : #include "access/table.h"
      31             : #include "access/xact.h"
      32             : #include "catalog/index.h"
      33             : #include "commands/vacuum.h"
      34             : #include "optimizer/paths.h"
      35             : #include "pgstat.h"
      36             : #include "storage/bufmgr.h"
      37             : #include "tcop/tcopprot.h"
      38             : #include "utils/lsyscache.h"
      39             : #include "utils/rel.h"
      40             : 
      41             : /*
      42             :  * DSM keys for parallel vacuum.  Unlike other parallel execution code, since
      43             :  * we don't need to worry about DSM keys conflicting with plan_node_id we can
      44             :  * use small integers.
      45             :  */
      46             : #define PARALLEL_VACUUM_KEY_SHARED          1
      47             : #define PARALLEL_VACUUM_KEY_DEAD_ITEMS      2
      48             : #define PARALLEL_VACUUM_KEY_QUERY_TEXT      3
      49             : #define PARALLEL_VACUUM_KEY_BUFFER_USAGE    4
      50             : #define PARALLEL_VACUUM_KEY_WAL_USAGE       5
      51             : #define PARALLEL_VACUUM_KEY_INDEX_STATS     6
      52             : 
      53             : /*
      54             :  * Information shared among parallel workers; for that reason it is allocated
      55             :  * in the DSM segment.
      56             :  */
      57             : typedef struct PVShared
      58             : {
      59             :     /*
      60             :      * Target table relid and log level (for messages about parallel workers
      61             :      * launched during VACUUM VERBOSE).  These fields are not modified during
      62             :      * the parallel vacuum.
      63             :      */
      64             :     Oid         relid;
      65             :     int         elevel;
      66             : 
      67             :     /*
      68             :      * Fields for both index vacuum and cleanup.
      69             :      *
      70             :      * reltuples is the total number of input heap tuples.  We set either old
      71             :      * live tuples in the index vacuum case or the new live tuples in the
      72             :      * index cleanup case.
      73             :      *
      74             :      * estimated_count is true if reltuples is an estimated value.  (Note that
      75             :      * reltuples could be -1 in this case, indicating we have no idea.)
      76             :      */
      77             :     double      reltuples;
      78             :     bool        estimated_count;
      79             : 
      80             :     /*
      81             :      * In single process vacuum we could consume more memory during index
      82             :      * vacuuming or cleanup apart from the memory for heap scanning.  In
      83             :      * parallel vacuum, since individual vacuum workers can consume memory
      84             :      * equal to maintenance_work_mem, the new maintenance_work_mem for each
      85             :      * worker is set such that the parallel operation doesn't consume more
      86             :      * memory than single process vacuum.
      87             :      */
      88             :     int         maintenance_work_mem_worker;
      89             : 
      90             :     /*
      91             :      * The number of buffers each worker's Buffer Access Strategy ring should
      92             :      * contain.
      93             :      */
      94             :     int         ring_nbuffers;
      95             : 
      96             :     /*
      97             :      * Shared vacuum cost balance.  During parallel vacuum,
      98             :      * VacuumSharedCostBalance points to this value and it accumulates the
      99             :      * balance of each parallel vacuum worker.
     100             :      */
     101             :     pg_atomic_uint32 cost_balance;
     102             : 
     103             :     /*
     104             :      * Number of active parallel workers.  This is used for computing the
     105             :      * minimum threshold of the vacuum cost balance before a worker sleeps for
     106             :      * cost-based delay.
     107             :      */
     108             :     pg_atomic_uint32 active_nworkers;
     109             : 
     110             :     /* Counter for vacuuming and cleanup */
     111             :     pg_atomic_uint32 idx;
     112             : } PVShared;
     113             : 
     114             : /* Status used during parallel index vacuum or cleanup */
     115             : typedef enum PVIndVacStatus
     116             : {
     117             :     PARALLEL_INDVAC_STATUS_INITIAL = 0,
     118             :     PARALLEL_INDVAC_STATUS_NEED_BULKDELETE,
     119             :     PARALLEL_INDVAC_STATUS_NEED_CLEANUP,
     120             :     PARALLEL_INDVAC_STATUS_COMPLETED
     121             : } PVIndVacStatus;
     122             : 
     123             : /*
     124             :  * Struct for index vacuum statistics of an index that is used for parallel vacuum.
     125             :  * This includes the status of parallel index vacuum as well as index statistics.
     126             :  */
     127             : typedef struct PVIndStats
     128             : {
     129             :     /*
     130             :      * The following two fields are set by leader process before executing
     131             :      * parallel index vacuum or parallel index cleanup.  These fields are not
     132             :      * fixed for the entire VACUUM operation.  They are only fixed for an
     133             :      * individual parallel index vacuum and cleanup.
     134             :      *
     135             :      * parallel_workers_can_process is true if both leader and worker can
     136             :      * process the index, otherwise only leader can process it.
     137             :      */
     138             :     PVIndVacStatus status;
     139             :     bool        parallel_workers_can_process;
     140             : 
     141             :     /*
     142             :      * Individual worker or leader stores the result of index vacuum or
     143             :      * cleanup.
     144             :      */
     145             :     bool        istat_updated;  /* are the stats updated? */
     146             :     IndexBulkDeleteResult istat;
     147             : } PVIndStats;
     148             : 
     149             : /*
     150             :  * Struct for maintaining a parallel vacuum state. typedef appears in vacuum.h.
     151             :  */
     152             : struct ParallelVacuumState
     153             : {
     154             :     /* NULL for worker processes */
     155             :     ParallelContext *pcxt;
     156             : 
     157             :     /* Parent Heap Relation */
     158             :     Relation    heaprel;
     159             : 
     160             :     /* Target indexes */
     161             :     Relation   *indrels;
     162             :     int         nindexes;
     163             : 
     164             :     /* Shared information among parallel vacuum workers */
     165             :     PVShared   *shared;
     166             : 
     167             :     /*
     168             :      * Shared index statistics among parallel vacuum workers. The array
     169             :      * element is allocated for every index, even those indexes where parallel
     170             :      * index vacuuming is unsafe or not worthwhile (e.g.,
     171             :      * will_parallel_vacuum[] is false).  During parallel vacuum,
     172             :      * IndexBulkDeleteResult of each index is kept in DSM and is copied into
     173             :      * local memory at the end of parallel vacuum.
     174             :      */
     175             :     PVIndStats *indstats;
     176             : 
     177             :     /* Shared dead items space among parallel vacuum workers */
     178             :     VacDeadItems *dead_items;
     179             : 
     180             :     /* Points to buffer usage area in DSM */
     181             :     BufferUsage *buffer_usage;
     182             : 
     183             :     /* Points to WAL usage area in DSM */
     184             :     WalUsage   *wal_usage;
     185             : 
     186             :     /*
     187             :      * False if the index is a totally unsuitable target for all parallel
     188             :      * processing.  For example, the index could be smaller than the
     189             :      * min_parallel_index_scan_size cutoff.
     190             :      */
     191             :     bool       *will_parallel_vacuum;
     192             : 
     193             :     /*
     194             :      * The number of indexes that support parallel index bulk-deletion, parallel
     195             :      * index cleanup, and conditional parallel index cleanup, respectively.
     196             :      */
     197             :     int         nindexes_parallel_bulkdel;
     198             :     int         nindexes_parallel_cleanup;
     199             :     int         nindexes_parallel_condcleanup;
     200             : 
     201             :     /* Buffer access strategy used by leader process */
     202             :     BufferAccessStrategy bstrategy;
     203             : 
     204             :     /*
     205             :      * Error reporting state.  The error callback is set only for worker
     206             :      * processes during parallel index vacuum.
     207             :      */
     208             :     char       *relnamespace;
     209             :     char       *relname;
     210             :     char       *indname;
     211             :     PVIndVacStatus status;
     212             : };
     213             : 
     214             : static int  parallel_vacuum_compute_workers(Relation *indrels, int nindexes, int nrequested,
     215             :                                             bool *will_parallel_vacuum);
     216             : static void parallel_vacuum_process_all_indexes(ParallelVacuumState *pvs, int num_index_scans,
     217             :                                                 bool vacuum);
     218             : static void parallel_vacuum_process_safe_indexes(ParallelVacuumState *pvs);
     219             : static void parallel_vacuum_process_unsafe_indexes(ParallelVacuumState *pvs);
     220             : static void parallel_vacuum_process_one_index(ParallelVacuumState *pvs, Relation indrel,
     221             :                                               PVIndStats *indstats);
     222             : static bool parallel_vacuum_index_is_parallel_safe(Relation indrel, int num_index_scans,
     223             :                                                    bool vacuum);
     224             : static void parallel_vacuum_error_callback(void *arg);
     225             : 
     226             : /*
     227             :  * Try to enter parallel mode and create a parallel context.  Then initialize
     228             :  * shared memory state.
     229             :  *
     230             :  * On success, return parallel vacuum state.  Otherwise return NULL.
     231             :  */
     232             : ParallelVacuumState *
     233       31564 : parallel_vacuum_init(Relation rel, Relation *indrels, int nindexes,
     234             :                      int nrequested_workers, int max_items,
     235             :                      int elevel, BufferAccessStrategy bstrategy)
     236             : {
     237             :     ParallelVacuumState *pvs;
     238             :     ParallelContext *pcxt;
     239             :     PVShared   *shared;
     240             :     VacDeadItems *dead_items;
     241             :     PVIndStats *indstats;
     242             :     BufferUsage *buffer_usage;
     243             :     WalUsage   *wal_usage;
     244             :     bool       *will_parallel_vacuum;
     245             :     Size        est_indstats_len;
     246             :     Size        est_shared_len;
     247             :     Size        est_dead_items_len;
     248       31564 :     int         nindexes_mwm = 0;
     249       31564 :     int         parallel_workers = 0;
     250             :     int         querylen;
     251             : 
     252             :     /*
     253             :      * A parallel vacuum must be requested and there must be indexes on the
     254             :      * relation
     255             :      */
     256             :     Assert(nrequested_workers >= 0);
     257             :     Assert(nindexes > 0);
     258             : 
     259             :     /*
     260             :      * Compute the number of parallel vacuum workers to launch
     261             :      */
     262       31564 :     will_parallel_vacuum = (bool *) palloc0(sizeof(bool) * nindexes);
     263       31564 :     parallel_workers = parallel_vacuum_compute_workers(indrels, nindexes,
     264             :                                                        nrequested_workers,
     265             :                                                        will_parallel_vacuum);
     266       31564 :     if (parallel_workers <= 0)
     267             :     {
     268             :         /* Can't perform vacuum in parallel -- return NULL */
     269       31546 :         pfree(will_parallel_vacuum);
     270       31546 :         return NULL;
     271             :     }
     272             : 
     273          18 :     pvs = (ParallelVacuumState *) palloc0(sizeof(ParallelVacuumState));
     274          18 :     pvs->indrels = indrels;
     275          18 :     pvs->nindexes = nindexes;
     276          18 :     pvs->will_parallel_vacuum = will_parallel_vacuum;
     277          18 :     pvs->bstrategy = bstrategy;
     278          18 :     pvs->heaprel = rel;
     279             : 
     280          18 :     EnterParallelMode();
     281          18 :     pcxt = CreateParallelContext("postgres", "parallel_vacuum_main",
     282             :                                  parallel_workers);
     283             :     Assert(pcxt->nworkers > 0);
     284          18 :     pvs->pcxt = pcxt;
     285             : 
     286             :     /* Estimate size for index vacuum stats -- PARALLEL_VACUUM_KEY_INDEX_STATS */
     287          18 :     est_indstats_len = mul_size(sizeof(PVIndStats), nindexes);
     288          18 :     shm_toc_estimate_chunk(&pcxt->estimator, est_indstats_len);
     289          18 :     shm_toc_estimate_keys(&pcxt->estimator, 1);
     290             : 
     291             :     /* Estimate size for shared information -- PARALLEL_VACUUM_KEY_SHARED */
     292          18 :     est_shared_len = sizeof(PVShared);
     293          18 :     shm_toc_estimate_chunk(&pcxt->estimator, est_shared_len);
     294          18 :     shm_toc_estimate_keys(&pcxt->estimator, 1);
     295             : 
     296             :     /* Estimate size for dead_items -- PARALLEL_VACUUM_KEY_DEAD_ITEMS */
     297          18 :     est_dead_items_len = vac_max_items_to_alloc_size(max_items);
     298          18 :     shm_toc_estimate_chunk(&pcxt->estimator, est_dead_items_len);
     299          18 :     shm_toc_estimate_keys(&pcxt->estimator, 1);
     300             : 
     301             :     /*
     302             :      * Estimate space for BufferUsage and WalUsage --
     303             :      * PARALLEL_VACUUM_KEY_BUFFER_USAGE and PARALLEL_VACUUM_KEY_WAL_USAGE.
     304             :      *
     305             :      * If there are no extensions loaded that care, we could skip this.  We
     306             :      * have no way of knowing whether anyone's looking at pgBufferUsage or
     307             :      * pgWalUsage, so do it unconditionally.
     308             :      */
     309          18 :     shm_toc_estimate_chunk(&pcxt->estimator,
     310             :                            mul_size(sizeof(BufferUsage), pcxt->nworkers));
     311          18 :     shm_toc_estimate_keys(&pcxt->estimator, 1);
     312          18 :     shm_toc_estimate_chunk(&pcxt->estimator,
     313             :                            mul_size(sizeof(WalUsage), pcxt->nworkers));
     314          18 :     shm_toc_estimate_keys(&pcxt->estimator, 1);
     315             : 
     316             :     /* Finally, estimate PARALLEL_VACUUM_KEY_QUERY_TEXT space */
     317          18 :     if (debug_query_string)
     318             :     {
     319          18 :         querylen = strlen(debug_query_string);
     320          18 :         shm_toc_estimate_chunk(&pcxt->estimator, querylen + 1);
     321          18 :         shm_toc_estimate_keys(&pcxt->estimator, 1);
     322             :     }
     323             :     else
     324           0 :         querylen = 0;           /* keep compiler quiet */
     325             : 
     326          18 :     InitializeParallelDSM(pcxt);
     327             : 
     328             :     /* Prepare index vacuum stats */
     329          18 :     indstats = (PVIndStats *) shm_toc_allocate(pcxt->toc, est_indstats_len);
     330         558 :     MemSet(indstats, 0, est_indstats_len);
     331         108 :     for (int i = 0; i < nindexes; i++)
     332             :     {
     333          90 :         Relation    indrel = indrels[i];
     334          90 :         uint8       vacoptions = indrel->rd_indam->amparallelvacuumoptions;
     335             : 
     336             :         /*
     337             :          * Cleanup option should be either disabled, always performing in
     338             :          * parallel or conditionally performing in parallel.
     339             :          */
     340             :         Assert(((vacoptions & VACUUM_OPTION_PARALLEL_CLEANUP) == 0) ||
     341             :                ((vacoptions & VACUUM_OPTION_PARALLEL_COND_CLEANUP) == 0));
     342             :         Assert(vacoptions <= VACUUM_OPTION_MAX_VALID_VALUE);
     343             : 
     344          90 :         if (!will_parallel_vacuum[i])
     345           6 :             continue;
     346             : 
     347          84 :         if (indrel->rd_indam->amusemaintenanceworkmem)
     348          12 :             nindexes_mwm++;
     349             : 
     350             :         /*
     351             :          * Remember the number of indexes that support parallel operation for
     352             :          * each phase.
     353             :          */
     354          84 :         if ((vacoptions & VACUUM_OPTION_PARALLEL_BULKDEL) != 0)
     355          72 :             pvs->nindexes_parallel_bulkdel++;
     356          84 :         if ((vacoptions & VACUUM_OPTION_PARALLEL_CLEANUP) != 0)
     357          24 :             pvs->nindexes_parallel_cleanup++;
     358          84 :         if ((vacoptions & VACUUM_OPTION_PARALLEL_COND_CLEANUP) != 0)
     359          48 :             pvs->nindexes_parallel_condcleanup++;
     360             :     }
     361          18 :     shm_toc_insert(pcxt->toc, PARALLEL_VACUUM_KEY_INDEX_STATS, indstats);
     362          18 :     pvs->indstats = indstats;
     363             : 
     364             :     /* Prepare shared information */
     365          18 :     shared = (PVShared *) shm_toc_allocate(pcxt->toc, est_shared_len);
     366         108 :     MemSet(shared, 0, est_shared_len);
     367          18 :     shared->relid = RelationGetRelid(rel);
     368          18 :     shared->elevel = elevel;
     369          18 :     shared->maintenance_work_mem_worker =
     370             :         (nindexes_mwm > 0) ?
     371          18 :         maintenance_work_mem / Min(parallel_workers, nindexes_mwm) :
     372             :         maintenance_work_mem;
     373             : 
     374             :     /* Use the same buffer size for all workers */
     375          18 :     shared->ring_nbuffers = GetAccessStrategyBufferCount(bstrategy);
     376             : 
     377          18 :     pg_atomic_init_u32(&(shared->cost_balance), 0);
     378          18 :     pg_atomic_init_u32(&(shared->active_nworkers), 0);
     379          18 :     pg_atomic_init_u32(&(shared->idx), 0);
     380             : 
     381          18 :     shm_toc_insert(pcxt->toc, PARALLEL_VACUUM_KEY_SHARED, shared);
     382          18 :     pvs->shared = shared;
     383             : 
     384             :     /* Prepare the dead_items space */
     385          18 :     dead_items = (VacDeadItems *) shm_toc_allocate(pcxt->toc,
     386             :                                                    est_dead_items_len);
     387          18 :     dead_items->max_items = max_items;
     388          18 :     dead_items->num_items = 0;
     389          18 :     MemSet(dead_items->items, 0, sizeof(ItemPointerData) * max_items);
     390          18 :     shm_toc_insert(pcxt->toc, PARALLEL_VACUUM_KEY_DEAD_ITEMS, dead_items);
     391          18 :     pvs->dead_items = dead_items;
     392             : 
     393             :     /*
     394             :      * Allocate space for each worker's BufferUsage and WalUsage; no need to
     395             :      * initialize
     396             :      */
     397          18 :     buffer_usage = shm_toc_allocate(pcxt->toc,
     398          18 :                                     mul_size(sizeof(BufferUsage), pcxt->nworkers));
     399          18 :     shm_toc_insert(pcxt->toc, PARALLEL_VACUUM_KEY_BUFFER_USAGE, buffer_usage);
     400          18 :     pvs->buffer_usage = buffer_usage;
     401          18 :     wal_usage = shm_toc_allocate(pcxt->toc,
     402          18 :                                  mul_size(sizeof(WalUsage), pcxt->nworkers));
     403          18 :     shm_toc_insert(pcxt->toc, PARALLEL_VACUUM_KEY_WAL_USAGE, wal_usage);
     404          18 :     pvs->wal_usage = wal_usage;
     405             : 
     406             :     /* Store query string for workers */
     407          18 :     if (debug_query_string)
     408             :     {
     409             :         char       *sharedquery;
     410             : 
     411          18 :         sharedquery = (char *) shm_toc_allocate(pcxt->toc, querylen + 1);
     412          18 :         memcpy(sharedquery, debug_query_string, querylen + 1);
     413          18 :         sharedquery[querylen] = '\0';
     414          18 :         shm_toc_insert(pcxt->toc,
     415             :                        PARALLEL_VACUUM_KEY_QUERY_TEXT, sharedquery);
     416             :     }
     417             : 
     418             :     /* Success -- return parallel vacuum state */
     419          18 :     return pvs;
     420             : }
     421             : 
     422             : /*
     423             :  * Destroy the parallel context, and end parallel mode.
     424             :  *
     425             :  * Since writes are not allowed during parallel mode, copy the
     426             :  * updated index statistics from DSM into local memory and then later use that
     427             :  * to update the index statistics.  One might think that we can exit from
     428             :  * parallel mode, update the index statistics and then destroy parallel
     429             :  * context, but that won't be safe (see ExitParallelMode).
     430             :  */
     431             : void
     432          18 : parallel_vacuum_end(ParallelVacuumState *pvs, IndexBulkDeleteResult **istats)
     433             : {
     434             :     Assert(!IsParallelWorker());
     435             : 
     436             :     /* Copy the updated statistics */
     437         108 :     for (int i = 0; i < pvs->nindexes; i++)
     438             :     {
     439          90 :         PVIndStats *indstats = &(pvs->indstats[i]);
     440             : 
     441          90 :         if (indstats->istat_updated)
     442             :         {
     443          70 :             istats[i] = (IndexBulkDeleteResult *) palloc0(sizeof(IndexBulkDeleteResult));
     444          70 :             memcpy(istats[i], &indstats->istat, sizeof(IndexBulkDeleteResult));
     445             :         }
     446             :         else
     447          20 :             istats[i] = NULL;
     448             :     }
     449             : 
     450          18 :     DestroyParallelContext(pvs->pcxt);
     451          18 :     ExitParallelMode();
     452             : 
     453          18 :     pfree(pvs->will_parallel_vacuum);
     454          18 :     pfree(pvs);
     455          18 : }
     456             : 
     457             : /* Returns the dead items space */
     458             : VacDeadItems *
     459          18 : parallel_vacuum_get_dead_items(ParallelVacuumState *pvs)
     460             : {
     461          18 :     return pvs->dead_items;
     462             : }
     463             : 
     464             : /*
     465             :  * Do parallel index bulk-deletion with parallel workers.
     466             :  */
     467             : void
     468           8 : parallel_vacuum_bulkdel_all_indexes(ParallelVacuumState *pvs, long num_table_tuples,
     469             :                                     int num_index_scans)
     470             : {
     471             :     Assert(!IsParallelWorker());
     472             : 
     473             :     /*
     474             :      * We can only provide an approximate value of num_heap_tuples, at least
     475             :      * for now.
     476             :      */
     477           8 :     pvs->shared->reltuples = num_table_tuples;
     478           8 :     pvs->shared->estimated_count = true;
     479             : 
     480           8 :     parallel_vacuum_process_all_indexes(pvs, num_index_scans, true);
     481           8 : }
     482             : 
     483             : /*
     484             :  * Do parallel index cleanup with parallel workers.
     485             :  */
     486             : void
     487          18 : parallel_vacuum_cleanup_all_indexes(ParallelVacuumState *pvs, long num_table_tuples,
     488             :                                     int num_index_scans, bool estimated_count)
     489             : {
     490             :     Assert(!IsParallelWorker());
     491             : 
     492             :     /*
     493             :      * We can provide a better estimate of total number of surviving tuples
     494             :      * (we assume indexes are more interested in that than in the number of
     495             :      * nominally live tuples).
     496             :      */
     497          18 :     pvs->shared->reltuples = num_table_tuples;
     498          18 :     pvs->shared->estimated_count = estimated_count;
     499             : 
     500          18 :     parallel_vacuum_process_all_indexes(pvs, num_index_scans, false);
     501          18 : }
     502             : 
     503             : /*
     504             :  * Compute the number of parallel worker processes to request.  Both index
     505             :  * vacuum and index cleanup can be executed with parallel workers.
     506             :  * An index is eligible for parallel vacuum only if its size is at least
     507             :  * min_parallel_index_scan_size, as invoking workers for very small indexes
     508             :  * can hurt performance.
     509             :  *
     510             :  * nrequested is the number of parallel workers that the user requested.  If
     511             :  * nrequested is 0, we compute the parallel degree based on the number of
     512             :  * indexes that support parallel vacuum.  This function also sets
     513             :  * will_parallel_vacuum to remember indexes that participate in parallel
     514             :  * vacuum.
     515             :  */
     516             : static int
     517       31564 : parallel_vacuum_compute_workers(Relation *indrels, int nindexes, int nrequested,
     518             :                                 bool *will_parallel_vacuum)
     519             : {
     520       31564 :     int         nindexes_parallel = 0;
     521       31564 :     int         nindexes_parallel_bulkdel = 0;
     522       31564 :     int         nindexes_parallel_cleanup = 0;
     523             :     int         parallel_workers;
     524             : 
     525             :     /*
     526             :      * Parallel operation is not allowed in a standalone backend or when
     527             :      * max_parallel_maintenance_workers is 0 (parallelism disabled).
     528             :      */
     529       31564 :     if (!IsUnderPostmaster || max_parallel_maintenance_workers == 0)
     530       29498 :         return 0;
     531             : 
     532             :     /*
     533             :      * Compute the number of indexes that can participate in parallel vacuum.
     534             :      */
     535        6774 :     for (int i = 0; i < nindexes; i++)
     536             :     {
     537        4708 :         Relation    indrel = indrels[i];
     538        4708 :         uint8       vacoptions = indrel->rd_indam->amparallelvacuumoptions;
     539             : 
     540             :         /* Skip indexes lacking parallel support or below the size threshold */
     541        4708 :         if (vacoptions == VACUUM_OPTION_NO_PARALLEL ||
     542        4708 :             RelationGetNumberOfBlocks(indrel) < min_parallel_index_scan_size)
     543        4612 :             continue;
     544             : 
     545          96 :         will_parallel_vacuum[i] = true;
     546             : 
     547          96 :         if ((vacoptions & VACUUM_OPTION_PARALLEL_BULKDEL) != 0)
     548          84 :             nindexes_parallel_bulkdel++;
     549          96 :         if (((vacoptions & VACUUM_OPTION_PARALLEL_CLEANUP) != 0) ||
     550          72 :             ((vacoptions & VACUUM_OPTION_PARALLEL_COND_CLEANUP) != 0))
     551          84 :             nindexes_parallel_cleanup++;
     552             :     }
     553             : 
     554        2066 :     nindexes_parallel = Max(nindexes_parallel_bulkdel,
     555             :                             nindexes_parallel_cleanup);
     556             : 
     557             :     /* The leader process takes one index */
     558        2066 :     nindexes_parallel--;
     559             : 
     560             :     /* No worker is needed when at most one index can be processed in parallel */
     561        2066 :     if (nindexes_parallel <= 0)
     562        2048 :         return 0;
     563             : 
     564             :     /* Compute the parallel degree */
     565          18 :     parallel_workers = (nrequested > 0) ?
     566          18 :         Min(nrequested, nindexes_parallel) : nindexes_parallel;
     567             : 
     568             :     /* Cap by max_parallel_maintenance_workers */
     569          18 :     parallel_workers = Min(parallel_workers, max_parallel_maintenance_workers);
     570             : 
     571          18 :     return parallel_workers;
     572             : }
     573             : 
     574             : /*
     575             :  * Perform index vacuum or index cleanup with parallel workers.  This function
     576             :  * must be used by the parallel vacuum leader process.
     577             :  */
     578             : static void
     579          26 : parallel_vacuum_process_all_indexes(ParallelVacuumState *pvs, int num_index_scans,
     580             :                                     bool vacuum)
     581             : {
     582             :     int         nworkers;
     583             :     PVIndVacStatus new_status;
     584             : 
     585             :     Assert(!IsParallelWorker());
     586             : 
     587          26 :     if (vacuum)
     588             :     {
     589           8 :         new_status = PARALLEL_INDVAC_STATUS_NEED_BULKDELETE;
     590             : 
     591             :         /* Determine the number of parallel workers to launch */
     592           8 :         nworkers = pvs->nindexes_parallel_bulkdel;
     593             :     }
     594             :     else
     595             :     {
     596          18 :         new_status = PARALLEL_INDVAC_STATUS_NEED_CLEANUP;
     597             : 
     598             :         /* Determine the number of parallel workers to launch */
     599          18 :         nworkers = pvs->nindexes_parallel_cleanup;
     600             : 
     601             :         /* Add conditionally parallel-aware indexes if this is the first call */
     602          18 :         if (num_index_scans == 0)
     603          10 :             nworkers += pvs->nindexes_parallel_condcleanup;
     604             :     }
     605             : 
     606             :     /* The leader process will participate */
     607          26 :     nworkers--;
     608             : 
     609             :     /*
     610             :      * It is possible that parallel context is initialized with fewer workers
     611             :      * than the number of indexes that need a separate worker in the current
     612             :      * phase, so we need to consider it.  See
     613             :      * parallel_vacuum_compute_workers().
     614             :      */
     615          26 :     nworkers = Min(nworkers, pvs->pcxt->nworkers);
     616             : 
     617             :     /*
     618             :      * Set index vacuum status and mark whether parallel vacuum worker can
     619             :      * process it.
     620             :      */
     621         146 :     for (int i = 0; i < pvs->nindexes; i++)
     622             :     {
     623         120 :         PVIndStats *indstats = &(pvs->indstats[i]);
     624             : 
     625             :         Assert(indstats->status == PARALLEL_INDVAC_STATUS_INITIAL);
     626         120 :         indstats->status = new_status;
     627         120 :         indstats->parallel_workers_can_process =
     628         228 :             (pvs->will_parallel_vacuum[i] &&
     629         108 :              parallel_vacuum_index_is_parallel_safe(pvs->indrels[i],
     630             :                                                     num_index_scans,
     631             :                                                     vacuum));
     632             :     }
     633             : 
     634             :     /* Reset the parallel index processing counter */
     635          26 :     pg_atomic_write_u32(&(pvs->shared->idx), 0);
     636             : 
     637             :     /* Set up the shared cost-based vacuum delay and launch workers */
     638          26 :     if (nworkers > 0)
     639             :     {
     640             :         /* Reinitialize parallel context to relaunch parallel workers */
     641          20 :         if (num_index_scans > 0)
     642           2 :             ReinitializeParallelDSM(pvs->pcxt);
     643             : 
     644             :         /*
     645             :          * Set up shared cost balance and the number of active workers for
     646             :          * vacuum delay.  We need to do this before launching workers as
     647             :          * otherwise, they might not see the updated values for these
     648             :          * parameters.
     649             :          */
     650          20 :         pg_atomic_write_u32(&(pvs->shared->cost_balance), VacuumCostBalance);
     651          20 :         pg_atomic_write_u32(&(pvs->shared->active_nworkers), 0);
     652             : 
     653             :         /*
     654             :          * The number of workers can vary between bulkdelete and cleanup
     655             :          * phase.
     656             :          */
     657          20 :         ReinitializeParallelWorkers(pvs->pcxt, nworkers);
     658             : 
     659          20 :         LaunchParallelWorkers(pvs->pcxt);
     660             : 
     661          20 :         if (pvs->pcxt->nworkers_launched > 0)
     662             :         {
     663             :             /*
     664             :              * Reset the local cost values for leader backend as we have
     665             :              * already accumulated the remaining balance of heap.
     666             :              */
     667          20 :             VacuumCostBalance = 0;
     668          20 :             VacuumCostBalanceLocal = 0;
     669             : 
     670             :             /* Enable shared cost balance for leader backend */
     671          20 :             VacuumSharedCostBalance = &(pvs->shared->cost_balance);
     672          20 :             VacuumActiveNWorkers = &(pvs->shared->active_nworkers);
     673             :         }
     674             : 
     675          20 :         if (vacuum)
     676           8 :             ereport(pvs->shared->elevel,
     677             :                     (errmsg(ngettext("launched %d parallel vacuum worker for index vacuuming (planned: %d)",
     678             :                                      "launched %d parallel vacuum workers for index vacuuming (planned: %d)",
     679             :                                      pvs->pcxt->nworkers_launched),
     680             :                             pvs->pcxt->nworkers_launched, nworkers)));
     681             :         else
     682          12 :             ereport(pvs->shared->elevel,
     683             :                     (errmsg(ngettext("launched %d parallel vacuum worker for index cleanup (planned: %d)",
     684             :                                      "launched %d parallel vacuum workers for index cleanup (planned: %d)",
     685             :                                      pvs->pcxt->nworkers_launched),
     686             :                             pvs->pcxt->nworkers_launched, nworkers)));
     687             :     }
     688             : 
     689             :     /* Vacuum the indexes that can be processed by only leader process */
     690          26 :     parallel_vacuum_process_unsafe_indexes(pvs);
     691             : 
     692             :     /*
     693             :      * Join as a parallel worker.  The leader alone processes all
     694             :      * parallel-safe indexes in the case where no workers are launched.
     695             :      */
     696          26 :     parallel_vacuum_process_safe_indexes(pvs);
     697             : 
     698             :     /*
     699             :      * Next, accumulate buffer and WAL usage.  (This must wait for the workers
     700             :      * to finish, or we might get incomplete data.)
     701             :      */
     702          26 :     if (nworkers > 0)
     703             :     {
     704             :         /* Wait for all vacuum workers to finish */
     705          20 :         WaitForParallelWorkersToFinish(pvs->pcxt);
     706             : 
     707          52 :         for (int i = 0; i < pvs->pcxt->nworkers_launched; i++)
     708          32 :             InstrAccumParallelQuery(&pvs->buffer_usage[i], &pvs->wal_usage[i]);
     709             :     }
     710             : 
     711             :     /*
     712             :      * Reset all index status back to initial (while checking that we have
     713             :      * vacuumed all indexes).
     714             :      */
     715         146 :     for (int i = 0; i < pvs->nindexes; i++)
     716             :     {
     717         120 :         PVIndStats *indstats = &(pvs->indstats[i]);
     718             : 
     719         120 :         if (indstats->status != PARALLEL_INDVAC_STATUS_COMPLETED)
     720           0 :             elog(ERROR, "parallel index vacuum on index \"%s\" is not completed",
     721             :                  RelationGetRelationName(pvs->indrels[i]));
     722             : 
     723         120 :         indstats->status = PARALLEL_INDVAC_STATUS_INITIAL;
     724             :     }
     725             : 
     726             :     /*
     727             :      * Carry the shared balance value to heap scan and disable shared costing
     728             :      */
     729          26 :     if (VacuumSharedCostBalance)
     730             :     {
     731          20 :         VacuumCostBalance = pg_atomic_read_u32(VacuumSharedCostBalance);
     732          20 :         VacuumSharedCostBalance = NULL;
     733          20 :         VacuumActiveNWorkers = NULL;
     734             :     }
     735          26 : }
     736             : 
     737             : /*
     738             :  * Index vacuum/cleanup routine used by the leader process and parallel
     739             :  * vacuum worker processes to vacuum the indexes in parallel.
     740             :  */
     741             : static void
     742          58 : parallel_vacuum_process_safe_indexes(ParallelVacuumState *pvs)
     743             : {
     744             :     /*
     745             :      * Increment the active worker count if we are able to launch any worker.
     746             :      */
     747          58 :     if (VacuumActiveNWorkers)
     748          52 :         pg_atomic_add_fetch_u32(VacuumActiveNWorkers, 1);
     749             : 
     750             :     /* Loop until all indexes are vacuumed */
     751             :     for (;;)
     752         120 :     {
     753             :         int         idx;
     754             :         PVIndStats *indstats;
     755             : 
     756             :         /* Get an index number to process */
     757         178 :         idx = pg_atomic_fetch_add_u32(&(pvs->shared->idx), 1);
     758             : 
     759             :         /* Done for all indexes? */
     760         178 :         if (idx >= pvs->nindexes)
     761          58 :             break;
     762             : 
     763         120 :         indstats = &(pvs->indstats[idx]);
     764             : 
     765             :         /*
     766             :          * Skip vacuuming an index that is unsafe for workers or is an
     767             :          * unsuitable target for parallel index vacuum (this is vacuumed in
     768             :          * parallel_vacuum_process_unsafe_indexes() by the leader).
     769             :          */
     770         120 :         if (!indstats->parallel_workers_can_process)
     771          44 :             continue;
     772             : 
     773             :         /* Do vacuum or cleanup of the index */
     774          76 :         parallel_vacuum_process_one_index(pvs, pvs->indrels[idx], indstats);
     775             :     }
     776             : 
     777             :     /*
     778             :      * We have completed the index vacuum so decrement the active worker
     779             :      * count.
     780             :      */
     781          58 :     if (VacuumActiveNWorkers)
     782          52 :         pg_atomic_sub_fetch_u32(VacuumActiveNWorkers, 1);
     783          58 : }
     784             : 
     785             : /*
     786             :  * Perform parallel vacuuming of indexes in leader process.
     787             :  *
     788             :  * Handles index vacuuming (or index cleanup) for indexes that are not
     789             :  * parallel safe.  It's possible that this will vary for a given index, based
     790             :  * on details like whether we're performing index cleanup right now.
     791             :  *
     792             :  * Also performs vacuuming of smaller indexes that fell under the size cutoff
     793             :  * enforced by parallel_vacuum_compute_workers().
     794             :  */
     795             : static void
     796          26 : parallel_vacuum_process_unsafe_indexes(ParallelVacuumState *pvs)
     797             : {
     798             :     Assert(!IsParallelWorker());
     799             : 
     800             :     /*
     801             :      * Increment the active worker count if we are able to launch any worker.
     802             :      */
     803          26 :     if (VacuumActiveNWorkers)
     804          20 :         pg_atomic_add_fetch_u32(VacuumActiveNWorkers, 1);
     805             : 
     806         146 :     for (int i = 0; i < pvs->nindexes; i++)
     807             :     {
     808         120 :         PVIndStats *indstats = &(pvs->indstats[i]);
     809             : 
     810             :         /* Skip indexes that are safe for workers */
     811         120 :         if (indstats->parallel_workers_can_process)
     812          76 :             continue;
     813             : 
     814             :         /* Do vacuum or cleanup of the index */
     815          44 :         parallel_vacuum_process_one_index(pvs, pvs->indrels[i], indstats);
     816             :     }
     817             : 
     818             :     /*
     819             :      * We have completed the index vacuum so decrement the active worker
     820             :      * count.
     821             :      */
     822          26 :     if (VacuumActiveNWorkers)
     823          20 :         pg_atomic_sub_fetch_u32(VacuumActiveNWorkers, 1);
     824          26 : }
     825             : 
     826             : /*
     827             :  * Vacuum or clean up an index, either in the leader process or in one of the
     828             :  * worker processes.  After vacuuming the index this function copies the index
     829             :  * statistics returned from ambulkdelete and amvacuumcleanup to the DSM
     830             :  * segment.
     831             :  */
     832             : static void
     833         120 : parallel_vacuum_process_one_index(ParallelVacuumState *pvs, Relation indrel,
     834             :                                   PVIndStats *indstats)
     835             : {
     836         120 :     IndexBulkDeleteResult *istat = NULL;
     837             :     IndexBulkDeleteResult *istat_res;
     838             :     IndexVacuumInfo ivinfo;
     839             : 
     840             :     /*
     841             :      * Update the pointer to the corresponding bulk-deletion result if someone
     842             :      * has already updated it
     843             :      */
     844         120 :     if (indstats->istat_updated)
     845          30 :         istat = &(indstats->istat);
     846             : 
     847         120 :     ivinfo.index = indrel;
     848         120 :     ivinfo.heaprel = pvs->heaprel;
     849         120 :     ivinfo.analyze_only = false;
     850         120 :     ivinfo.report_progress = false;
     851         120 :     ivinfo.message_level = DEBUG2;
     852         120 :     ivinfo.estimated_count = pvs->shared->estimated_count;
     853         120 :     ivinfo.num_heap_tuples = pvs->shared->reltuples;
     854         120 :     ivinfo.strategy = pvs->bstrategy;
     855             : 
     856             :     /* Update error traceback information */
     857         120 :     pvs->indname = pstrdup(RelationGetRelationName(indrel));
     858         120 :     pvs->status = indstats->status;
     859             : 
     860         120 :     switch (indstats->status)
     861             :     {
     862          30 :         case PARALLEL_INDVAC_STATUS_NEED_BULKDELETE:
     863          30 :             istat_res = vac_bulkdel_one_index(&ivinfo, istat, pvs->dead_items);
     864          30 :             break;
     865          90 :         case PARALLEL_INDVAC_STATUS_NEED_CLEANUP:
     866          90 :             istat_res = vac_cleanup_one_index(&ivinfo, istat);
     867          90 :             break;
     868           0 :         default:
     869           0 :             elog(ERROR, "unexpected parallel vacuum index status %d for index \"%s\"",
     870             :                  indstats->status,
     871             :                  RelationGetRelationName(indrel));
     872             :     }
     873             : 
     874             :     /*
     875             :      * Copy the index bulk-deletion result returned from ambulkdelete and
     876             :      * amvacuumcleanup to the DSM segment if it's the first cycle because they
     877             :      * allocate locally and it's possible that an index will be vacuumed by a
     878             :      * different vacuum process the next cycle.  Copying the result normally
     879             :      * happens only the first time an index is vacuumed.  For any additional
     880             :      * vacuum pass, we directly point to the result on the DSM segment and
     881             :      * pass it to vacuum index APIs so that workers can update it directly.
     882             :      *
     883             :      * Since all vacuum workers write the bulk-deletion result at different
     884             :      * slots we can write them without locking.
     885             :      */
     886         120 :     if (!indstats->istat_updated && istat_res != NULL)
     887             :     {
     888          70 :         memcpy(&(indstats->istat), istat_res, sizeof(IndexBulkDeleteResult));
     889          70 :         indstats->istat_updated = true;
     890             : 
     891             :         /* Free the locally-allocated bulk-deletion result */
     892          70 :         pfree(istat_res);
     893             :     }
     894             : 
     895             :     /*
     896             :      * Update the status to completed. No need to lock here since each worker
     897             :      * touches different indexes.
     898             :      */
     899         120 :     indstats->status = PARALLEL_INDVAC_STATUS_COMPLETED;
     900             : 
     901             :     /* Reset error traceback information */
     902         120 :     pvs->status = PARALLEL_INDVAC_STATUS_COMPLETED;
     903         120 :     pfree(pvs->indname);
     904         120 :     pvs->indname = NULL;
     905         120 : }
     906             : 
     907             : /*
     908             :  * Returns false if the given index can't participate in the next execution of
     909             :  * parallel index vacuum or parallel index cleanup.
     910             :  */
     911             : static bool
     912         108 : parallel_vacuum_index_is_parallel_safe(Relation indrel, int num_index_scans,
     913             :                                        bool vacuum)
     914             : {
     915             :     uint8       vacoptions;
     916             : 
     917         108 :     vacoptions = indrel->rd_indam->amparallelvacuumoptions;
     918             : 
     919             :     /* In parallel vacuum case, check if it supports parallel bulk-deletion */
     920         108 :     if (vacuum)
     921          24 :         return ((vacoptions & VACUUM_OPTION_PARALLEL_BULKDEL) != 0);
     922             : 
     923             :     /* Not safe if the index does not support parallel cleanup */
     924          84 :     if (((vacoptions & VACUUM_OPTION_PARALLEL_CLEANUP) == 0) &&
     925          60 :         ((vacoptions & VACUUM_OPTION_PARALLEL_COND_CLEANUP) == 0))
     926          12 :         return false;
     927             : 
     928             :     /*
     929             :      * Not safe if the index supports parallel cleanup conditionally, but we
     930             :      * have already processed the index (for bulkdelete).  We do this to avoid
     931             :      * the need to invoke workers when parallel index cleanup doesn't need to
     932             :      * scan the index.  See the comments for option
     933             :      * VACUUM_OPTION_PARALLEL_COND_CLEANUP to know when indexes support
     934             :      * parallel cleanup conditionally.
     935             :      */
     936          72 :     if (num_index_scans > 0 &&
     937          22 :         ((vacoptions & VACUUM_OPTION_PARALLEL_COND_CLEANUP) != 0))
     938          18 :         return false;
     939             : 
     940          54 :     return true;
     941             : }
     942             : 
     943             : /*
     944             :  * Perform work within a launched parallel process.
     945             :  *
     946             :  * Since parallel vacuum workers perform only index vacuum or index cleanup,
     947             :  * we don't need to report progress information.
     948             :  */
     949             : void
     950          32 : parallel_vacuum_main(dsm_segment *seg, shm_toc *toc)
     951             : {
     952             :     ParallelVacuumState pvs;
     953             :     Relation    rel;
     954             :     Relation   *indrels;
     955             :     PVIndStats *indstats;
     956             :     PVShared   *shared;
     957             :     VacDeadItems *dead_items;
     958             :     BufferUsage *buffer_usage;
     959             :     WalUsage   *wal_usage;
     960             :     int         nindexes;
     961             :     char       *sharedquery;
     962             :     ErrorContextCallback errcallback;
     963             : 
     964             :     /*
     965             :      * A parallel vacuum worker must have only PROC_IN_VACUUM flag since we
     966             :      * don't support parallel vacuum for autovacuum as of now.
     967             :      */
     968             :     Assert(MyProc->statusFlags == PROC_IN_VACUUM);
     969             : 
     970          32 :     elog(DEBUG1, "starting parallel vacuum worker");
     971             : 
     972          32 :     shared = (PVShared *) shm_toc_lookup(toc, PARALLEL_VACUUM_KEY_SHARED, false);
     973             : 
     974             :     /* Set debug_query_string for individual workers */
     975          32 :     sharedquery = shm_toc_lookup(toc, PARALLEL_VACUUM_KEY_QUERY_TEXT, true);
     976          32 :     debug_query_string = sharedquery;
     977          32 :     pgstat_report_activity(STATE_RUNNING, debug_query_string);
     978             : 
     979             :     /*
     980             :      * Open table.  The lock mode is the same as the leader process.  It's
     981             :      * okay because the lock mode does not conflict among the parallel
     982             :      * workers.
     983             :      */
     984          32 :     rel = table_open(shared->relid, ShareUpdateExclusiveLock);
     985             : 
     986             :     /*
     987             :      * Open all indexes.  indrels are sorted in order by OID, which should
     988             :      * match the leader's ordering.
     989             :      */
     990          32 :     vac_open_indexes(rel, RowExclusiveLock, &nindexes, &indrels);
     991             :     Assert(nindexes > 0);
     992             : 
     993          32 :     if (shared->maintenance_work_mem_worker > 0)
     994          32 :         maintenance_work_mem = shared->maintenance_work_mem_worker;
     995             : 
     996             :     /* Set index statistics */
     997          32 :     indstats = (PVIndStats *) shm_toc_lookup(toc,
     998             :                                              PARALLEL_VACUUM_KEY_INDEX_STATS,
     999             :                                              false);
    1000             : 
    1001             :     /* Set dead_items space */
    1002          32 :     dead_items = (VacDeadItems *) shm_toc_lookup(toc,
    1003             :                                                  PARALLEL_VACUUM_KEY_DEAD_ITEMS,
    1004             :                                                  false);
    1005             : 
    1006             :     /* Set cost-based vacuum delay */
    1007          32 :     VacuumUpdateCosts();
    1008          32 :     VacuumCostBalance = 0;
    1009          32 :     VacuumPageHit = 0;
    1010          32 :     VacuumPageMiss = 0;
    1011          32 :     VacuumPageDirty = 0;
    1012          32 :     VacuumCostBalanceLocal = 0;
    1013          32 :     VacuumSharedCostBalance = &(shared->cost_balance);
    1014          32 :     VacuumActiveNWorkers = &(shared->active_nworkers);
    1015             : 
    1016             :     /* Set parallel vacuum state */
    1017          32 :     pvs.indrels = indrels;
    1018          32 :     pvs.nindexes = nindexes;
    1019          32 :     pvs.indstats = indstats;
    1020          32 :     pvs.shared = shared;
    1021          32 :     pvs.dead_items = dead_items;
    1022          32 :     pvs.relnamespace = get_namespace_name(RelationGetNamespace(rel));
    1023          32 :     pvs.relname = pstrdup(RelationGetRelationName(rel));
    1024          32 :     pvs.heaprel = rel;
    1025             : 
    1026             :     /* These fields will be filled during index vacuum or cleanup */
    1027          32 :     pvs.indname = NULL;
    1028          32 :     pvs.status = PARALLEL_INDVAC_STATUS_INITIAL;
    1029             : 
    1030             :     /* Each parallel VACUUM worker gets its own access strategy. */
    1031          64 :     pvs.bstrategy = GetAccessStrategyWithSize(BAS_VACUUM,
    1032          32 :                                               shared->ring_nbuffers * (BLCKSZ / 1024));
    1033             : 
    1034             :     /* Setup error traceback support for ereport() */
    1035          32 :     errcallback.callback = parallel_vacuum_error_callback;
    1036          32 :     errcallback.arg = &pvs;
    1037          32 :     errcallback.previous = error_context_stack;
    1038          32 :     error_context_stack = &errcallback;
    1039             : 
    1040             :     /* Prepare to track buffer usage during parallel execution */
    1041          32 :     InstrStartParallelQuery();
    1042             : 
    1043             :     /* Process indexes to perform vacuum/cleanup */
    1044          32 :     parallel_vacuum_process_safe_indexes(&pvs);
    1045             : 
    1046             :     /* Report buffer/WAL usage during parallel execution */
    1047          32 :     buffer_usage = shm_toc_lookup(toc, PARALLEL_VACUUM_KEY_BUFFER_USAGE, false);
    1048          32 :     wal_usage = shm_toc_lookup(toc, PARALLEL_VACUUM_KEY_WAL_USAGE, false);
    1049          32 :     InstrEndParallelQuery(&buffer_usage[ParallelWorkerNumber],
    1050          32 :                           &wal_usage[ParallelWorkerNumber]);
    1051             : 
    1052             :     /* Pop the error context stack */
    1053          32 :     error_context_stack = errcallback.previous;
    1054             : 
    1055          32 :     vac_close_indexes(nindexes, indrels, RowExclusiveLock);
    1056          32 :     table_close(rel, ShareUpdateExclusiveLock);
    1057          32 :     FreeAccessStrategy(pvs.bstrategy);
    1058          32 : }
    1059             : 
    1060             : /*
    1061             :  * Error context callback for errors occurring during parallel index vacuum or
    1062             :  * cleanup.  The error context messages should match the messages set in the
    1063             :  * lazy vacuum error context.  If you change this function, change
    1064             :  * vacuum_error_callback() as well.
    1065             :  */
    1066             : static void
    1067           0 : parallel_vacuum_error_callback(void *arg)
    1068             : {
    1069           0 :     ParallelVacuumState *errinfo = arg;
    1070             : 
    1071           0 :     switch (errinfo->status)
    1072             :     {
    1073           0 :         case PARALLEL_INDVAC_STATUS_NEED_BULKDELETE:
    1074           0 :             errcontext("while vacuuming index \"%s\" of relation \"%s.%s\"",
    1075             :                        errinfo->indname,
    1076             :                        errinfo->relnamespace,
    1077             :                        errinfo->relname);
    1078           0 :             break;
    1079           0 :         case PARALLEL_INDVAC_STATUS_NEED_CLEANUP:
    1080           0 :             errcontext("while cleaning up index \"%s\" of relation \"%s.%s\"",
    1081             :                        errinfo->indname,
    1082             :                        errinfo->relnamespace,
    1083             :                        errinfo->relname);
    1084           0 :             break;
    1085           0 :         case PARALLEL_INDVAC_STATUS_INITIAL:
    1086             :         case PARALLEL_INDVAC_STATUS_COMPLETED:
    1087             :         default:
    1088           0 :             return;
    1089             :     }
    1090             : }

Generated by: LCOV version 1.14