Line data Source code
1 : /*-------------------------------------------------------------------------
2 : *
3 : * vacuumparallel.c
4 : * Support routines for parallel vacuum and autovacuum execution. In the
5 : * comments below, the word "vacuum" will refer to both vacuum and
6 : * autovacuum.
7 : *
8 : * This file contains routines that are intended to support setting up, using,
9 : * and tearing down a ParallelVacuumState.
10 : *
11 : * In a parallel vacuum, we perform both index bulk deletion and index cleanup
12 : * with parallel worker processes. Individual indexes are processed by one
13 : * vacuum process. ParallelVacuumState contains shared information as well as
14 : * the memory space for storing dead items allocated in the DSA area. We
15 : * launch parallel worker processes at the start of parallel index
16 : * bulk-deletion and index cleanup and once all indexes are processed, the
17 : * parallel worker processes exit. Each time we process indexes in parallel,
18 : * the parallel context is re-initialized so that the same DSM can be used for
19 : * multiple passes of index bulk-deletion and index cleanup.
20 : *
21 : * For parallel autovacuum, we need to propagate cost-based vacuum delay
22 : * parameters from the leader to its workers, as the leader's parameters can
23 : * change even while processing a table (e.g., due to a config reload).
24 : * The PVSharedCostParams struct manages these parameters using a
25 : * generation counter. Each parallel worker polls this shared state and
26 : * refreshes its local delay parameters whenever a change is detected.
27 : *
28 : * Portions Copyright (c) 1996-2026, PostgreSQL Global Development Group
29 : * Portions Copyright (c) 1994, Regents of the University of California
30 : *
31 : * IDENTIFICATION
32 : * src/backend/commands/vacuumparallel.c
33 : *
34 : *-------------------------------------------------------------------------
35 : */
36 : #include "postgres.h"
37 :
38 : #include "access/amapi.h"
39 : #include "access/table.h"
40 : #include "access/xact.h"
41 : #include "commands/progress.h"
42 : #include "commands/vacuum.h"
43 : #include "executor/instrument.h"
44 : #include "optimizer/paths.h"
45 : #include "pgstat.h"
46 : #include "storage/bufmgr.h"
47 : #include "storage/proc.h"
48 : #include "tcop/tcopprot.h"
49 : #include "utils/lsyscache.h"
50 : #include "utils/rel.h"
51 :
/*
 * DSM keys for parallel vacuum.  Unlike other parallel execution code, since
 * we don't need to worry about DSM keys conflicting with plan_node_id we can
 * use small integers.
 */
#define PARALLEL_VACUUM_KEY_SHARED			1	/* PVShared struct */
#define PARALLEL_VACUUM_KEY_QUERY_TEXT		2	/* debug_query_string copy */
#define PARALLEL_VACUUM_KEY_BUFFER_USAGE	3	/* per-worker BufferUsage array */
#define PARALLEL_VACUUM_KEY_WAL_USAGE		4	/* per-worker WalUsage array */
#define PARALLEL_VACUUM_KEY_INDEX_STATS		5	/* PVIndStats array */
62 :
/*
 * Struct for cost-based vacuum delay related parameters to share among an
 * autovacuum worker and its parallel vacuum workers.
 */
typedef struct PVSharedCostParams
{
	/*
	 * The generation counter is incremented by the leader process each time
	 * it updates the shared cost-based vacuum delay parameters.  Parallel
	 * vacuum workers compare it with their local generation,
	 * shared_params_generation_local, to detect whether they need to refresh
	 * their local parameters.  The generation starts from 1 so that a freshly
	 * started worker (whose local copy is 0) will always load the initial
	 * parameters on its first check.
	 *
	 * Read lock-free with pg_atomic_read_u32(); the parameter fields below
	 * are read/written only while holding 'mutex'.
	 */
	pg_atomic_uint32 generation;

	slock_t		mutex;			/* protects all fields below */

	/* Parameters to share with parallel workers */
	double		cost_delay;
	int			cost_limit;
	int			cost_page_dirty;
	int			cost_page_hit;
	int			cost_page_miss;
} PVSharedCostParams;
89 :
/*
 * Shared information among parallel workers.  So this is allocated in the DSM
 * segment.
 */
typedef struct PVShared
{
	/*
	 * Target table relid, log level (for messages about parallel workers
	 * launched during VACUUM VERBOSE) and query ID.  These fields are not
	 * modified during the parallel vacuum.
	 */
	Oid			relid;
	int			elevel;
	int64		queryid;

	/*
	 * Fields for both index vacuum and cleanup.
	 *
	 * reltuples is the total number of input heap tuples.  We set either old
	 * live tuples in the index vacuum case or the new live tuples in the
	 * index cleanup case.
	 *
	 * estimated_count is true if reltuples is an estimated value.  (Note that
	 * reltuples could be -1 in this case, indicating we have no idea.)
	 */
	double		reltuples;
	bool		estimated_count;

	/*
	 * In single process vacuum we could consume more memory during index
	 * vacuuming or cleanup apart from the memory for heap scanning.  In
	 * parallel vacuum, since individual vacuum workers can consume memory
	 * equal to maintenance_work_mem, the new maintenance_work_mem for each
	 * worker is set such that the parallel operation doesn't consume more
	 * memory than single process vacuum.
	 */
	int			maintenance_work_mem_worker;

	/*
	 * The number of buffers each worker's Buffer Access Strategy ring should
	 * contain.
	 */
	int			ring_nbuffers;

	/*
	 * Shared vacuum cost balance.  During parallel vacuum,
	 * VacuumSharedCostBalance points to this value and it accumulates the
	 * balance of each parallel vacuum worker.
	 */
	pg_atomic_uint32 cost_balance;

	/*
	 * Number of active parallel workers.  This is used for computing the
	 * minimum threshold of the vacuum cost balance before a worker sleeps for
	 * cost-based delay.
	 */
	pg_atomic_uint32 active_nworkers;

	/* Counter for vacuuming and cleanup (next index slot to claim) */
	pg_atomic_uint32 idx;

	/* DSA handle where the TidStore lives */
	dsa_handle	dead_items_dsa_handle;

	/* DSA pointer to the shared TidStore */
	dsa_pointer dead_items_handle;

	/* Statistics of shared dead items */
	VacDeadItemsInfo dead_items_info;

	/*
	 * If 'true' then we are running parallel autovacuum.  Otherwise, we are
	 * running parallel maintenance VACUUM.
	 */
	bool		is_autovacuum;

	/*
	 * Cost-based vacuum delay parameters shared between the autovacuum leader
	 * and its parallel workers.  Only initialized when is_autovacuum.
	 */
	PVSharedCostParams cost_params;
} PVShared;
172 :
/* Status used during parallel index vacuum or cleanup */
typedef enum PVIndVacStatus
{
	PARALLEL_INDVAC_STATUS_INITIAL = 0, /* not yet scheduled for this pass */
	PARALLEL_INDVAC_STATUS_NEED_BULKDELETE, /* bulk-deletion pending */
	PARALLEL_INDVAC_STATUS_NEED_CLEANUP,	/* cleanup pending */
	PARALLEL_INDVAC_STATUS_COMPLETED,	/* processing finished */
} PVIndVacStatus;
181 :
/*
 * Struct for index vacuum statistics of an index that is used for parallel vacuum.
 * This includes the status of parallel index vacuum as well as index statistics.
 */
typedef struct PVIndStats
{
	/*
	 * The following two fields are set by leader process before executing
	 * parallel index vacuum or parallel index cleanup.  These fields are not
	 * fixed for the entire VACUUM operation.  They are only fixed for an
	 * individual parallel index vacuum and cleanup.
	 *
	 * parallel_workers_can_process is true if both leader and worker can
	 * process the index, otherwise only leader can process it.
	 */
	PVIndVacStatus status;
	bool		parallel_workers_can_process;

	/*
	 * Individual worker or leader stores the result of index vacuum or
	 * cleanup.
	 */
	bool		istat_updated;	/* are the stats updated? */
	IndexBulkDeleteResult istat;
} PVIndStats;
207 :
/*
 * Struct for maintaining a parallel vacuum state.  typedef appears in vacuum.h.
 */
struct ParallelVacuumState
{
	/* NULL for worker processes */
	ParallelContext *pcxt;

	/* Parent Heap Relation */
	Relation	heaprel;

	/* Target indexes */
	Relation   *indrels;
	int			nindexes;

	/* Shared information among parallel vacuum workers */
	PVShared   *shared;

	/*
	 * Shared index statistics among parallel vacuum workers.  The array
	 * element is allocated for every index, even those indexes where parallel
	 * index vacuuming is unsafe or not worthwhile (e.g.,
	 * will_parallel_vacuum[] is false).  During parallel vacuum,
	 * IndexBulkDeleteResult of each index is kept in DSM and is copied into
	 * local memory at the end of parallel vacuum.
	 */
	PVIndStats *indstats;

	/* Shared dead items space among parallel vacuum workers */
	TidStore   *dead_items;

	/* Points to buffer usage area in DSM */
	BufferUsage *buffer_usage;

	/* Points to WAL usage area in DSM */
	WalUsage   *wal_usage;

	/*
	 * False if the index is totally unsuitable target for all parallel
	 * processing.  For example, the index could be <
	 * min_parallel_index_scan_size cutoff.
	 */
	bool	   *will_parallel_vacuum;

	/*
	 * The number of indexes that support parallel index bulk-deletion and
	 * parallel index cleanup respectively.
	 */
	int			nindexes_parallel_bulkdel;
	int			nindexes_parallel_cleanup;
	int			nindexes_parallel_condcleanup;

	/* Buffer access strategy used by leader process */
	BufferAccessStrategy bstrategy;

	/*
	 * Error reporting state.  The error callback is set only for workers
	 * processes during parallel index vacuum.
	 */
	char	   *relnamespace;
	char	   *relname;
	char	   *indname;
	PVIndVacStatus status;
};
272 :
/*
 * Points into the DSM segment's PVShared.cost_params while a parallel
 * autovacuum is in progress; NULL otherwise (including for non-autovacuum
 * parallel vacuum).  Reset by parallel_vacuum_end() and, on error paths,
 * by the parallel_vacuum_dsm_detach() callback.
 */
static PVSharedCostParams *pv_shared_cost_params = NULL;

/*
 * Worker-local copy of the last cost-parameter generation this worker has
 * applied.  Initialized to 0; since the leader initializes the shared
 * generation counter to 1, the first call to
 * parallel_vacuum_update_shared_delay_params() will always detect a
 * mismatch and read the initial parameters from shared memory.
 */
static uint32 shared_params_generation_local = 0;

static int	parallel_vacuum_compute_workers(Relation *indrels, int nindexes, int nrequested,
											bool *will_parallel_vacuum);
static void parallel_vacuum_process_all_indexes(ParallelVacuumState *pvs, int num_index_scans,
												bool vacuum, PVWorkerStats *wstats);
static void parallel_vacuum_process_safe_indexes(ParallelVacuumState *pvs);
static void parallel_vacuum_process_unsafe_indexes(ParallelVacuumState *pvs);
static void parallel_vacuum_process_one_index(ParallelVacuumState *pvs, Relation indrel,
											  PVIndStats *indstats);
static bool parallel_vacuum_index_is_parallel_safe(Relation indrel, int num_index_scans,
												   bool vacuum);
static void parallel_vacuum_error_callback(void *arg);
static inline void parallel_vacuum_set_cost_parameters(PVSharedCostParams *params);
static void parallel_vacuum_dsm_detach(dsm_segment *seg, Datum arg);
297 :
/*
 * Try to enter parallel mode and create a parallel context.  Then initialize
 * shared memory state.
 *
 * rel is the target heap relation; indrels/nindexes describe its indexes.
 * nrequested_workers is the user-requested degree (0 means "choose for me"),
 * vac_work_mem is the memory budget in kilobytes, elevel is the log level for
 * worker-launch messages, and bstrategy is the leader's buffer access
 * strategy (its ring size is propagated to workers).
 *
 * On success, return parallel vacuum state.  Otherwise return NULL.
 */
ParallelVacuumState *
parallel_vacuum_init(Relation rel, Relation *indrels, int nindexes,
					 int nrequested_workers, int vac_work_mem,
					 int elevel, BufferAccessStrategy bstrategy)
{
	ParallelVacuumState *pvs;
	ParallelContext *pcxt;
	PVShared   *shared;
	TidStore   *dead_items;
	PVIndStats *indstats;
	BufferUsage *buffer_usage;
	WalUsage   *wal_usage;
	bool	   *will_parallel_vacuum;
	Size		est_indstats_len;
	Size		est_shared_len;
	int			nindexes_mwm = 0;
	int			parallel_workers = 0;
	int			querylen;

	/*
	 * A parallel vacuum must be requested and there must be indexes on the
	 * relation
	 */
	Assert(nrequested_workers >= 0);
	Assert(nindexes > 0);

	/*
	 * Compute the number of parallel vacuum workers to launch
	 */
	will_parallel_vacuum = palloc0_array(bool, nindexes);
	parallel_workers = parallel_vacuum_compute_workers(indrels, nindexes,
													   nrequested_workers,
													   will_parallel_vacuum);
	if (parallel_workers <= 0)
	{
		/* Can't perform vacuum in parallel -- return NULL */
		pfree(will_parallel_vacuum);
		return NULL;
	}

	pvs = palloc0_object(ParallelVacuumState);
	pvs->indrels = indrels;
	pvs->nindexes = nindexes;
	pvs->will_parallel_vacuum = will_parallel_vacuum;
	pvs->bstrategy = bstrategy;
	pvs->heaprel = rel;

	EnterParallelMode();
	pcxt = CreateParallelContext("postgres", "parallel_vacuum_main",
								 parallel_workers);
	Assert(pcxt->nworkers > 0);
	pvs->pcxt = pcxt;

	/* Estimate size for index vacuum stats -- PARALLEL_VACUUM_KEY_INDEX_STATS */
	est_indstats_len = mul_size(sizeof(PVIndStats), nindexes);
	shm_toc_estimate_chunk(&pcxt->estimator, est_indstats_len);
	shm_toc_estimate_keys(&pcxt->estimator, 1);

	/* Estimate size for shared information -- PARALLEL_VACUUM_KEY_SHARED */
	est_shared_len = sizeof(PVShared);
	shm_toc_estimate_chunk(&pcxt->estimator, est_shared_len);
	shm_toc_estimate_keys(&pcxt->estimator, 1);

	/*
	 * Estimate space for BufferUsage and WalUsage --
	 * PARALLEL_VACUUM_KEY_BUFFER_USAGE and PARALLEL_VACUUM_KEY_WAL_USAGE.
	 *
	 * If there are no extensions loaded that care, we could skip this.  We
	 * have no way of knowing whether anyone's looking at pgBufferUsage or
	 * pgWalUsage, so do it unconditionally.
	 */
	shm_toc_estimate_chunk(&pcxt->estimator,
						   mul_size(sizeof(BufferUsage), pcxt->nworkers));
	shm_toc_estimate_keys(&pcxt->estimator, 1);
	shm_toc_estimate_chunk(&pcxt->estimator,
						   mul_size(sizeof(WalUsage), pcxt->nworkers));
	shm_toc_estimate_keys(&pcxt->estimator, 1);

	/* Finally, estimate PARALLEL_VACUUM_KEY_QUERY_TEXT space */
	if (debug_query_string)
	{
		querylen = strlen(debug_query_string);
		shm_toc_estimate_chunk(&pcxt->estimator, querylen + 1);
		shm_toc_estimate_keys(&pcxt->estimator, 1);
	}
	else
		querylen = 0;			/* keep compiler quiet */

	InitializeParallelDSM(pcxt);

	/* Prepare index vacuum stats */
	indstats = (PVIndStats *) shm_toc_allocate(pcxt->toc, est_indstats_len);
	MemSet(indstats, 0, est_indstats_len);
	for (int i = 0; i < nindexes; i++)
	{
		Relation	indrel = indrels[i];
		uint8		vacoptions = indrel->rd_indam->amparallelvacuumoptions;

		/*
		 * Cleanup option should be either disabled, always performing in
		 * parallel or conditionally performing in parallel.
		 */
		Assert(((vacoptions & VACUUM_OPTION_PARALLEL_CLEANUP) == 0) ||
			   ((vacoptions & VACUUM_OPTION_PARALLEL_COND_CLEANUP) == 0));
		Assert(vacoptions <= VACUUM_OPTION_MAX_VALID_VALUE);

		if (!will_parallel_vacuum[i])
			continue;

		if (indrel->rd_indam->amusemaintenanceworkmem)
			nindexes_mwm++;

		/*
		 * Remember the number of indexes that support parallel operation for
		 * each phase.
		 */
		if ((vacoptions & VACUUM_OPTION_PARALLEL_BULKDEL) != 0)
			pvs->nindexes_parallel_bulkdel++;
		if ((vacoptions & VACUUM_OPTION_PARALLEL_CLEANUP) != 0)
			pvs->nindexes_parallel_cleanup++;
		if ((vacoptions & VACUUM_OPTION_PARALLEL_COND_CLEANUP) != 0)
			pvs->nindexes_parallel_condcleanup++;
	}
	shm_toc_insert(pcxt->toc, PARALLEL_VACUUM_KEY_INDEX_STATS, indstats);
	pvs->indstats = indstats;

	/* Prepare shared information */
	shared = (PVShared *) shm_toc_allocate(pcxt->toc, est_shared_len);
	MemSet(shared, 0, est_shared_len);
	shared->relid = RelationGetRelid(rel);
	shared->elevel = elevel;
	shared->queryid = pgstat_get_my_query_id();

	/*
	 * Divide the memory budget among workers that use maintenance_work_mem,
	 * so the parallel operation doesn't exceed the single-process budget.
	 */
	shared->maintenance_work_mem_worker =
		(nindexes_mwm > 0) ?
		vac_work_mem / Min(parallel_workers, nindexes_mwm) :
		vac_work_mem;

	/* vac_work_mem is in kB; convert to bytes */
	shared->dead_items_info.max_bytes = vac_work_mem * (size_t) 1024;

	/* Prepare DSA space for dead items */
	dead_items = TidStoreCreateShared(shared->dead_items_info.max_bytes,
									  LWTRANCHE_PARALLEL_VACUUM_DSA);
	pvs->dead_items = dead_items;
	shared->dead_items_handle = TidStoreGetHandle(dead_items);
	shared->dead_items_dsa_handle = dsa_get_handle(TidStoreGetDSA(dead_items));

	/* Use the same buffer size for all workers */
	shared->ring_nbuffers = GetAccessStrategyBufferCount(bstrategy);

	pg_atomic_init_u32(&(shared->cost_balance), 0);
	pg_atomic_init_u32(&(shared->active_nworkers), 0);
	pg_atomic_init_u32(&(shared->idx), 0);

	shared->is_autovacuum = AmAutoVacuumWorkerProcess();

	/*
	 * Initialize shared cost-based vacuum delay parameters if it's for
	 * autovacuum.  Generation starts at 1 so workers (local copy 0) load the
	 * initial parameters on their first check; the dsm-detach callback
	 * guarantees pv_shared_cost_params is reset even on error exit.
	 */
	if (shared->is_autovacuum)
	{
		parallel_vacuum_set_cost_parameters(&shared->cost_params);
		pg_atomic_init_u32(&shared->cost_params.generation, 1);
		SpinLockInit(&shared->cost_params.mutex);

		pv_shared_cost_params = &(shared->cost_params);
		on_dsm_detach(pcxt->seg, parallel_vacuum_dsm_detach, (Datum) 0);
	}

	shm_toc_insert(pcxt->toc, PARALLEL_VACUUM_KEY_SHARED, shared);
	pvs->shared = shared;

	/*
	 * Allocate space for each worker's BufferUsage and WalUsage; no need to
	 * initialize
	 */
	buffer_usage = shm_toc_allocate(pcxt->toc,
									mul_size(sizeof(BufferUsage), pcxt->nworkers));
	shm_toc_insert(pcxt->toc, PARALLEL_VACUUM_KEY_BUFFER_USAGE, buffer_usage);
	pvs->buffer_usage = buffer_usage;
	wal_usage = shm_toc_allocate(pcxt->toc,
								 mul_size(sizeof(WalUsage), pcxt->nworkers));
	shm_toc_insert(pcxt->toc, PARALLEL_VACUUM_KEY_WAL_USAGE, wal_usage);
	pvs->wal_usage = wal_usage;

	/* Store query string for workers */
	if (debug_query_string)
	{
		char	   *sharedquery;

		sharedquery = (char *) shm_toc_allocate(pcxt->toc, querylen + 1);
		memcpy(sharedquery, debug_query_string, querylen + 1);
		sharedquery[querylen] = '\0';
		shm_toc_insert(pcxt->toc,
					   PARALLEL_VACUUM_KEY_QUERY_TEXT, sharedquery);
	}

	/* Success -- return parallel vacuum state */
	return pvs;
}
504 :
505 : /*
506 : * Destroy the parallel context, and end parallel mode.
507 : *
508 : * Since writes are not allowed during parallel mode, copy the
509 : * updated index statistics from DSM into local memory and then later use that
510 : * to update the index statistics. One might think that we can exit from
511 : * parallel mode, update the index statistics and then destroy parallel
512 : * context, but that won't be safe (see ExitParallelMode).
513 : */
514 : void
515 26 : parallel_vacuum_end(ParallelVacuumState *pvs, IndexBulkDeleteResult **istats)
516 : {
517 : Assert(!IsParallelWorker());
518 :
519 : /* Copy the updated statistics */
520 116 : for (int i = 0; i < pvs->nindexes; i++)
521 : {
522 90 : PVIndStats *indstats = &(pvs->indstats[i]);
523 :
524 90 : if (indstats->istat_updated)
525 : {
526 62 : istats[i] = palloc0_object(IndexBulkDeleteResult);
527 62 : memcpy(istats[i], &indstats->istat, sizeof(IndexBulkDeleteResult));
528 : }
529 : else
530 28 : istats[i] = NULL;
531 : }
532 :
533 26 : TidStoreDestroy(pvs->dead_items);
534 :
535 26 : DestroyParallelContext(pvs->pcxt);
536 26 : ExitParallelMode();
537 :
538 26 : if (AmAutoVacuumWorkerProcess())
539 3 : pv_shared_cost_params = NULL;
540 :
541 26 : pfree(pvs->will_parallel_vacuum);
542 26 : pfree(pvs);
543 26 : }
544 :
545 : /*
546 : * DSM detach callback. This is invoked when an autovacuum worker detaches
547 : * from the DSM segment holding PVShared. It ensures to reset the local pointer
548 : * to the shared state even if parallel vacuum raises an error and doesn't
549 : * call parallel_vacuum_end().
550 : */
551 : static void
552 3 : parallel_vacuum_dsm_detach(dsm_segment *seg, Datum arg)
553 : {
554 : Assert(AmAutoVacuumWorkerProcess());
555 3 : pv_shared_cost_params = NULL;
556 3 : }
557 :
558 : /*
559 : * Returns the dead items space and dead items information.
560 : */
561 : TidStore *
562 50 : parallel_vacuum_get_dead_items(ParallelVacuumState *pvs, VacDeadItemsInfo **dead_items_info_p)
563 : {
564 50 : *dead_items_info_p = &(pvs->shared->dead_items_info);
565 50 : return pvs->dead_items;
566 : }
567 :
568 : /* Forget all items in dead_items */
569 : void
570 24 : parallel_vacuum_reset_dead_items(ParallelVacuumState *pvs)
571 : {
572 24 : VacDeadItemsInfo *dead_items_info = &(pvs->shared->dead_items_info);
573 :
574 : /*
575 : * Free the current tidstore and return allocated DSA segments to the
576 : * operating system. Then we recreate the tidstore with the same max_bytes
577 : * limitation we just used.
578 : */
579 24 : TidStoreDestroy(pvs->dead_items);
580 24 : pvs->dead_items = TidStoreCreateShared(dead_items_info->max_bytes,
581 : LWTRANCHE_PARALLEL_VACUUM_DSA);
582 :
583 : /* Update the DSA pointer for dead_items to the new one */
584 24 : pvs->shared->dead_items_dsa_handle = dsa_get_handle(TidStoreGetDSA(pvs->dead_items));
585 24 : pvs->shared->dead_items_handle = TidStoreGetHandle(pvs->dead_items);
586 :
587 : /* Reset the counter */
588 24 : dead_items_info->num_items = 0;
589 24 : }
590 :
591 : /*
592 : * Do parallel index bulk-deletion with parallel workers.
593 : */
594 : void
595 24 : parallel_vacuum_bulkdel_all_indexes(ParallelVacuumState *pvs, long num_table_tuples,
596 : int num_index_scans, PVWorkerStats *wstats)
597 : {
598 : Assert(!IsParallelWorker());
599 :
600 : /*
601 : * We can only provide an approximate value of num_heap_tuples, at least
602 : * for now.
603 : */
604 24 : pvs->shared->reltuples = num_table_tuples;
605 24 : pvs->shared->estimated_count = true;
606 :
607 24 : parallel_vacuum_process_all_indexes(pvs, num_index_scans, true, wstats);
608 24 : }
609 :
610 : /*
611 : * Do parallel index cleanup with parallel workers.
612 : */
613 : void
614 26 : parallel_vacuum_cleanup_all_indexes(ParallelVacuumState *pvs, long num_table_tuples,
615 : int num_index_scans, bool estimated_count,
616 : PVWorkerStats *wstats)
617 : {
618 : Assert(!IsParallelWorker());
619 :
620 : /*
621 : * We can provide a better estimate of total number of surviving tuples
622 : * (we assume indexes are more interested in that than in the number of
623 : * nominally live tuples).
624 : */
625 26 : pvs->shared->reltuples = num_table_tuples;
626 26 : pvs->shared->estimated_count = estimated_count;
627 :
628 26 : parallel_vacuum_process_all_indexes(pvs, num_index_scans, false, wstats);
629 26 : }
630 :
631 : /*
632 : * Fill in the given structure with cost-based vacuum delay parameter values.
633 : */
634 : static inline void
635 4 : parallel_vacuum_set_cost_parameters(PVSharedCostParams *params)
636 : {
637 4 : params->cost_delay = vacuum_cost_delay;
638 4 : params->cost_limit = vacuum_cost_limit;
639 4 : params->cost_page_dirty = VacuumCostPageDirty;
640 4 : params->cost_page_hit = VacuumCostPageHit;
641 4 : params->cost_page_miss = VacuumCostPageMiss;
642 4 : }
643 :
/*
 * Updates the cost-based vacuum delay parameters for parallel autovacuum
 * workers.
 *
 * For non-autovacuum parallel workers, this function will have no effect.
 */
void
parallel_vacuum_update_shared_delay_params(void)
{
	uint32		params_generation;

	Assert(IsParallelWorker());

	/* Quick return if the worker is not running for the autovacuum */
	if (pv_shared_cost_params == NULL)
		return;

	/*
	 * Lock-free generation check; the parameter fields themselves are only
	 * read under the spinlock below.
	 */
	params_generation = pg_atomic_read_u32(&pv_shared_cost_params->generation);
	Assert(shared_params_generation_local <= params_generation);

	/* Return if parameters had not changed in the leader */
	if (params_generation == shared_params_generation_local)
		return;

	/*
	 * NOTE(review): if the leader bumps the generation again between the
	 * read above and this critical section, we may read newer parameters
	 * than params_generation implies; the stale local generation then just
	 * causes a harmless re-read on the next call.
	 */
	SpinLockAcquire(&pv_shared_cost_params->mutex);
	VacuumCostDelay = pv_shared_cost_params->cost_delay;
	VacuumCostLimit = pv_shared_cost_params->cost_limit;
	VacuumCostPageDirty = pv_shared_cost_params->cost_page_dirty;
	VacuumCostPageHit = pv_shared_cost_params->cost_page_hit;
	VacuumCostPageMiss = pv_shared_cost_params->cost_page_miss;
	SpinLockRelease(&pv_shared_cost_params->mutex);

	VacuumUpdateCosts();

	shared_params_generation_local = params_generation;

	/*
	 * NOTE(review): the message mixes GUC values (vacuum_cost_limit,
	 * vacuum_cost_delay) with active values (VacuumCostPage*); confirm that
	 * is intentional rather than printing VacuumCostLimit/VacuumCostDelay.
	 */
	elog(DEBUG2,
		 "parallel autovacuum worker updated cost params: cost_limit=%d, cost_delay=%g, cost_page_miss=%d, cost_page_dirty=%d, cost_page_hit=%d",
		 vacuum_cost_limit,
		 vacuum_cost_delay,
		 VacuumCostPageMiss,
		 VacuumCostPageDirty,
		 VacuumCostPageHit);
}
688 :
689 : /*
690 : * Store the cost-based vacuum delay parameters in the shared memory so that
691 : * parallel vacuum workers can consume them (see
692 : * parallel_vacuum_update_shared_delay_params()).
693 : */
694 : void
695 1 : parallel_vacuum_propagate_shared_delay_params(void)
696 : {
697 : Assert(AmAutoVacuumWorkerProcess());
698 :
699 : /*
700 : * Quick return if the leader process is not sharing the delay parameters.
701 : */
702 1 : if (pv_shared_cost_params == NULL)
703 0 : return;
704 :
705 : /*
706 : * Check if any delay parameters have changed. We can read them without
707 : * locks as only the leader can modify them.
708 : */
709 1 : if (vacuum_cost_delay == pv_shared_cost_params->cost_delay &&
710 0 : vacuum_cost_limit == pv_shared_cost_params->cost_limit &&
711 0 : VacuumCostPageDirty == pv_shared_cost_params->cost_page_dirty &&
712 0 : VacuumCostPageHit == pv_shared_cost_params->cost_page_hit &&
713 0 : VacuumCostPageMiss == pv_shared_cost_params->cost_page_miss)
714 0 : return;
715 :
716 : /* Update the shared delay parameters */
717 1 : SpinLockAcquire(&pv_shared_cost_params->mutex);
718 1 : parallel_vacuum_set_cost_parameters(pv_shared_cost_params);
719 1 : SpinLockRelease(&pv_shared_cost_params->mutex);
720 :
721 : /*
722 : * Increment the generation of the parameters, i.e. let parallel workers
723 : * know that they should re-read shared cost params.
724 : */
725 1 : pg_atomic_fetch_add_u32(&pv_shared_cost_params->generation, 1);
726 : }
727 :
728 : /*
729 : * Compute the number of parallel worker processes to request. Both index
730 : * vacuum and index cleanup can be executed with parallel workers.
731 : * The index is eligible for parallel vacuum iff its size is greater than
732 : * min_parallel_index_scan_size as invoking workers for very small indexes
733 : * can hurt performance.
734 : *
735 : * nrequested is the number of parallel workers that user requested. If
736 : * nrequested is 0, we compute the parallel degree based on nindexes, that is
737 : * the number of indexes that support parallel vacuum. This function also
738 : * sets will_parallel_vacuum to remember indexes that participate in parallel
739 : * vacuum.
740 : */
741 : static int
742 44754 : parallel_vacuum_compute_workers(Relation *indrels, int nindexes, int nrequested,
743 : bool *will_parallel_vacuum)
744 : {
745 44754 : int nindexes_parallel = 0;
746 44754 : int nindexes_parallel_bulkdel = 0;
747 44754 : int nindexes_parallel_cleanup = 0;
748 : int parallel_workers;
749 : int max_workers;
750 :
751 89508 : max_workers = AmAutoVacuumWorkerProcess() ?
752 44754 : autovacuum_max_parallel_workers :
753 : max_parallel_maintenance_workers;
754 :
755 : /*
756 : * We don't allow performing parallel operation in standalone backend or
757 : * when parallelism is disabled.
758 : */
759 44754 : if (!IsUnderPostmaster || max_workers == 0)
760 40959 : return 0;
761 :
762 : /*
763 : * Compute the number of indexes that can participate in parallel vacuum.
764 : */
765 12361 : for (int i = 0; i < nindexes; i++)
766 : {
767 8566 : Relation indrel = indrels[i];
768 8566 : uint8 vacoptions = indrel->rd_indam->amparallelvacuumoptions;
769 :
770 : /* Skip index that is not a suitable target for parallel index vacuum */
771 8566 : if (vacoptions == VACUUM_OPTION_NO_PARALLEL ||
772 8566 : RelationGetNumberOfBlocks(indrel) < min_parallel_index_scan_size)
773 8468 : continue;
774 :
775 98 : will_parallel_vacuum[i] = true;
776 :
777 98 : if ((vacoptions & VACUUM_OPTION_PARALLEL_BULKDEL) != 0)
778 90 : nindexes_parallel_bulkdel++;
779 98 : if (((vacoptions & VACUUM_OPTION_PARALLEL_CLEANUP) != 0) ||
780 82 : ((vacoptions & VACUUM_OPTION_PARALLEL_COND_CLEANUP) != 0))
781 90 : nindexes_parallel_cleanup++;
782 : }
783 :
784 3795 : nindexes_parallel = Max(nindexes_parallel_bulkdel,
785 : nindexes_parallel_cleanup);
786 :
787 : /* The leader process takes one index */
788 3795 : nindexes_parallel--;
789 :
790 : /* No index supports parallel vacuum */
791 3795 : if (nindexes_parallel <= 0)
792 3769 : return 0;
793 :
794 : /* Compute the parallel degree */
795 26 : parallel_workers = (nrequested > 0) ?
796 26 : Min(nrequested, nindexes_parallel) : nindexes_parallel;
797 :
798 : /* Cap by GUC variable */
799 26 : parallel_workers = Min(parallel_workers, max_workers);
800 :
801 26 : return parallel_workers;
802 : }
803 :
804 : /*
/*
 * Perform index vacuum or index cleanup with parallel workers.  This function
 * must be used by the parallel vacuum leader process.
 *
 * num_index_scans is the number of index scan cycles already completed for
 * this table; it determines whether the parallel DSM must be reinitialized
 * for a relaunch, and whether conditionally parallel-safe indexes still get
 * workers (they do only on the first cleanup call).
 *
 * If wstats is not NULL, the parallel worker statistics are updated.
 */
static void
parallel_vacuum_process_all_indexes(ParallelVacuumState *pvs, int num_index_scans,
									bool vacuum, PVWorkerStats *wstats)
{
	int			nworkers;
	PVIndVacStatus new_status;

	Assert(!IsParallelWorker());

	if (vacuum)
	{
		new_status = PARALLEL_INDVAC_STATUS_NEED_BULKDELETE;

		/* Determine the number of parallel workers to launch */
		nworkers = pvs->nindexes_parallel_bulkdel;
	}
	else
	{
		new_status = PARALLEL_INDVAC_STATUS_NEED_CLEANUP;

		/* Determine the number of parallel workers to launch */
		nworkers = pvs->nindexes_parallel_cleanup;

		/* Add conditionally parallel-aware indexes on the first call only */
		if (num_index_scans == 0)
			nworkers += pvs->nindexes_parallel_condcleanup;
	}

	/* The leader process will participate, so we need one fewer worker */
	nworkers--;

	/*
	 * It is possible that parallel context is initialized with fewer workers
	 * than the number of indexes that need a separate worker in the current
	 * phase, so we need to consider it.  See
	 * parallel_vacuum_compute_workers().
	 */
	nworkers = Min(nworkers, pvs->pcxt->nworkers);

	/* Update the statistics, if we asked to */
	if (wstats != NULL && nworkers > 0)
		wstats->nplanned += nworkers;

	/*
	 * Set index vacuum status and mark whether parallel vacuum worker can
	 * process it.
	 */
	for (int i = 0; i < pvs->nindexes; i++)
	{
		PVIndStats *indstats = &(pvs->indstats[i]);

		Assert(indstats->status == PARALLEL_INDVAC_STATUS_INITIAL);
		indstats->status = new_status;
		indstats->parallel_workers_can_process =
			(pvs->will_parallel_vacuum[i] &&
			 parallel_vacuum_index_is_parallel_safe(pvs->indrels[i],
													num_index_scans,
													vacuum));
	}

	/* Reset the parallel index processing and progress counters */
	pg_atomic_write_u32(&(pvs->shared->idx), 0);

	/* Setup the shared cost-based vacuum delay and launch workers */
	if (nworkers > 0)
	{
		/* Reinitialize parallel context to relaunch parallel workers */
		if (num_index_scans > 0)
			ReinitializeParallelDSM(pvs->pcxt);

		/*
		 * Set up shared cost balance and the number of active workers for
		 * vacuum delay.  We need to do this before launching workers as
		 * otherwise, they might not see the updated values for these
		 * parameters.
		 */
		pg_atomic_write_u32(&(pvs->shared->cost_balance), VacuumCostBalance);
		pg_atomic_write_u32(&(pvs->shared->active_nworkers), 0);

		/*
		 * The number of workers can vary between bulkdelete and cleanup
		 * phase.
		 */
		ReinitializeParallelWorkers(pvs->pcxt, nworkers);

		LaunchParallelWorkers(pvs->pcxt);

		if (pvs->pcxt->nworkers_launched > 0)
		{
			/*
			 * Reset the local cost values for leader backend as we have
			 * already accumulated the remaining balance of heap.
			 */
			VacuumCostBalance = 0;
			VacuumCostBalanceLocal = 0;

			/* Enable shared cost balance for leader backend */
			VacuumSharedCostBalance = &(pvs->shared->cost_balance);
			VacuumActiveNWorkers = &(pvs->shared->active_nworkers);

			/* Update the statistics, if we asked to */
			if (wstats != NULL)
				wstats->nlaunched += pvs->pcxt->nworkers_launched;
		}

		if (vacuum)
			ereport(pvs->shared->elevel,
					(errmsg(ngettext("launched %d parallel vacuum worker for index vacuuming (planned: %d)",
									 "launched %d parallel vacuum workers for index vacuuming (planned: %d)",
									 pvs->pcxt->nworkers_launched),
							pvs->pcxt->nworkers_launched, nworkers)));
		else
			ereport(pvs->shared->elevel,
					(errmsg(ngettext("launched %d parallel vacuum worker for index cleanup (planned: %d)",
									 "launched %d parallel vacuum workers for index cleanup (planned: %d)",
									 pvs->pcxt->nworkers_launched),
							pvs->pcxt->nworkers_launched, nworkers)));
	}

	/* Vacuum the indexes that can be processed by only leader process */
	parallel_vacuum_process_unsafe_indexes(pvs);

	/*
	 * Join as a parallel worker.  The leader process alone processes all
	 * parallel-safe indexes in the case where no workers are launched.
	 */
	parallel_vacuum_process_safe_indexes(pvs);

	/*
	 * Next, accumulate buffer and WAL usage.  (This must wait for the workers
	 * to finish, or we might get incomplete data.)
	 */
	if (nworkers > 0)
	{
		/* Wait for all vacuum workers to finish */
		WaitForParallelWorkersToFinish(pvs->pcxt);

		for (int i = 0; i < pvs->pcxt->nworkers_launched; i++)
			InstrAccumParallelQuery(&pvs->buffer_usage[i], &pvs->wal_usage[i]);
	}

	/*
	 * Reset all index status back to initial (while checking that we have
	 * vacuumed all indexes).
	 */
	for (int i = 0; i < pvs->nindexes; i++)
	{
		PVIndStats *indstats = &(pvs->indstats[i]);

		if (indstats->status != PARALLEL_INDVAC_STATUS_COMPLETED)
			elog(ERROR, "parallel index vacuum on index \"%s\" is not completed",
				 RelationGetRelationName(pvs->indrels[i]));

		indstats->status = PARALLEL_INDVAC_STATUS_INITIAL;
	}

	/*
	 * Carry the shared balance value to heap scan and disable shared costing
	 */
	if (VacuumSharedCostBalance)
	{
		VacuumCostBalance = pg_atomic_read_u32(VacuumSharedCostBalance);
		VacuumSharedCostBalance = NULL;
		VacuumActiveNWorkers = NULL;
	}
}
976 :
977 : /*
978 : * Index vacuum/cleanup routine used by the leader process and parallel
979 : * vacuum worker processes to vacuum the indexes in parallel.
980 : */
981 : static void
982 99 : parallel_vacuum_process_safe_indexes(ParallelVacuumState *pvs)
983 : {
984 : /*
985 : * Increment the active worker count if we are able to launch any worker.
986 : */
987 99 : if (VacuumActiveNWorkers)
988 89 : pg_atomic_add_fetch_u32(VacuumActiveNWorkers, 1);
989 :
990 : /* Loop until all indexes are vacuumed */
991 : for (;;)
992 152 : {
993 : int idx;
994 : PVIndStats *indstats;
995 :
996 : /* Get an index number to process */
997 251 : idx = pg_atomic_fetch_add_u32(&(pvs->shared->idx), 1);
998 :
999 : /* Done for all indexes? */
1000 251 : if (idx >= pvs->nindexes)
1001 99 : break;
1002 :
1003 152 : indstats = &(pvs->indstats[idx]);
1004 :
1005 : /*
1006 : * Skip vacuuming index that is unsafe for workers or has an
1007 : * unsuitable target for parallel index vacuum (this is vacuumed in
1008 : * parallel_vacuum_process_unsafe_indexes() by the leader).
1009 : */
1010 152 : if (!indstats->parallel_workers_can_process)
1011 46 : continue;
1012 :
1013 : /* Do vacuum or cleanup of the index */
1014 106 : parallel_vacuum_process_one_index(pvs, pvs->indrels[idx], indstats);
1015 : }
1016 :
1017 : /*
1018 : * We have completed the index vacuum so decrement the active worker
1019 : * count.
1020 : */
1021 99 : if (VacuumActiveNWorkers)
1022 89 : pg_atomic_sub_fetch_u32(VacuumActiveNWorkers, 1);
1023 99 : }
1024 :
1025 : /*
1026 : * Perform parallel vacuuming of indexes in leader process.
1027 : *
1028 : * Handles index vacuuming (or index cleanup) for indexes that are not
1029 : * parallel safe. It's possible that this will vary for a given index, based
1030 : * on details like whether we're performing index cleanup right now.
1031 : *
1032 : * Also performs vacuuming of smaller indexes that fell under the size cutoff
1033 : * enforced by parallel_vacuum_compute_workers().
1034 : */
1035 : static void
1036 50 : parallel_vacuum_process_unsafe_indexes(ParallelVacuumState *pvs)
1037 : {
1038 : Assert(!IsParallelWorker());
1039 :
1040 : /*
1041 : * Increment the active worker count if we are able to launch any worker.
1042 : */
1043 50 : if (VacuumActiveNWorkers)
1044 40 : pg_atomic_add_fetch_u32(VacuumActiveNWorkers, 1);
1045 :
1046 202 : for (int i = 0; i < pvs->nindexes; i++)
1047 : {
1048 152 : PVIndStats *indstats = &(pvs->indstats[i]);
1049 :
1050 : /* Skip, indexes that are safe for workers */
1051 152 : if (indstats->parallel_workers_can_process)
1052 106 : continue;
1053 :
1054 : /* Do vacuum or cleanup of the index */
1055 46 : parallel_vacuum_process_one_index(pvs, pvs->indrels[i], indstats);
1056 : }
1057 :
1058 : /*
1059 : * We have completed the index vacuum so decrement the active worker
1060 : * count.
1061 : */
1062 50 : if (VacuumActiveNWorkers)
1063 40 : pg_atomic_sub_fetch_u32(VacuumActiveNWorkers, 1);
1064 50 : }
1065 :
/*
 * Vacuum or cleanup index either by leader process or by one of the worker
 * process.  After vacuuming the index this function copies the index
 * statistics returned from ambulkdelete and amvacuumcleanup to the DSM
 * segment.
 *
 * The phase to run (bulk-delete vs. cleanup) is taken from indstats->status,
 * which the leader set in parallel_vacuum_process_all_indexes().
 */
static void
parallel_vacuum_process_one_index(ParallelVacuumState *pvs, Relation indrel,
								  PVIndStats *indstats)
{
	IndexBulkDeleteResult *istat = NULL;
	IndexBulkDeleteResult *istat_res;
	IndexVacuumInfo ivinfo;

	/*
	 * Update the pointer to the corresponding bulk-deletion result if someone
	 * has already updated it
	 */
	if (indstats->istat_updated)
		istat = &(indstats->istat);

	/* Fill in the AM-facing vacuum info from the shared state */
	ivinfo.index = indrel;
	ivinfo.heaprel = pvs->heaprel;
	ivinfo.analyze_only = false;
	ivinfo.report_progress = false;
	ivinfo.message_level = DEBUG2;
	ivinfo.estimated_count = pvs->shared->estimated_count;
	ivinfo.num_heap_tuples = pvs->shared->reltuples;
	ivinfo.strategy = pvs->bstrategy;

	/* Update error traceback information */
	pvs->indname = pstrdup(RelationGetRelationName(indrel));
	pvs->status = indstats->status;

	switch (indstats->status)
	{
		case PARALLEL_INDVAC_STATUS_NEED_BULKDELETE:
			istat_res = vac_bulkdel_one_index(&ivinfo, istat, pvs->dead_items,
											  &pvs->shared->dead_items_info);
			break;
		case PARALLEL_INDVAC_STATUS_NEED_CLEANUP:
			istat_res = vac_cleanup_one_index(&ivinfo, istat);
			break;
		default:
			elog(ERROR, "unexpected parallel vacuum index status %d for index \"%s\"",
				 indstats->status,
				 RelationGetRelationName(indrel));
	}

	/*
	 * Copy the index bulk-deletion result returned from ambulkdelete and
	 * amvacuumcleanup to the DSM segment if it's the first cycle because they
	 * allocate locally and it's possible that an index will be vacuumed by a
	 * different vacuum process the next cycle.  Copying the result normally
	 * happens only the first time an index is vacuumed.  For any additional
	 * vacuum pass, we directly point to the result on the DSM segment and
	 * pass it to vacuum index APIs so that workers can update it directly.
	 *
	 * Since all vacuum workers write the bulk-deletion result at different
	 * slots we can write them without locking.
	 */
	if (!indstats->istat_updated && istat_res != NULL)
	{
		memcpy(&(indstats->istat), istat_res, sizeof(IndexBulkDeleteResult));
		indstats->istat_updated = true;

		/* Free the locally-allocated bulk-deletion result */
		pfree(istat_res);
	}

	/*
	 * Update the status to completed.  No need to lock here since each worker
	 * touches different indexes.
	 */
	indstats->status = PARALLEL_INDVAC_STATUS_COMPLETED;

	/* Reset error traceback information */
	pvs->status = PARALLEL_INDVAC_STATUS_COMPLETED;
	pfree(pvs->indname);
	pvs->indname = NULL;

	/*
	 * Call the parallel variant of pgstat_progress_incr_param so workers can
	 * report progress of index vacuum to the leader.
	 */
	pgstat_progress_parallel_incr_param(PROGRESS_VACUUM_INDEXES_PROCESSED, 1);
}
1153 :
1154 : /*
1155 : * Returns false, if the given index can't participate in the next execution of
1156 : * parallel index vacuum or parallel index cleanup.
1157 : */
1158 : static bool
1159 144 : parallel_vacuum_index_is_parallel_safe(Relation indrel, int num_index_scans,
1160 : bool vacuum)
1161 : {
1162 : uint8 vacoptions;
1163 :
1164 144 : vacoptions = indrel->rd_indam->amparallelvacuumoptions;
1165 :
1166 : /* In parallel vacuum case, check if it supports parallel bulk-deletion */
1167 144 : if (vacuum)
1168 58 : return ((vacoptions & VACUUM_OPTION_PARALLEL_BULKDEL) != 0);
1169 :
1170 : /* Not safe, if the index does not support parallel cleanup */
1171 86 : if (((vacoptions & VACUUM_OPTION_PARALLEL_CLEANUP) == 0) &&
1172 70 : ((vacoptions & VACUUM_OPTION_PARALLEL_COND_CLEANUP) == 0))
1173 8 : return false;
1174 :
1175 : /*
1176 : * Not safe, if the index supports parallel cleanup conditionally, but we
1177 : * have already processed the index (for bulkdelete). We do this to avoid
1178 : * the need to invoke workers when parallel index cleanup doesn't need to
1179 : * scan the index. See the comments for option
1180 : * VACUUM_OPTION_PARALLEL_COND_CLEANUP to know when indexes support
1181 : * parallel cleanup conditionally.
1182 : */
1183 78 : if (num_index_scans > 0 &&
1184 32 : ((vacoptions & VACUUM_OPTION_PARALLEL_COND_CLEANUP) != 0))
1185 28 : return false;
1186 :
1187 50 : return true;
1188 : }
1189 :
/*
 * Perform work within a launched parallel process.
 *
 * This is the entry point for parallel vacuum workers.  It attaches to the
 * shared state left by the leader in the DSM/toc, opens the heap and its
 * indexes, sets up cost-based delay and instrumentation, then joins the
 * shared index-processing loop.
 *
 * Since parallel vacuum workers perform only index vacuum or index cleanup,
 * we don't need to report progress information.
 */
void
parallel_vacuum_main(dsm_segment *seg, shm_toc *toc)
{
	ParallelVacuumState pvs;
	Relation	rel;
	Relation   *indrels;
	PVIndStats *indstats;
	PVShared   *shared;
	TidStore   *dead_items;
	BufferUsage *buffer_usage;
	WalUsage   *wal_usage;
	int			nindexes;
	char	   *sharedquery;
	ErrorContextCallback errcallback;

	/*
	 * A parallel vacuum worker must have only PROC_IN_VACUUM flag since we
	 * don't support parallel vacuum for autovacuum as of now.
	 */
	Assert(MyProc->statusFlags == PROC_IN_VACUUM);

	elog(DEBUG1, "starting parallel vacuum worker");

	shared = (PVShared *) shm_toc_lookup(toc, PARALLEL_VACUUM_KEY_SHARED, false);

	/* Set debug_query_string for individual workers */
	sharedquery = shm_toc_lookup(toc, PARALLEL_VACUUM_KEY_QUERY_TEXT, true);
	debug_query_string = sharedquery;
	pgstat_report_activity(STATE_RUNNING, debug_query_string);

	/* Track query ID */
	pgstat_report_query_id(shared->queryid, false);

	/*
	 * Open table.  The lock mode is the same as the leader process.  It's
	 * okay because the lock mode does not conflict among the parallel
	 * workers.
	 */
	rel = table_open(shared->relid, ShareUpdateExclusiveLock);

	/*
	 * Open all indexes.  indrels are sorted in order by OID, which should be
	 * matched to the leader's one.
	 */
	vac_open_indexes(rel, RowExclusiveLock, &nindexes, &indrels);
	Assert(nindexes > 0);

	/*
	 * Apply the desired value of maintenance_work_mem within this process.
	 * Really we should use SetConfigOption() to change a GUC, but since we're
	 * already in parallel mode guc.c would complain about that.  Fortunately,
	 * by the same token guc.c will not let any user-defined code change it.
	 * So just avert your eyes while we do this:
	 */
	if (shared->maintenance_work_mem_worker > 0)
		maintenance_work_mem = shared->maintenance_work_mem_worker;

	/* Set index statistics */
	indstats = (PVIndStats *) shm_toc_lookup(toc,
											 PARALLEL_VACUUM_KEY_INDEX_STATS,
											 false);

	/* Find dead_items in shared memory */
	dead_items = TidStoreAttach(shared->dead_items_dsa_handle,
								shared->dead_items_handle);

	/* Set cost-based vacuum delay */
	if (shared->is_autovacuum)
	{
		/*
		 * Parallel autovacuum workers initialize cost-based delay parameters
		 * from the leader's shared state rather than GUC defaults, because
		 * the leader may have applied per-table or autovacuum-specific
		 * overrides.  pv_shared_cost_params must be set before calling
		 * parallel_vacuum_update_shared_delay_params().
		 */
		pv_shared_cost_params = &(shared->cost_params);
		parallel_vacuum_update_shared_delay_params();
	}
	else
		VacuumUpdateCosts();

	/* Start with a zero local balance and join the shared cost accounting */
	VacuumCostBalance = 0;
	VacuumCostBalanceLocal = 0;
	VacuumSharedCostBalance = &(shared->cost_balance);
	VacuumActiveNWorkers = &(shared->active_nworkers);

	/* Set parallel vacuum state */
	pvs.indrels = indrels;
	pvs.nindexes = nindexes;
	pvs.indstats = indstats;
	pvs.shared = shared;
	pvs.dead_items = dead_items;
	pvs.relnamespace = get_namespace_name(RelationGetNamespace(rel));
	pvs.relname = pstrdup(RelationGetRelationName(rel));
	pvs.heaprel = rel;

	/* These fields will be filled during index vacuum or cleanup */
	pvs.indname = NULL;
	pvs.status = PARALLEL_INDVAC_STATUS_INITIAL;

	/* Each parallel VACUUM worker gets its own access strategy. */
	pvs.bstrategy = GetAccessStrategyWithSize(BAS_VACUUM,
											  shared->ring_nbuffers * (BLCKSZ / 1024));

	/* Setup error traceback support for ereport() */
	errcallback.callback = parallel_vacuum_error_callback;
	errcallback.arg = &pvs;
	errcallback.previous = error_context_stack;
	error_context_stack = &errcallback;

	/* Prepare to track buffer usage during parallel execution */
	InstrStartParallelQuery();

	/* Process indexes to perform vacuum/cleanup */
	parallel_vacuum_process_safe_indexes(&pvs);

	/* Report buffer/WAL usage during parallel execution */
	buffer_usage = shm_toc_lookup(toc, PARALLEL_VACUUM_KEY_BUFFER_USAGE, false);
	wal_usage = shm_toc_lookup(toc, PARALLEL_VACUUM_KEY_WAL_USAGE, false);
	InstrEndParallelQuery(&buffer_usage[ParallelWorkerNumber],
						  &wal_usage[ParallelWorkerNumber]);

	/* Report any remaining cost-based vacuum delay time */
	if (track_cost_delay_timing)
		pgstat_progress_parallel_incr_param(PROGRESS_VACUUM_DELAY_TIME,
											parallel_vacuum_worker_delay_ns);

	TidStoreDetach(dead_items);

	/* Pop the error context stack */
	error_context_stack = errcallback.previous;

	vac_close_indexes(nindexes, indrels, RowExclusiveLock);
	table_close(rel, ShareUpdateExclusiveLock);
	FreeAccessStrategy(pvs.bstrategy);

	/* Clear the pointer into the DSM-resident cost parameters before exit */
	if (shared->is_autovacuum)
		pv_shared_cost_params = NULL;
}
1336 :
1337 : /*
1338 : * Error context callback for errors occurring during parallel index vacuum.
1339 : * The error context messages should match the messages set in the lazy vacuum
1340 : * error context. If you change this function, change vacuum_error_callback()
1341 : * as well.
1342 : */
1343 : static void
1344 3 : parallel_vacuum_error_callback(void *arg)
1345 : {
1346 3 : ParallelVacuumState *errinfo = arg;
1347 :
1348 3 : switch (errinfo->status)
1349 : {
1350 3 : case PARALLEL_INDVAC_STATUS_NEED_BULKDELETE:
1351 3 : errcontext("while vacuuming index \"%s\" of relation \"%s.%s\"",
1352 : errinfo->indname,
1353 : errinfo->relnamespace,
1354 : errinfo->relname);
1355 3 : break;
1356 0 : case PARALLEL_INDVAC_STATUS_NEED_CLEANUP:
1357 0 : errcontext("while cleaning up index \"%s\" of relation \"%s.%s\"",
1358 : errinfo->indname,
1359 : errinfo->relnamespace,
1360 : errinfo->relname);
1361 0 : break;
1362 0 : case PARALLEL_INDVAC_STATUS_INITIAL:
1363 : case PARALLEL_INDVAC_STATUS_COMPLETED:
1364 : default:
1365 0 : return;
1366 : }
1367 : }
|