LCOV - code coverage report
Current view: top level - src/backend/commands - vacuumparallel.c (source / functions) Coverage Total Hit
Test: PostgreSQL 19devel Lines: 96.0 % 379 364
Test Date: 2026-05-02 10:16:34 Functions: 100.0 % 18 18
Legend: Lines:     hit not hit

            Line data    Source code
       1              : /*-------------------------------------------------------------------------
       2              :  *
       3              :  * vacuumparallel.c
       4              :  *    Support routines for parallel vacuum and autovacuum execution. In the
       5              :  *    comments below, the word "vacuum" will refer to both vacuum and
       6              :  *    autovacuum.
       7              :  *
       8              :  * This file contains routines that are intended to support setting up, using,
       9              :  * and tearing down a ParallelVacuumState.
      10              :  *
      11              :  * In a parallel vacuum, we perform both index bulk deletion and index cleanup
      12              :  * with parallel worker processes.  Individual indexes are processed by one
      13              :  * vacuum process.  ParallelVacuumState contains shared information as well as
      14              :  * the memory space for storing dead items allocated in the DSA area.  We
      15              :  * launch parallel worker processes at the start of parallel index
      16              :  * bulk-deletion and index cleanup and once all indexes are processed, the
      17              :  * parallel worker processes exit.  Each time we process indexes in parallel,
      18              :  * the parallel context is re-initialized so that the same DSM can be used for
      19              :  * multiple passes of index bulk-deletion and index cleanup.
      20              :  *
      21              :  * For parallel autovacuum, we need to propagate cost-based vacuum delay
      22              :  * parameters from the leader to its workers, as the leader's parameters can
      23              :  * change even while processing a table (e.g., due to a config reload).
      24              :  * The PVSharedCostParams struct manages these parameters using a
      25              :  * generation counter. Each parallel worker polls this shared state and
      26              :  * refreshes its local delay parameters whenever a change is detected.
      27              :  *
      28              :  * Portions Copyright (c) 1996-2026, PostgreSQL Global Development Group
      29              :  * Portions Copyright (c) 1994, Regents of the University of California
      30              :  *
      31              :  * IDENTIFICATION
      32              :  *    src/backend/commands/vacuumparallel.c
      33              :  *
      34              :  *-------------------------------------------------------------------------
      35              :  */
      36              : #include "postgres.h"
      37              : 
      38              : #include "access/amapi.h"
      39              : #include "access/table.h"
      40              : #include "access/xact.h"
      41              : #include "commands/progress.h"
      42              : #include "commands/vacuum.h"
      43              : #include "executor/instrument.h"
      44              : #include "optimizer/paths.h"
      45              : #include "pgstat.h"
      46              : #include "storage/bufmgr.h"
      47              : #include "storage/proc.h"
      48              : #include "tcop/tcopprot.h"
      49              : #include "utils/lsyscache.h"
      50              : #include "utils/rel.h"
      51              : 
      52              : /*
      53              :  * DSM keys for parallel vacuum.  Unlike other parallel execution code, since
      54              :  * we don't need to worry about DSM keys conflicting with plan_node_id we can
      55              :  * use small integers.
      56              :  */
      57              : #define PARALLEL_VACUUM_KEY_SHARED          1
      58              : #define PARALLEL_VACUUM_KEY_QUERY_TEXT      2
      59              : #define PARALLEL_VACUUM_KEY_BUFFER_USAGE    3
      60              : #define PARALLEL_VACUUM_KEY_WAL_USAGE       4
      61              : #define PARALLEL_VACUUM_KEY_INDEX_STATS     5
      62              : 
/*
 * Struct for cost-based vacuum delay related parameters to share among an
 * autovacuum worker and its parallel vacuum workers.
 *
 * This lives inside PVShared (and hence in the DSM segment).  The leader
 * updates the fields under 'mutex' and then bumps 'generation'; workers
 * compare 'generation' against their local copy to decide whether to
 * re-read the parameters.
 */
typedef struct PVSharedCostParams
{
    /*
     * The generation counter is incremented by the leader process each time
     * it updates the shared cost-based vacuum delay parameters. Parallel
     * vacuum workers compare it with their local generation,
     * shared_params_generation_local, to detect whether they need to refresh
     * their local parameters. The generation starts from 1 so that a freshly
     * started worker (whose local copy is 0) will always load the initial
     * parameters on its first check.
     */
    pg_atomic_uint32 generation;

    slock_t     mutex;          /* protects all fields below */

    /*
     * Parameters to share with parallel workers.  Presumably these mirror
     * the vacuum_cost_* GUCs of the leader -- confirm against the setter,
     * parallel_vacuum_set_cost_parameters().
     */
    double      cost_delay;
    int         cost_limit;
    int         cost_page_dirty;
    int         cost_page_hit;
    int         cost_page_miss;
} PVSharedCostParams;
      89              : 
/*
 * Shared information among parallel workers.  So this is allocated in the DSM
 * segment.
 */
typedef struct PVShared
{
    /*
     * Target table relid, log level (for messages about parallel workers
     * launched during VACUUM VERBOSE) and query ID.  These fields are not
     * modified during the parallel vacuum.
     */
    Oid         relid;
    int         elevel;
    int64       queryid;        /* leader's query ID (pgstat_get_my_query_id) */

    /*
     * Fields for both index vacuum and cleanup.
     *
     * reltuples is the total number of input heap tuples.  We set either old
     * live tuples in the index vacuum case or the new live tuples in the
     * index cleanup case.
     *
     * estimated_count is true if reltuples is an estimated value.  (Note that
     * reltuples could be -1 in this case, indicating we have no idea.)
     */
    double      reltuples;
    bool        estimated_count;

    /*
     * In single process vacuum we could consume more memory during index
     * vacuuming or cleanup apart from the memory for heap scanning.  In
     * parallel vacuum, since individual vacuum workers can consume memory
     * equal to maintenance_work_mem, the new maintenance_work_mem for each
     * worker is set such that the parallel operation doesn't consume more
     * memory than single process vacuum.
     */
    int         maintenance_work_mem_worker;

    /*
     * The number of buffers each worker's Buffer Access Strategy ring should
     * contain.
     */
    int         ring_nbuffers;

    /*
     * Shared vacuum cost balance.  During parallel vacuum,
     * VacuumSharedCostBalance points to this value and it accumulates the
     * balance of each parallel vacuum worker.
     */
    pg_atomic_uint32 cost_balance;

    /*
     * Number of active parallel workers.  This is used for computing the
     * minimum threshold of the vacuum cost balance before a worker sleeps for
     * cost-based delay.
     */
    pg_atomic_uint32 active_nworkers;

    /* Counter for vacuuming and cleanup */
    pg_atomic_uint32 idx;

    /* DSA handle where the TidStore lives */
    dsa_handle  dead_items_dsa_handle;

    /* DSA pointer to the shared TidStore */
    dsa_pointer dead_items_handle;

    /* Statistics of shared dead items */
    VacDeadItemsInfo dead_items_info;

    /*
     * If 'true' then we are running parallel autovacuum. Otherwise, we are
     * running parallel maintenance VACUUM.
     */
    bool        is_autovacuum;

    /*
     * Cost-based vacuum delay parameters shared between the autovacuum leader
     * and its parallel workers.  Only initialized when is_autovacuum is true;
     * see parallel_vacuum_init().
     */
    PVSharedCostParams cost_params;
} PVShared;
     172              : 
/*
 * Status used during parallel index vacuum or cleanup.  Stored per index in
 * PVIndStats; the leader sets the NEED_* states before each parallel pass
 * (see PVIndStats comments), and it advances to COMPLETED once the index has
 * been processed.
 */
typedef enum PVIndVacStatus
{
    PARALLEL_INDVAC_STATUS_INITIAL = 0, /* zeroed DSM default */
    PARALLEL_INDVAC_STATUS_NEED_BULKDELETE,
    PARALLEL_INDVAC_STATUS_NEED_CLEANUP,
    PARALLEL_INDVAC_STATUS_COMPLETED,
} PVIndVacStatus;
     181              : 
/*
 * Struct for index vacuum statistics of an index that is used for parallel vacuum.
 * This includes the status of parallel index vacuum as well as index statistics.
 *
 * One entry per target index is allocated in the DSM segment (keyed by
 * PARALLEL_VACUUM_KEY_INDEX_STATS); see parallel_vacuum_init().
 */
typedef struct PVIndStats
{
    /*
     * The following two fields are set by leader process before executing
     * parallel index vacuum or parallel index cleanup.  These fields are not
     * fixed for the entire VACUUM operation.  They are only fixed for an
     * individual parallel index vacuum and cleanup.
     *
     * parallel_workers_can_process is true if both leader and worker can
     * process the index, otherwise only leader can process it.
     */
    PVIndVacStatus status;
    bool        parallel_workers_can_process;

    /*
     * Individual worker or leader stores the result of index vacuum or
     * cleanup.  Copied back to leader-local memory in parallel_vacuum_end().
     */
    bool        istat_updated;  /* are the stats updated? */
    IndexBulkDeleteResult istat;
} PVIndStats;
     207              : 
/*
 * Struct for maintaining a parallel vacuum state. typedef appears in vacuum.h.
 *
 * This is the leader-local (and worker-local) handle; the DSM-resident parts
 * are reached through the 'shared', 'indstats', 'buffer_usage' and
 * 'wal_usage' pointers.
 */
struct ParallelVacuumState
{
    /* NULL for worker processes */
    ParallelContext *pcxt;

    /* Parent Heap Relation */
    Relation    heaprel;

    /* Target indexes */
    Relation   *indrels;
    int         nindexes;

    /* Shared information among parallel vacuum workers */
    PVShared   *shared;

    /*
     * Shared index statistics among parallel vacuum workers. The array
     * element is allocated for every index, even those indexes where parallel
     * index vacuuming is unsafe or not worthwhile (e.g.,
     * will_parallel_vacuum[] is false).  During parallel vacuum,
     * IndexBulkDeleteResult of each index is kept in DSM and is copied into
     * local memory at the end of parallel vacuum.
     */
    PVIndStats *indstats;

    /* Shared dead items space among parallel vacuum workers */
    TidStore   *dead_items;

    /* Points to buffer usage area in DSM */
    BufferUsage *buffer_usage;

    /* Points to WAL usage area in DSM */
    WalUsage   *wal_usage;

    /*
     * False if the index is totally unsuitable target for all parallel
     * processing. For example, the index could be <
     * min_parallel_index_scan_size cutoff.
     */
    bool       *will_parallel_vacuum;

    /*
     * The number of indexes that support parallel index bulk-deletion and
     * parallel index cleanup respectively.
     */
    int         nindexes_parallel_bulkdel;
    int         nindexes_parallel_cleanup;
    int         nindexes_parallel_condcleanup;

    /* Buffer access strategy used by leader process */
    BufferAccessStrategy bstrategy;

    /*
     * Error reporting state.  The error callback is set only for workers
     * processes during parallel index vacuum.
     */
    char       *relnamespace;
    char       *relname;
    char       *indname;
    PVIndVacStatus status;
};
     272              : 
/*
 * Pointer to the shared cost-delay parameters inside the parallel vacuum's
 * DSM segment.  Set in parallel_vacuum_init() when running parallel
 * autovacuum; reset to NULL in parallel_vacuum_end() and by the DSM detach
 * callback (parallel_vacuum_dsm_detach) so it never dangles after the
 * segment goes away.
 */
static PVSharedCostParams *pv_shared_cost_params = NULL;

/*
 * Worker-local copy of the last cost-parameter generation this worker has
 * applied.  Initialized to 0; since the leader initializes the shared
 * generation counter to 1, the first call to
 * parallel_vacuum_update_shared_delay_params() will always detect a
 * mismatch and read the initial parameters from shared memory.
 */
static uint32 shared_params_generation_local = 0;
     283              : 
     284              : static int  parallel_vacuum_compute_workers(Relation *indrels, int nindexes, int nrequested,
     285              :                                             bool *will_parallel_vacuum);
     286              : static void parallel_vacuum_process_all_indexes(ParallelVacuumState *pvs, int num_index_scans,
     287              :                                                 bool vacuum, PVWorkerStats *wstats);
     288              : static void parallel_vacuum_process_safe_indexes(ParallelVacuumState *pvs);
     289              : static void parallel_vacuum_process_unsafe_indexes(ParallelVacuumState *pvs);
     290              : static void parallel_vacuum_process_one_index(ParallelVacuumState *pvs, Relation indrel,
     291              :                                               PVIndStats *indstats);
     292              : static bool parallel_vacuum_index_is_parallel_safe(Relation indrel, int num_index_scans,
     293              :                                                    bool vacuum);
     294              : static void parallel_vacuum_error_callback(void *arg);
     295              : static inline void parallel_vacuum_set_cost_parameters(PVSharedCostParams *params);
     296              : static void parallel_vacuum_dsm_detach(dsm_segment *seg, Datum arg);
     297              : 
/*
 * Try to enter parallel mode and create a parallel context.  Then initialize
 * shared memory state.
 *
 * rel is the target heap; indrels/nindexes are its indexes.  vac_work_mem is
 * the memory budget in kilobytes (multiplied by 1024 below for the dead-items
 * store).  elevel is the log level used for messages about launched workers.
 *
 * On success, return parallel vacuum state.  Otherwise return NULL.
 *
 * Note: the shm_toc_estimate_* calls below must stay in sync with the
 * corresponding shm_toc_allocate/shm_toc_insert calls after
 * InitializeParallelDSM().
 */
ParallelVacuumState *
parallel_vacuum_init(Relation rel, Relation *indrels, int nindexes,
                     int nrequested_workers, int vac_work_mem,
                     int elevel, BufferAccessStrategy bstrategy)
{
    ParallelVacuumState *pvs;
    ParallelContext *pcxt;
    PVShared   *shared;
    TidStore   *dead_items;
    PVIndStats *indstats;
    BufferUsage *buffer_usage;
    WalUsage   *wal_usage;
    bool       *will_parallel_vacuum;
    Size        est_indstats_len;
    Size        est_shared_len;
    int         nindexes_mwm = 0;
    int         parallel_workers = 0;
    int         querylen;

    /*
     * A parallel vacuum must be requested and there must be indexes on the
     * relation
     */
    Assert(nrequested_workers >= 0);
    Assert(nindexes > 0);

    /*
     * Compute the number of parallel vacuum workers to launch
     */
    will_parallel_vacuum = palloc0_array(bool, nindexes);
    parallel_workers = parallel_vacuum_compute_workers(indrels, nindexes,
                                                       nrequested_workers,
                                                       will_parallel_vacuum);
    if (parallel_workers <= 0)
    {
        /* Can't perform vacuum in parallel -- return NULL */
        pfree(will_parallel_vacuum);
        return NULL;
    }

    pvs = palloc0_object(ParallelVacuumState);
    pvs->indrels = indrels;
    pvs->nindexes = nindexes;
    pvs->will_parallel_vacuum = will_parallel_vacuum;
    pvs->bstrategy = bstrategy;
    pvs->heaprel = rel;

    EnterParallelMode();
    pcxt = CreateParallelContext("postgres", "parallel_vacuum_main",
                                 parallel_workers);
    Assert(pcxt->nworkers > 0);
    pvs->pcxt = pcxt;

    /* Estimate size for index vacuum stats -- PARALLEL_VACUUM_KEY_INDEX_STATS */
    est_indstats_len = mul_size(sizeof(PVIndStats), nindexes);
    shm_toc_estimate_chunk(&pcxt->estimator, est_indstats_len);
    shm_toc_estimate_keys(&pcxt->estimator, 1);

    /* Estimate size for shared information -- PARALLEL_VACUUM_KEY_SHARED */
    est_shared_len = sizeof(PVShared);
    shm_toc_estimate_chunk(&pcxt->estimator, est_shared_len);
    shm_toc_estimate_keys(&pcxt->estimator, 1);

    /*
     * Estimate space for BufferUsage and WalUsage --
     * PARALLEL_VACUUM_KEY_BUFFER_USAGE and PARALLEL_VACUUM_KEY_WAL_USAGE.
     *
     * If there are no extensions loaded that care, we could skip this.  We
     * have no way of knowing whether anyone's looking at pgBufferUsage or
     * pgWalUsage, so do it unconditionally.
     */
    shm_toc_estimate_chunk(&pcxt->estimator,
                           mul_size(sizeof(BufferUsage), pcxt->nworkers));
    shm_toc_estimate_keys(&pcxt->estimator, 1);
    shm_toc_estimate_chunk(&pcxt->estimator,
                           mul_size(sizeof(WalUsage), pcxt->nworkers));
    shm_toc_estimate_keys(&pcxt->estimator, 1);

    /* Finally, estimate PARALLEL_VACUUM_KEY_QUERY_TEXT space */
    if (debug_query_string)
    {
        querylen = strlen(debug_query_string);
        shm_toc_estimate_chunk(&pcxt->estimator, querylen + 1);
        shm_toc_estimate_keys(&pcxt->estimator, 1);
    }
    else
        querylen = 0;           /* keep compiler quiet */

    InitializeParallelDSM(pcxt);

    /* Prepare index vacuum stats */
    indstats = (PVIndStats *) shm_toc_allocate(pcxt->toc, est_indstats_len);
    MemSet(indstats, 0, est_indstats_len);
    for (int i = 0; i < nindexes; i++)
    {
        Relation    indrel = indrels[i];
        uint8       vacoptions = indrel->rd_indam->amparallelvacuumoptions;

        /*
         * Cleanup option should be either disabled, always performing in
         * parallel or conditionally performing in parallel.
         */
        Assert(((vacoptions & VACUUM_OPTION_PARALLEL_CLEANUP) == 0) ||
               ((vacoptions & VACUUM_OPTION_PARALLEL_COND_CLEANUP) == 0));
        Assert(vacoptions <= VACUUM_OPTION_MAX_VALID_VALUE);

        if (!will_parallel_vacuum[i])
            continue;

        if (indrel->rd_indam->amusemaintenanceworkmem)
            nindexes_mwm++;

        /*
         * Remember the number of indexes that support parallel operation for
         * each phase.
         */
        if ((vacoptions & VACUUM_OPTION_PARALLEL_BULKDEL) != 0)
            pvs->nindexes_parallel_bulkdel++;
        if ((vacoptions & VACUUM_OPTION_PARALLEL_CLEANUP) != 0)
            pvs->nindexes_parallel_cleanup++;
        if ((vacoptions & VACUUM_OPTION_PARALLEL_COND_CLEANUP) != 0)
            pvs->nindexes_parallel_condcleanup++;
    }
    shm_toc_insert(pcxt->toc, PARALLEL_VACUUM_KEY_INDEX_STATS, indstats);
    pvs->indstats = indstats;

    /* Prepare shared information */
    shared = (PVShared *) shm_toc_allocate(pcxt->toc, est_shared_len);
    MemSet(shared, 0, est_shared_len);
    shared->relid = RelationGetRelid(rel);
    shared->elevel = elevel;
    shared->queryid = pgstat_get_my_query_id();

    /*
     * Split the maintenance_work_mem budget among workers that use it, so
     * the parallel operation doesn't consume more memory than a single
     * process vacuum would (see PVShared.maintenance_work_mem_worker).
     */
    shared->maintenance_work_mem_worker =
        (nindexes_mwm > 0) ?
        vac_work_mem / Min(parallel_workers, nindexes_mwm) :
        vac_work_mem;

    shared->dead_items_info.max_bytes = vac_work_mem * (size_t) 1024;

    /* Prepare DSA space for dead items */
    dead_items = TidStoreCreateShared(shared->dead_items_info.max_bytes,
                                      LWTRANCHE_PARALLEL_VACUUM_DSA);
    pvs->dead_items = dead_items;
    shared->dead_items_handle = TidStoreGetHandle(dead_items);
    shared->dead_items_dsa_handle = dsa_get_handle(TidStoreGetDSA(dead_items));

    /* Use the same buffer size for all workers */
    shared->ring_nbuffers = GetAccessStrategyBufferCount(bstrategy);

    pg_atomic_init_u32(&(shared->cost_balance), 0);
    pg_atomic_init_u32(&(shared->active_nworkers), 0);
    pg_atomic_init_u32(&(shared->idx), 0);

    shared->is_autovacuum = AmAutoVacuumWorkerProcess();

    /*
     * Initialize shared cost-based vacuum delay parameters if it's for
     * autovacuum.  Generation starts at 1 so workers (whose local generation
     * is 0) load the initial parameters on their first check.  The detach
     * callback resets pv_shared_cost_params even on error exit.
     */
    if (shared->is_autovacuum)
    {
        parallel_vacuum_set_cost_parameters(&shared->cost_params);
        pg_atomic_init_u32(&shared->cost_params.generation, 1);
        SpinLockInit(&shared->cost_params.mutex);

        pv_shared_cost_params = &(shared->cost_params);
        on_dsm_detach(pcxt->seg, parallel_vacuum_dsm_detach, (Datum) 0);
    }

    shm_toc_insert(pcxt->toc, PARALLEL_VACUUM_KEY_SHARED, shared);
    pvs->shared = shared;

    /*
     * Allocate space for each worker's BufferUsage and WalUsage; no need to
     * initialize
     */
    buffer_usage = shm_toc_allocate(pcxt->toc,
                                    mul_size(sizeof(BufferUsage), pcxt->nworkers));
    shm_toc_insert(pcxt->toc, PARALLEL_VACUUM_KEY_BUFFER_USAGE, buffer_usage);
    pvs->buffer_usage = buffer_usage;
    wal_usage = shm_toc_allocate(pcxt->toc,
                                 mul_size(sizeof(WalUsage), pcxt->nworkers));
    shm_toc_insert(pcxt->toc, PARALLEL_VACUUM_KEY_WAL_USAGE, wal_usage);
    pvs->wal_usage = wal_usage;

    /* Store query string for workers */
    if (debug_query_string)
    {
        char       *sharedquery;

        sharedquery = (char *) shm_toc_allocate(pcxt->toc, querylen + 1);
        memcpy(sharedquery, debug_query_string, querylen + 1);
        sharedquery[querylen] = '\0';
        shm_toc_insert(pcxt->toc,
                       PARALLEL_VACUUM_KEY_QUERY_TEXT, sharedquery);
    }

    /* Success -- return parallel vacuum state */
    return pvs;
}
     504              : 
     505              : /*
     506              :  * Destroy the parallel context, and end parallel mode.
     507              :  *
     508              :  * Since writes are not allowed during parallel mode, copy the
     509              :  * updated index statistics from DSM into local memory and then later use that
     510              :  * to update the index statistics.  One might think that we can exit from
     511              :  * parallel mode, update the index statistics and then destroy parallel
     512              :  * context, but that won't be safe (see ExitParallelMode).
     513              :  */
     514              : void
     515           26 : parallel_vacuum_end(ParallelVacuumState *pvs, IndexBulkDeleteResult **istats)
     516              : {
     517              :     Assert(!IsParallelWorker());
     518              : 
     519              :     /* Copy the updated statistics */
     520          116 :     for (int i = 0; i < pvs->nindexes; i++)
     521              :     {
     522           90 :         PVIndStats *indstats = &(pvs->indstats[i]);
     523              : 
     524           90 :         if (indstats->istat_updated)
     525              :         {
     526           62 :             istats[i] = palloc0_object(IndexBulkDeleteResult);
     527           62 :             memcpy(istats[i], &indstats->istat, sizeof(IndexBulkDeleteResult));
     528              :         }
     529              :         else
     530           28 :             istats[i] = NULL;
     531              :     }
     532              : 
     533           26 :     TidStoreDestroy(pvs->dead_items);
     534              : 
     535           26 :     DestroyParallelContext(pvs->pcxt);
     536           26 :     ExitParallelMode();
     537              : 
     538           26 :     if (AmAutoVacuumWorkerProcess())
     539            3 :         pv_shared_cost_params = NULL;
     540              : 
     541           26 :     pfree(pvs->will_parallel_vacuum);
     542           26 :     pfree(pvs);
     543           26 : }
     544              : 
     545              : /*
     546              :  * DSM detach callback. This is invoked when an autovacuum worker detaches
     547              :  * from the DSM segment holding PVShared. It ensures to reset the local pointer
     548              :  * to the shared state even if parallel vacuum raises an error and doesn't
     549              :  * call parallel_vacuum_end().
     550              :  */
     551              : static void
     552            3 : parallel_vacuum_dsm_detach(dsm_segment *seg, Datum arg)
     553              : {
     554              :     Assert(AmAutoVacuumWorkerProcess());
     555            3 :     pv_shared_cost_params = NULL;
     556            3 : }
     557              : 
     558              : /*
     559              :  * Returns the dead items space and dead items information.
     560              :  */
     561              : TidStore *
     562           50 : parallel_vacuum_get_dead_items(ParallelVacuumState *pvs, VacDeadItemsInfo **dead_items_info_p)
     563              : {
     564           50 :     *dead_items_info_p = &(pvs->shared->dead_items_info);
     565           50 :     return pvs->dead_items;
     566              : }
     567              : 
     568              : /* Forget all items in dead_items */
     569              : void
     570           24 : parallel_vacuum_reset_dead_items(ParallelVacuumState *pvs)
     571              : {
     572           24 :     VacDeadItemsInfo *dead_items_info = &(pvs->shared->dead_items_info);
     573              : 
     574              :     /*
     575              :      * Free the current tidstore and return allocated DSA segments to the
     576              :      * operating system. Then we recreate the tidstore with the same max_bytes
     577              :      * limitation we just used.
     578              :      */
     579           24 :     TidStoreDestroy(pvs->dead_items);
     580           24 :     pvs->dead_items = TidStoreCreateShared(dead_items_info->max_bytes,
     581              :                                            LWTRANCHE_PARALLEL_VACUUM_DSA);
     582              : 
     583              :     /* Update the DSA pointer for dead_items to the new one */
     584           24 :     pvs->shared->dead_items_dsa_handle = dsa_get_handle(TidStoreGetDSA(pvs->dead_items));
     585           24 :     pvs->shared->dead_items_handle = TidStoreGetHandle(pvs->dead_items);
     586              : 
     587              :     /* Reset the counter */
     588           24 :     dead_items_info->num_items = 0;
     589           24 : }
     590              : 
     591              : /*
     592              :  * Do parallel index bulk-deletion with parallel workers.
     593              :  */
     594              : void
     595           24 : parallel_vacuum_bulkdel_all_indexes(ParallelVacuumState *pvs, long num_table_tuples,
     596              :                                     int num_index_scans, PVWorkerStats *wstats)
     597              : {
     598              :     Assert(!IsParallelWorker());
     599              : 
     600              :     /*
     601              :      * We can only provide an approximate value of num_heap_tuples, at least
     602              :      * for now.
     603              :      */
     604           24 :     pvs->shared->reltuples = num_table_tuples;
     605           24 :     pvs->shared->estimated_count = true;
     606              : 
     607           24 :     parallel_vacuum_process_all_indexes(pvs, num_index_scans, true, wstats);
     608           24 : }
     609              : 
     610              : /*
     611              :  * Do parallel index cleanup with parallel workers.
     612              :  */
     613              : void
     614           26 : parallel_vacuum_cleanup_all_indexes(ParallelVacuumState *pvs, long num_table_tuples,
     615              :                                     int num_index_scans, bool estimated_count,
     616              :                                     PVWorkerStats *wstats)
     617              : {
     618              :     Assert(!IsParallelWorker());
     619              : 
     620              :     /*
     621              :      * We can provide a better estimate of total number of surviving tuples
     622              :      * (we assume indexes are more interested in that than in the number of
     623              :      * nominally live tuples).
     624              :      */
     625           26 :     pvs->shared->reltuples = num_table_tuples;
     626           26 :     pvs->shared->estimated_count = estimated_count;
     627              : 
     628           26 :     parallel_vacuum_process_all_indexes(pvs, num_index_scans, false, wstats);
     629           26 : }
     630              : 
     631              : /*
     632              :  * Fill in the given structure with cost-based vacuum delay parameter values.
     633              :  */
     634              : static inline void
     635            4 : parallel_vacuum_set_cost_parameters(PVSharedCostParams *params)
     636              : {
     637            4 :     params->cost_delay = vacuum_cost_delay;
     638            4 :     params->cost_limit = vacuum_cost_limit;
     639            4 :     params->cost_page_dirty = VacuumCostPageDirty;
     640            4 :     params->cost_page_hit = VacuumCostPageHit;
     641            4 :     params->cost_page_miss = VacuumCostPageMiss;
     642            4 : }
     643              : 
     644              : /*
     645              :  * Updates the cost-based vacuum delay parameters for parallel autovacuum
     646              :  * workers.
     647              :  *
     648              :  * For non-autovacuum parallel workers, this function will have no effect.
     649              :  */
     650              : void
     651          205 : parallel_vacuum_update_shared_delay_params(void)
     652              : {
     653              :     uint32      params_generation;
     654              : 
     655              :     Assert(IsParallelWorker());
     656              : 
     657              :     /* Quick return if the worker is not running for the autovacuum */
     658          205 :     if (pv_shared_cost_params == NULL)
     659           48 :         return;
     660              : 
     661          157 :     params_generation = pg_atomic_read_u32(&pv_shared_cost_params->generation);
     662              :     Assert(shared_params_generation_local <= params_generation);
     663              : 
     664              :     /* Return if parameters had not changed in the leader */
     665          157 :     if (params_generation == shared_params_generation_local)
     666          153 :         return;
     667              : 
     668            4 :     SpinLockAcquire(&pv_shared_cost_params->mutex);
     669            4 :     VacuumCostDelay = pv_shared_cost_params->cost_delay;
     670            4 :     VacuumCostLimit = pv_shared_cost_params->cost_limit;
     671            4 :     VacuumCostPageDirty = pv_shared_cost_params->cost_page_dirty;
     672            4 :     VacuumCostPageHit = pv_shared_cost_params->cost_page_hit;
     673            4 :     VacuumCostPageMiss = pv_shared_cost_params->cost_page_miss;
     674            4 :     SpinLockRelease(&pv_shared_cost_params->mutex);
     675              : 
     676            4 :     VacuumUpdateCosts();
     677              : 
     678            4 :     shared_params_generation_local = params_generation;
     679              : 
     680            4 :     elog(DEBUG2,
     681              :          "parallel autovacuum worker updated cost params: cost_limit=%d, cost_delay=%g, cost_page_miss=%d, cost_page_dirty=%d, cost_page_hit=%d",
     682              :          vacuum_cost_limit,
     683              :          vacuum_cost_delay,
     684              :          VacuumCostPageMiss,
     685              :          VacuumCostPageDirty,
     686              :          VacuumCostPageHit);
     687              : }
     688              : 
     689              : /*
     690              :  * Store the cost-based vacuum delay parameters in the shared memory so that
     691              :  * parallel vacuum workers can consume them (see
     692              :  * parallel_vacuum_update_shared_delay_params()).
     693              :  */
     694              : void
     695            1 : parallel_vacuum_propagate_shared_delay_params(void)
     696              : {
     697              :     Assert(AmAutoVacuumWorkerProcess());
     698              : 
     699              :     /*
     700              :      * Quick return if the leader process is not sharing the delay parameters.
     701              :      */
     702            1 :     if (pv_shared_cost_params == NULL)
     703            0 :         return;
     704              : 
     705              :     /*
     706              :      * Check if any delay parameters have changed. We can read them without
     707              :      * locks as only the leader can modify them.
     708              :      */
     709            1 :     if (vacuum_cost_delay == pv_shared_cost_params->cost_delay &&
     710            0 :         vacuum_cost_limit == pv_shared_cost_params->cost_limit &&
     711            0 :         VacuumCostPageDirty == pv_shared_cost_params->cost_page_dirty &&
     712            0 :         VacuumCostPageHit == pv_shared_cost_params->cost_page_hit &&
     713            0 :         VacuumCostPageMiss == pv_shared_cost_params->cost_page_miss)
     714            0 :         return;
     715              : 
     716              :     /* Update the shared delay parameters */
     717            1 :     SpinLockAcquire(&pv_shared_cost_params->mutex);
     718            1 :     parallel_vacuum_set_cost_parameters(pv_shared_cost_params);
     719            1 :     SpinLockRelease(&pv_shared_cost_params->mutex);
     720              : 
     721              :     /*
     722              :      * Increment the generation of the parameters, i.e. let parallel workers
     723              :      * know that they should re-read shared cost params.
     724              :      */
     725            1 :     pg_atomic_fetch_add_u32(&pv_shared_cost_params->generation, 1);
     726              : }
     727              : 
     728              : /*
     729              :  * Compute the number of parallel worker processes to request.  Both index
     730              :  * vacuum and index cleanup can be executed with parallel workers.
     731              :  * The index is eligible for parallel vacuum iff its size is greater than
     732              :  * min_parallel_index_scan_size as invoking workers for very small indexes
     733              :  * can hurt performance.
     734              :  *
     735              :  * nrequested is the number of parallel workers that user requested.  If
     736              :  * nrequested is 0, we compute the parallel degree based on nindexes, that is
     737              :  * the number of indexes that support parallel vacuum.  This function also
     738              :  * sets will_parallel_vacuum to remember indexes that participate in parallel
     739              :  * vacuum.
     740              :  */
     741              : static int
     742        44754 : parallel_vacuum_compute_workers(Relation *indrels, int nindexes, int nrequested,
     743              :                                 bool *will_parallel_vacuum)
     744              : {
     745        44754 :     int         nindexes_parallel = 0;
     746        44754 :     int         nindexes_parallel_bulkdel = 0;
     747        44754 :     int         nindexes_parallel_cleanup = 0;
     748              :     int         parallel_workers;
     749              :     int         max_workers;
     750              : 
     751        89508 :     max_workers = AmAutoVacuumWorkerProcess() ?
     752        44754 :         autovacuum_max_parallel_workers :
     753              :         max_parallel_maintenance_workers;
     754              : 
     755              :     /*
     756              :      * We don't allow performing parallel operation in standalone backend or
     757              :      * when parallelism is disabled.
     758              :      */
     759        44754 :     if (!IsUnderPostmaster || max_workers == 0)
     760        40959 :         return 0;
     761              : 
     762              :     /*
     763              :      * Compute the number of indexes that can participate in parallel vacuum.
     764              :      */
     765        12361 :     for (int i = 0; i < nindexes; i++)
     766              :     {
     767         8566 :         Relation    indrel = indrels[i];
     768         8566 :         uint8       vacoptions = indrel->rd_indam->amparallelvacuumoptions;
     769              : 
     770              :         /* Skip index that is not a suitable target for parallel index vacuum */
     771         8566 :         if (vacoptions == VACUUM_OPTION_NO_PARALLEL ||
     772         8566 :             RelationGetNumberOfBlocks(indrel) < min_parallel_index_scan_size)
     773         8468 :             continue;
     774              : 
     775           98 :         will_parallel_vacuum[i] = true;
     776              : 
     777           98 :         if ((vacoptions & VACUUM_OPTION_PARALLEL_BULKDEL) != 0)
     778           90 :             nindexes_parallel_bulkdel++;
     779           98 :         if (((vacoptions & VACUUM_OPTION_PARALLEL_CLEANUP) != 0) ||
     780           82 :             ((vacoptions & VACUUM_OPTION_PARALLEL_COND_CLEANUP) != 0))
     781           90 :             nindexes_parallel_cleanup++;
     782              :     }
     783              : 
     784         3795 :     nindexes_parallel = Max(nindexes_parallel_bulkdel,
     785              :                             nindexes_parallel_cleanup);
     786              : 
     787              :     /* The leader process takes one index */
     788         3795 :     nindexes_parallel--;
     789              : 
     790              :     /* No index supports parallel vacuum */
     791         3795 :     if (nindexes_parallel <= 0)
     792         3769 :         return 0;
     793              : 
     794              :     /* Compute the parallel degree */
     795           26 :     parallel_workers = (nrequested > 0) ?
     796           26 :         Min(nrequested, nindexes_parallel) : nindexes_parallel;
     797              : 
     798              :     /* Cap by GUC variable */
     799           26 :     parallel_workers = Min(parallel_workers, max_workers);
     800              : 
     801           26 :     return parallel_workers;
     802              : }
     803              : 
     804              : /*
     805              :  * Perform index vacuum or index cleanup with parallel workers.  This function
     806              :  * must be used by the parallel vacuum leader process.
     807              :  *
     808              :  * If wstats is not NULL, the parallel worker statistics are updated.
     809              :  */
     810              : static void
     811           50 : parallel_vacuum_process_all_indexes(ParallelVacuumState *pvs, int num_index_scans,
     812              :                                     bool vacuum, PVWorkerStats *wstats)
     813              : {
     814              :     int         nworkers;
     815              :     PVIndVacStatus new_status;
     816              : 
     817              :     Assert(!IsParallelWorker());
     818              : 
     819           50 :     if (vacuum)
     820              :     {
     821           24 :         new_status = PARALLEL_INDVAC_STATUS_NEED_BULKDELETE;
     822              : 
     823              :         /* Determine the number of parallel workers to launch */
     824           24 :         nworkers = pvs->nindexes_parallel_bulkdel;
     825              :     }
     826              :     else
     827              :     {
     828           26 :         new_status = PARALLEL_INDVAC_STATUS_NEED_CLEANUP;
     829              : 
     830              :         /* Determine the number of parallel workers to launch */
     831           26 :         nworkers = pvs->nindexes_parallel_cleanup;
     832              : 
     833              :         /* Add conditionally parallel-aware indexes if in the first time call */
     834           26 :         if (num_index_scans == 0)
     835           14 :             nworkers += pvs->nindexes_parallel_condcleanup;
     836              :     }
     837              : 
     838              :     /* The leader process will participate */
     839           50 :     nworkers--;
     840              : 
     841              :     /*
     842              :      * It is possible that parallel context is initialized with fewer workers
     843              :      * than the number of indexes that need a separate worker in the current
     844              :      * phase, so we need to consider it.  See
     845              :      * parallel_vacuum_compute_workers().
     846              :      */
     847           50 :     nworkers = Min(nworkers, pvs->pcxt->nworkers);
     848              : 
     849              :     /* Update the statistics, if we asked to */
     850           50 :     if (wstats != NULL && nworkers > 0)
     851           40 :         wstats->nplanned += nworkers;
     852              : 
     853              :     /*
     854              :      * Set index vacuum status and mark whether parallel vacuum worker can
     855              :      * process it.
     856              :      */
     857          202 :     for (int i = 0; i < pvs->nindexes; i++)
     858              :     {
     859          152 :         PVIndStats *indstats = &(pvs->indstats[i]);
     860              : 
     861              :         Assert(indstats->status == PARALLEL_INDVAC_STATUS_INITIAL);
     862          152 :         indstats->status = new_status;
     863          152 :         indstats->parallel_workers_can_process =
     864          296 :             (pvs->will_parallel_vacuum[i] &&
     865          144 :              parallel_vacuum_index_is_parallel_safe(pvs->indrels[i],
     866              :                                                     num_index_scans,
     867          152 :                                                     vacuum));
     868              :     }
     869              : 
     870              :     /* Reset the parallel index processing and progress counters */
     871           50 :     pg_atomic_write_u32(&(pvs->shared->idx), 0);
     872              : 
     873              :     /* Setup the shared cost-based vacuum delay and launch workers */
     874           50 :     if (nworkers > 0)
     875              :     {
     876              :         /* Reinitialize parallel context to relaunch parallel workers */
     877           40 :         if (num_index_scans > 0)
     878           14 :             ReinitializeParallelDSM(pvs->pcxt);
     879              : 
     880              :         /*
     881              :          * Set up shared cost balance and the number of active workers for
     882              :          * vacuum delay.  We need to do this before launching workers as
     883              :          * otherwise, they might not see the updated values for these
     884              :          * parameters.
     885              :          */
     886           40 :         pg_atomic_write_u32(&(pvs->shared->cost_balance), VacuumCostBalance);
     887           40 :         pg_atomic_write_u32(&(pvs->shared->active_nworkers), 0);
     888              : 
     889              :         /*
     890              :          * The number of workers can vary between bulkdelete and cleanup
     891              :          * phase.
     892              :          */
     893           40 :         ReinitializeParallelWorkers(pvs->pcxt, nworkers);
     894              : 
     895           40 :         LaunchParallelWorkers(pvs->pcxt);
     896              : 
     897           40 :         if (pvs->pcxt->nworkers_launched > 0)
     898              :         {
     899              :             /*
     900              :              * Reset the local cost values for leader backend as we have
     901              :              * already accumulated the remaining balance of heap.
     902              :              */
     903           40 :             VacuumCostBalance = 0;
     904           40 :             VacuumCostBalanceLocal = 0;
     905              : 
     906              :             /* Enable shared cost balance for leader backend */
     907           40 :             VacuumSharedCostBalance = &(pvs->shared->cost_balance);
     908           40 :             VacuumActiveNWorkers = &(pvs->shared->active_nworkers);
     909              : 
     910              :             /* Update the statistics, if we asked to */
     911           40 :             if (wstats != NULL)
     912           40 :                 wstats->nlaunched += pvs->pcxt->nworkers_launched;
     913              :         }
     914              : 
     915           40 :         if (vacuum)
     916           24 :             ereport(pvs->shared->elevel,
     917              :                     (errmsg(ngettext("launched %d parallel vacuum worker for index vacuuming (planned: %d)",
     918              :                                      "launched %d parallel vacuum workers for index vacuuming (planned: %d)",
     919              :                                      pvs->pcxt->nworkers_launched),
     920              :                             pvs->pcxt->nworkers_launched, nworkers)));
     921              :         else
     922           16 :             ereport(pvs->shared->elevel,
     923              :                     (errmsg(ngettext("launched %d parallel vacuum worker for index cleanup (planned: %d)",
     924              :                                      "launched %d parallel vacuum workers for index cleanup (planned: %d)",
     925              :                                      pvs->pcxt->nworkers_launched),
     926              :                             pvs->pcxt->nworkers_launched, nworkers)));
     927              :     }
     928              : 
     929              :     /* Vacuum the indexes that can be processed by only leader process */
     930           50 :     parallel_vacuum_process_unsafe_indexes(pvs);
     931              : 
     932              :     /*
     933              :      * Join as a parallel worker.  The leader vacuums alone processes all
     934              :      * parallel-safe indexes in the case where no workers are launched.
     935              :      */
     936           50 :     parallel_vacuum_process_safe_indexes(pvs);
     937              : 
     938              :     /*
     939              :      * Next, accumulate buffer and WAL usage.  (This must wait for the workers
     940              :      * to finish, or we might get incomplete data.)
     941              :      */
     942           50 :     if (nworkers > 0)
     943              :     {
     944              :         /* Wait for all vacuum workers to finish */
     945           40 :         WaitForParallelWorkersToFinish(pvs->pcxt);
     946              : 
     947           89 :         for (int i = 0; i < pvs->pcxt->nworkers_launched; i++)
     948           49 :             InstrAccumParallelQuery(&pvs->buffer_usage[i], &pvs->wal_usage[i]);
     949              :     }
     950              : 
     951              :     /*
     952              :      * Reset all index status back to initial (while checking that we have
     953              :      * vacuumed all indexes).
     954              :      */
     955          202 :     for (int i = 0; i < pvs->nindexes; i++)
     956              :     {
     957          152 :         PVIndStats *indstats = &(pvs->indstats[i]);
     958              : 
     959          152 :         if (indstats->status != PARALLEL_INDVAC_STATUS_COMPLETED)
     960            0 :             elog(ERROR, "parallel index vacuum on index \"%s\" is not completed",
     961              :                  RelationGetRelationName(pvs->indrels[i]));
     962              : 
     963          152 :         indstats->status = PARALLEL_INDVAC_STATUS_INITIAL;
     964              :     }
     965              : 
     966              :     /*
     967              :      * Carry the shared balance value to heap scan and disable shared costing
     968              :      */
     969           50 :     if (VacuumSharedCostBalance)
     970              :     {
     971           40 :         VacuumCostBalance = pg_atomic_read_u32(VacuumSharedCostBalance);
     972           40 :         VacuumSharedCostBalance = NULL;
     973           40 :         VacuumActiveNWorkers = NULL;
     974              :     }
     975           50 : }
     976              : 
     977              : /*
     978              :  * Index vacuum/cleanup routine used by the leader process and parallel
     979              :  * vacuum worker processes to vacuum the indexes in parallel.
     980              :  */
     981              : static void
     982           99 : parallel_vacuum_process_safe_indexes(ParallelVacuumState *pvs)
     983              : {
     984              :     /*
     985              :      * Increment the active worker count if we are able to launch any worker.
     986              :      */
     987           99 :     if (VacuumActiveNWorkers)
     988           89 :         pg_atomic_add_fetch_u32(VacuumActiveNWorkers, 1);
     989              : 
     990              :     /* Loop until all indexes are vacuumed */
     991              :     for (;;)
     992          152 :     {
     993              :         int         idx;
     994              :         PVIndStats *indstats;
     995              : 
     996              :         /* Get an index number to process */
     997          251 :         idx = pg_atomic_fetch_add_u32(&(pvs->shared->idx), 1);
     998              : 
     999              :         /* Done for all indexes? */
    1000          251 :         if (idx >= pvs->nindexes)
    1001           99 :             break;
    1002              : 
    1003          152 :         indstats = &(pvs->indstats[idx]);
    1004              : 
    1005              :         /*
    1006              :          * Skip vacuuming index that is unsafe for workers or has an
    1007              :          * unsuitable target for parallel index vacuum (this is vacuumed in
    1008              :          * parallel_vacuum_process_unsafe_indexes() by the leader).
    1009              :          */
    1010          152 :         if (!indstats->parallel_workers_can_process)
    1011           46 :             continue;
    1012              : 
    1013              :         /* Do vacuum or cleanup of the index */
    1014          106 :         parallel_vacuum_process_one_index(pvs, pvs->indrels[idx], indstats);
    1015              :     }
    1016              : 
    1017              :     /*
    1018              :      * We have completed the index vacuum so decrement the active worker
    1019              :      * count.
    1020              :      */
    1021           99 :     if (VacuumActiveNWorkers)
    1022           89 :         pg_atomic_sub_fetch_u32(VacuumActiveNWorkers, 1);
    1023           99 : }
    1024              : 
    1025              : /*
    1026              :  * Perform parallel vacuuming of indexes in leader process.
    1027              :  *
    1028              :  * Handles index vacuuming (or index cleanup) for indexes that are not
    1029              :  * parallel safe.  It's possible that this will vary for a given index, based
    1030              :  * on details like whether we're performing index cleanup right now.
    1031              :  *
    1032              :  * Also performs vacuuming of smaller indexes that fell under the size cutoff
    1033              :  * enforced by parallel_vacuum_compute_workers().
    1034              :  */
    1035              : static void
    1036           50 : parallel_vacuum_process_unsafe_indexes(ParallelVacuumState *pvs)
    1037              : {
    1038              :     Assert(!IsParallelWorker());
    1039              : 
    1040              :     /*
    1041              :      * Increment the active worker count if we are able to launch any worker.
    1042              :      */
    1043           50 :     if (VacuumActiveNWorkers)
    1044           40 :         pg_atomic_add_fetch_u32(VacuumActiveNWorkers, 1);
    1045              : 
    1046          202 :     for (int i = 0; i < pvs->nindexes; i++)
    1047              :     {
    1048          152 :         PVIndStats *indstats = &(pvs->indstats[i]);
    1049              : 
    1050              :         /* Skip, indexes that are safe for workers */
    1051          152 :         if (indstats->parallel_workers_can_process)
    1052          106 :             continue;
    1053              : 
    1054              :         /* Do vacuum or cleanup of the index */
    1055           46 :         parallel_vacuum_process_one_index(pvs, pvs->indrels[i], indstats);
    1056              :     }
    1057              : 
    1058              :     /*
    1059              :      * We have completed the index vacuum so decrement the active worker
    1060              :      * count.
    1061              :      */
    1062           50 :     if (VacuumActiveNWorkers)
    1063           40 :         pg_atomic_sub_fetch_u32(VacuumActiveNWorkers, 1);
    1064           50 : }
    1065              : 
/*
 * Vacuum or cleanup index either by leader process or by one of the worker
 * process.  After vacuuming the index this function copies the index
 * statistics returned from ambulkdelete and amvacuumcleanup to the DSM
 * segment.
 *
 * Which operation is performed (bulk-deletion vs. cleanup) is driven by
 * indstats->status; any other status is an error.  On completion the status
 * is set to PARALLEL_INDVAC_STATUS_COMPLETED and the shared progress counter
 * of processed indexes is incremented.
 */
static void
parallel_vacuum_process_one_index(ParallelVacuumState *pvs, Relation indrel,
                                  PVIndStats *indstats)
{
    IndexBulkDeleteResult *istat = NULL;
    IndexBulkDeleteResult *istat_res;
    IndexVacuumInfo ivinfo;

    /*
     * Update the pointer to the corresponding bulk-deletion result if someone
     * has already updated it.  In that case the result already lives in the
     * DSM segment and the index AM can update it in place.
     */
    if (indstats->istat_updated)
        istat = &(indstats->istat);

    /* Fill in the per-call vacuum info handed to the index AM. */
    ivinfo.index = indrel;
    ivinfo.heaprel = pvs->heaprel;
    ivinfo.analyze_only = false;
    ivinfo.report_progress = false;
    ivinfo.message_level = DEBUG2;
    ivinfo.estimated_count = pvs->shared->estimated_count;
    ivinfo.num_heap_tuples = pvs->shared->reltuples;
    ivinfo.strategy = pvs->bstrategy;

    /*
     * Update error traceback information, so that a failure inside the index
     * AM reports which index and phase we were in (see
     * parallel_vacuum_error_callback).
     */
    pvs->indname = pstrdup(RelationGetRelationName(indrel));
    pvs->status = indstats->status;

    switch (indstats->status)
    {
        case PARALLEL_INDVAC_STATUS_NEED_BULKDELETE:
            istat_res = vac_bulkdel_one_index(&ivinfo, istat, pvs->dead_items,
                                              &pvs->shared->dead_items_info);
            break;
        case PARALLEL_INDVAC_STATUS_NEED_CLEANUP:
            istat_res = vac_cleanup_one_index(&ivinfo, istat);
            break;
        default:
            elog(ERROR, "unexpected parallel vacuum index status %d for index \"%s\"",
                 indstats->status,
                 RelationGetRelationName(indrel));
    }

    /*
     * Copy the index bulk-deletion result returned from ambulkdelete and
     * amvacuumcleanup to the DSM segment if it's the first cycle because they
     * allocate locally and it's possible that an index will be vacuumed by a
     * different vacuum process the next cycle.  Copying the result normally
     * happens only the first time an index is vacuumed.  For any additional
     * vacuum pass, we directly point to the result on the DSM segment and
     * pass it to vacuum index APIs so that workers can update it directly.
     *
     * Since all vacuum workers write the bulk-deletion result at different
     * slots we can write them without locking.
     */
    if (!indstats->istat_updated && istat_res != NULL)
    {
        memcpy(&(indstats->istat), istat_res, sizeof(IndexBulkDeleteResult));
        indstats->istat_updated = true;

        /* Free the locally-allocated bulk-deletion result */
        pfree(istat_res);
    }

    /*
     * Update the status to completed. No need to lock here since each worker
     * touches different indexes.
     */
    indstats->status = PARALLEL_INDVAC_STATUS_COMPLETED;

    /* Reset error traceback information */
    pvs->status = PARALLEL_INDVAC_STATUS_COMPLETED;
    pfree(pvs->indname);
    pvs->indname = NULL;

    /*
     * Call the parallel variant of pgstat_progress_incr_param so workers can
     * report progress of index vacuum to the leader.
     */
    pgstat_progress_parallel_incr_param(PROGRESS_VACUUM_INDEXES_PROCESSED, 1);
}
    1153              : 
    1154              : /*
    1155              :  * Returns false, if the given index can't participate in the next execution of
    1156              :  * parallel index vacuum or parallel index cleanup.
    1157              :  */
    1158              : static bool
    1159          144 : parallel_vacuum_index_is_parallel_safe(Relation indrel, int num_index_scans,
    1160              :                                        bool vacuum)
    1161              : {
    1162              :     uint8       vacoptions;
    1163              : 
    1164          144 :     vacoptions = indrel->rd_indam->amparallelvacuumoptions;
    1165              : 
    1166              :     /* In parallel vacuum case, check if it supports parallel bulk-deletion */
    1167          144 :     if (vacuum)
    1168           58 :         return ((vacoptions & VACUUM_OPTION_PARALLEL_BULKDEL) != 0);
    1169              : 
    1170              :     /* Not safe, if the index does not support parallel cleanup */
    1171           86 :     if (((vacoptions & VACUUM_OPTION_PARALLEL_CLEANUP) == 0) &&
    1172           70 :         ((vacoptions & VACUUM_OPTION_PARALLEL_COND_CLEANUP) == 0))
    1173            8 :         return false;
    1174              : 
    1175              :     /*
    1176              :      * Not safe, if the index supports parallel cleanup conditionally, but we
    1177              :      * have already processed the index (for bulkdelete).  We do this to avoid
    1178              :      * the need to invoke workers when parallel index cleanup doesn't need to
    1179              :      * scan the index.  See the comments for option
    1180              :      * VACUUM_OPTION_PARALLEL_COND_CLEANUP to know when indexes support
    1181              :      * parallel cleanup conditionally.
    1182              :      */
    1183           78 :     if (num_index_scans > 0 &&
    1184           32 :         ((vacoptions & VACUUM_OPTION_PARALLEL_COND_CLEANUP) != 0))
    1185           28 :         return false;
    1186              : 
    1187           50 :     return true;
    1188              : }
    1189              : 
/*
 * Perform work within a launched parallel process.
 *
 * Since parallel vacuum workers perform only index vacuum or index cleanup,
 * we don't need to report progress information.
 *
 * seg/toc identify the DSM segment set up by the leader; all shared state
 * (PVShared, per-index stats, dead-item TID store, buffer/WAL usage slots)
 * is located via shm_toc lookups below.
 */
void
parallel_vacuum_main(dsm_segment *seg, shm_toc *toc)
{
    ParallelVacuumState pvs;
    Relation    rel;
    Relation   *indrels;
    PVIndStats *indstats;
    PVShared   *shared;
    TidStore   *dead_items;
    BufferUsage *buffer_usage;
    WalUsage   *wal_usage;
    int         nindexes;
    char       *sharedquery;
    ErrorContextCallback errcallback;

    /*
     * A parallel vacuum worker must have only PROC_IN_VACUUM flag since we
     * don't support parallel vacuum for autovacuum as of now.
     */
    Assert(MyProc->statusFlags == PROC_IN_VACUUM);

    elog(DEBUG1, "starting parallel vacuum worker");

    shared = (PVShared *) shm_toc_lookup(toc, PARALLEL_VACUUM_KEY_SHARED, false);

    /* Set debug_query_string for individual workers */
    sharedquery = shm_toc_lookup(toc, PARALLEL_VACUUM_KEY_QUERY_TEXT, true);
    debug_query_string = sharedquery;
    pgstat_report_activity(STATE_RUNNING, debug_query_string);

    /* Track query ID */
    pgstat_report_query_id(shared->queryid, false);

    /*
     * Open table.  The lock mode is the same as the leader process.  It's
     * okay because the lock mode does not conflict among the parallel
     * workers.
     */
    rel = table_open(shared->relid, ShareUpdateExclusiveLock);

    /*
     * Open all indexes. indrels are sorted in order by OID, which should be
     * matched to the leader's one.
     */
    vac_open_indexes(rel, RowExclusiveLock, &nindexes, &indrels);
    Assert(nindexes > 0);

    /*
     * Apply the desired value of maintenance_work_mem within this process.
     * Really we should use SetConfigOption() to change a GUC, but since we're
     * already in parallel mode guc.c would complain about that.  Fortunately,
     * by the same token guc.c will not let any user-defined code change it.
     * So just avert your eyes while we do this:
     */
    if (shared->maintenance_work_mem_worker > 0)
        maintenance_work_mem = shared->maintenance_work_mem_worker;

    /* Set index statistics */
    indstats = (PVIndStats *) shm_toc_lookup(toc,
                                             PARALLEL_VACUUM_KEY_INDEX_STATS,
                                             false);

    /* Find dead_items in shared memory */
    dead_items = TidStoreAttach(shared->dead_items_dsa_handle,
                                shared->dead_items_handle);

    /* Set cost-based vacuum delay */
    if (shared->is_autovacuum)
    {
        /*
         * Parallel autovacuum workers initialize cost-based delay parameters
         * from the leader's shared state rather than GUC defaults, because
         * the leader may have applied per-table or autovacuum-specific
         * overrides. pv_shared_cost_params must be set before calling
         * parallel_vacuum_update_shared_delay_params().
         */
        pv_shared_cost_params = &(shared->cost_params);
        parallel_vacuum_update_shared_delay_params();
    }
    else
        VacuumUpdateCosts();

    /* Start from a clean balance and hook into the shared cost counters */
    VacuumCostBalance = 0;
    VacuumCostBalanceLocal = 0;
    VacuumSharedCostBalance = &(shared->cost_balance);
    VacuumActiveNWorkers = &(shared->active_nworkers);

    /* Set parallel vacuum state */
    pvs.indrels = indrels;
    pvs.nindexes = nindexes;
    pvs.indstats = indstats;
    pvs.shared = shared;
    pvs.dead_items = dead_items;
    pvs.relnamespace = get_namespace_name(RelationGetNamespace(rel));
    pvs.relname = pstrdup(RelationGetRelationName(rel));
    pvs.heaprel = rel;

    /* These fields will be filled during index vacuum or cleanup */
    pvs.indname = NULL;
    pvs.status = PARALLEL_INDVAC_STATUS_INITIAL;

    /* Each parallel VACUUM worker gets its own access strategy. */
    pvs.bstrategy = GetAccessStrategyWithSize(BAS_VACUUM,
                                              shared->ring_nbuffers * (BLCKSZ / 1024));

    /* Setup error traceback support for ereport() */
    errcallback.callback = parallel_vacuum_error_callback;
    errcallback.arg = &pvs;
    errcallback.previous = error_context_stack;
    error_context_stack = &errcallback;

    /* Prepare to track buffer usage during parallel execution */
    InstrStartParallelQuery();

    /* Process indexes to perform vacuum/cleanup */
    parallel_vacuum_process_safe_indexes(&pvs);

    /*
     * Report buffer/WAL usage during parallel execution.  Each worker writes
     * to its own slot, indexed by ParallelWorkerNumber.
     */
    buffer_usage = shm_toc_lookup(toc, PARALLEL_VACUUM_KEY_BUFFER_USAGE, false);
    wal_usage = shm_toc_lookup(toc, PARALLEL_VACUUM_KEY_WAL_USAGE, false);
    InstrEndParallelQuery(&buffer_usage[ParallelWorkerNumber],
                          &wal_usage[ParallelWorkerNumber]);

    /* Report any remaining cost-based vacuum delay time */
    if (track_cost_delay_timing)
        pgstat_progress_parallel_incr_param(PROGRESS_VACUUM_DELAY_TIME,
                                            parallel_vacuum_worker_delay_ns);

    TidStoreDetach(dead_items);

    /* Pop the error context stack */
    error_context_stack = errcallback.previous;

    vac_close_indexes(nindexes, indrels, RowExclusiveLock);
    table_close(rel, ShareUpdateExclusiveLock);
    FreeAccessStrategy(pvs.bstrategy);

    /* Detach from the leader's shared cost parameters before exiting */
    if (shared->is_autovacuum)
        pv_shared_cost_params = NULL;
}
    1336              : 
    1337              : /*
    1338              :  * Error context callback for errors occurring during parallel index vacuum.
    1339              :  * The error context messages should match the messages set in the lazy vacuum
    1340              :  * error context.  If you change this function, change vacuum_error_callback()
    1341              :  * as well.
    1342              :  */
    1343              : static void
    1344            3 : parallel_vacuum_error_callback(void *arg)
    1345              : {
    1346            3 :     ParallelVacuumState *errinfo = arg;
    1347              : 
    1348            3 :     switch (errinfo->status)
    1349              :     {
    1350            3 :         case PARALLEL_INDVAC_STATUS_NEED_BULKDELETE:
    1351            3 :             errcontext("while vacuuming index \"%s\" of relation \"%s.%s\"",
    1352              :                        errinfo->indname,
    1353              :                        errinfo->relnamespace,
    1354              :                        errinfo->relname);
    1355            3 :             break;
    1356            0 :         case PARALLEL_INDVAC_STATUS_NEED_CLEANUP:
    1357            0 :             errcontext("while cleaning up index \"%s\" of relation \"%s.%s\"",
    1358              :                        errinfo->indname,
    1359              :                        errinfo->relnamespace,
    1360              :                        errinfo->relname);
    1361            0 :             break;
    1362            0 :         case PARALLEL_INDVAC_STATUS_INITIAL:
    1363              :         case PARALLEL_INDVAC_STATUS_COMPLETED:
    1364              :         default:
    1365            0 :             return;
    1366              :     }
    1367              : }
        

Generated by: LCOV version 2.0-1