Line data Source code
1 : /*-------------------------------------------------------------------------
2 : *
3 : * vacuumparallel.c
4 : * Support routines for parallel vacuum execution.
5 : *
6 : * This file contains routines that are intended to support setting up, using,
7 : * and tearing down a ParallelVacuumState.
8 : *
9 : * In a parallel vacuum, we perform both index bulk deletion and index cleanup
10 : * with parallel worker processes. Individual indexes are processed by one
11 : * vacuum process. ParallelVacuumState contains shared information as well as
12 : * the memory space for storing dead items allocated in the DSA area. We
13 : * launch parallel worker processes at the start of parallel index
14 : * bulk-deletion and index cleanup and once all indexes are processed, the
15 : * parallel worker processes exit. Each time we process indexes in parallel,
16 : * the parallel context is re-initialized so that the same DSM can be used for
17 : * multiple passes of index bulk-deletion and index cleanup.
18 : *
19 : * Portions Copyright (c) 1996-2026, PostgreSQL Global Development Group
20 : * Portions Copyright (c) 1994, Regents of the University of California
21 : *
22 : * IDENTIFICATION
23 : * src/backend/commands/vacuumparallel.c
24 : *
25 : *-------------------------------------------------------------------------
26 : */
27 : #include "postgres.h"
28 :
29 : #include "access/amapi.h"
30 : #include "access/table.h"
31 : #include "access/xact.h"
32 : #include "commands/progress.h"
33 : #include "commands/vacuum.h"
34 : #include "executor/instrument.h"
35 : #include "optimizer/paths.h"
36 : #include "pgstat.h"
37 : #include "storage/bufmgr.h"
38 : #include "storage/proc.h"
39 : #include "tcop/tcopprot.h"
40 : #include "utils/lsyscache.h"
41 : #include "utils/rel.h"
42 :
43 : /*
44 : * DSM keys for parallel vacuum. Unlike other parallel execution code, since
45 : * we don't need to worry about DSM keys conflicting with plan_node_id we can
46 : * use small integers.
47 : */
48 : #define PARALLEL_VACUUM_KEY_SHARED 1
49 : #define PARALLEL_VACUUM_KEY_QUERY_TEXT 2
50 : #define PARALLEL_VACUUM_KEY_BUFFER_USAGE 3
51 : #define PARALLEL_VACUUM_KEY_WAL_USAGE 4
52 : #define PARALLEL_VACUUM_KEY_INDEX_STATS 5
53 :
54 : /*
55 : * Shared information among parallel workers. So this is allocated in the DSM
56 : * segment.
57 : */
58 : typedef struct PVShared
59 : {
60 : /*
61 : * Target table relid, log level (for messages about parallel workers
62 : * launched during VACUUM VERBOSE) and query ID. These fields are not
63 : * modified during the parallel vacuum.
64 : */
65 : Oid relid;
66 : int elevel;
67 : int64 queryid;
68 :
69 : /*
70 : * Fields for both index vacuum and cleanup.
71 : *
72 : * reltuples is the total number of input heap tuples. We set either old
73 : * live tuples in the index vacuum case or the new live tuples in the
74 : * index cleanup case.
75 : *
76 : * estimated_count is true if reltuples is an estimated value. (Note that
77 : * reltuples could be -1 in this case, indicating we have no idea.)
78 : */
79 : double reltuples;
80 : bool estimated_count;
81 :
82 : /*
83 : * In single process vacuum we could consume more memory during index
84 : * vacuuming or cleanup apart from the memory for heap scanning. In
85 : * parallel vacuum, since individual vacuum workers can consume memory
86 : * equal to maintenance_work_mem, the new maintenance_work_mem for each
87 : * worker is set such that the parallel operation doesn't consume more
88 : * memory than single process vacuum.
89 : */
90 : int maintenance_work_mem_worker;
91 :
92 : /*
93 : * The number of buffers each worker's Buffer Access Strategy ring should
94 : * contain.
95 : */
96 : int ring_nbuffers;
97 :
98 : /*
99 : * Shared vacuum cost balance. During parallel vacuum,
100 : * VacuumSharedCostBalance points to this value and it accumulates the
101 : * balance of each parallel vacuum worker.
102 : */
103 : pg_atomic_uint32 cost_balance;
104 :
105 : /*
106 : * Number of active parallel workers. This is used for computing the
107 : * minimum threshold of the vacuum cost balance before a worker sleeps for
108 : * cost-based delay.
109 : */
110 : pg_atomic_uint32 active_nworkers;
111 :
112 : /* Counter for vacuuming and cleanup */
113 : pg_atomic_uint32 idx;
114 :
115 : /* DSA handle where the TidStore lives */
116 : dsa_handle dead_items_dsa_handle;
117 :
118 : /* DSA pointer to the shared TidStore */
119 : dsa_pointer dead_items_handle;
120 :
121 : /* Statistics of shared dead items */
122 : VacDeadItemsInfo dead_items_info;
123 : } PVShared;
124 :
/* Status used during parallel index vacuum or cleanup */
typedef enum PVIndVacStatus
{
	PARALLEL_INDVAC_STATUS_INITIAL = 0, /* not scheduled in the current pass */
	PARALLEL_INDVAC_STATUS_NEED_BULKDELETE, /* awaiting index bulk-deletion */
	PARALLEL_INDVAC_STATUS_NEED_CLEANUP,	/* awaiting index cleanup */
	PARALLEL_INDVAC_STATUS_COMPLETED,	/* processed in the current pass */
} PVIndVacStatus;
133 :
134 : /*
135 : * Struct for index vacuum statistics of an index that is used for parallel vacuum.
136 : * This includes the status of parallel index vacuum as well as index statistics.
137 : */
138 : typedef struct PVIndStats
139 : {
140 : /*
141 : * The following two fields are set by leader process before executing
142 : * parallel index vacuum or parallel index cleanup. These fields are not
143 : * fixed for the entire VACUUM operation. They are only fixed for an
144 : * individual parallel index vacuum and cleanup.
145 : *
146 : * parallel_workers_can_process is true if both leader and worker can
147 : * process the index, otherwise only leader can process it.
148 : */
149 : PVIndVacStatus status;
150 : bool parallel_workers_can_process;
151 :
152 : /*
153 : * Individual worker or leader stores the result of index vacuum or
154 : * cleanup.
155 : */
156 : bool istat_updated; /* are the stats updated? */
157 : IndexBulkDeleteResult istat;
158 : } PVIndStats;
159 :
160 : /*
161 : * Struct for maintaining a parallel vacuum state. typedef appears in vacuum.h.
162 : */
163 : struct ParallelVacuumState
164 : {
165 : /* NULL for worker processes */
166 : ParallelContext *pcxt;
167 :
168 : /* Parent Heap Relation */
169 : Relation heaprel;
170 :
171 : /* Target indexes */
172 : Relation *indrels;
173 : int nindexes;
174 :
175 : /* Shared information among parallel vacuum workers */
176 : PVShared *shared;
177 :
178 : /*
179 : * Shared index statistics among parallel vacuum workers. The array
180 : * element is allocated for every index, even those indexes where parallel
181 : * index vacuuming is unsafe or not worthwhile (e.g.,
182 : * will_parallel_vacuum[] is false). During parallel vacuum,
183 : * IndexBulkDeleteResult of each index is kept in DSM and is copied into
184 : * local memory at the end of parallel vacuum.
185 : */
186 : PVIndStats *indstats;
187 :
188 : /* Shared dead items space among parallel vacuum workers */
189 : TidStore *dead_items;
190 :
191 : /* Points to buffer usage area in DSM */
192 : BufferUsage *buffer_usage;
193 :
194 : /* Points to WAL usage area in DSM */
195 : WalUsage *wal_usage;
196 :
197 : /*
198 : * False if the index is totally unsuitable target for all parallel
199 : * processing. For example, the index could be <
200 : * min_parallel_index_scan_size cutoff.
201 : */
202 : bool *will_parallel_vacuum;
203 :
204 : /*
205 : * The number of indexes that support parallel index bulk-deletion and
206 : * parallel index cleanup respectively.
207 : */
208 : int nindexes_parallel_bulkdel;
209 : int nindexes_parallel_cleanup;
210 : int nindexes_parallel_condcleanup;
211 :
212 : /* Buffer access strategy used by leader process */
213 : BufferAccessStrategy bstrategy;
214 :
215 : /*
216 : * Error reporting state. The error callback is set only for workers
217 : * processes during parallel index vacuum.
218 : */
219 : char *relnamespace;
220 : char *relname;
221 : char *indname;
222 : PVIndVacStatus status;
223 : };
224 :
/* Prototypes for private routines in this file */
static int	parallel_vacuum_compute_workers(Relation *indrels, int nindexes, int nrequested,
											bool *will_parallel_vacuum);
static void parallel_vacuum_process_all_indexes(ParallelVacuumState *pvs, int num_index_scans,
												bool vacuum, PVWorkerStats *wstats);
static void parallel_vacuum_process_safe_indexes(ParallelVacuumState *pvs);
static void parallel_vacuum_process_unsafe_indexes(ParallelVacuumState *pvs);
static void parallel_vacuum_process_one_index(ParallelVacuumState *pvs, Relation indrel,
											  PVIndStats *indstats);
static bool parallel_vacuum_index_is_parallel_safe(Relation indrel, int num_index_scans,
												   bool vacuum);
static void parallel_vacuum_error_callback(void *arg);
236 :
237 : /*
238 : * Try to enter parallel mode and create a parallel context. Then initialize
239 : * shared memory state.
240 : *
241 : * On success, return parallel vacuum state. Otherwise return NULL.
242 : */
243 : ParallelVacuumState *
244 6709 : parallel_vacuum_init(Relation rel, Relation *indrels, int nindexes,
245 : int nrequested_workers, int vac_work_mem,
246 : int elevel, BufferAccessStrategy bstrategy)
247 : {
248 : ParallelVacuumState *pvs;
249 : ParallelContext *pcxt;
250 : PVShared *shared;
251 : TidStore *dead_items;
252 : PVIndStats *indstats;
253 : BufferUsage *buffer_usage;
254 : WalUsage *wal_usage;
255 : bool *will_parallel_vacuum;
256 : Size est_indstats_len;
257 : Size est_shared_len;
258 6709 : int nindexes_mwm = 0;
259 6709 : int parallel_workers = 0;
260 : int querylen;
261 :
262 : /*
263 : * A parallel vacuum must be requested and there must be indexes on the
264 : * relation
265 : */
266 : Assert(nrequested_workers >= 0);
267 : Assert(nindexes > 0);
268 :
269 : /*
270 : * Compute the number of parallel vacuum workers to launch
271 : */
272 6709 : will_parallel_vacuum = palloc0_array(bool, nindexes);
273 6709 : parallel_workers = parallel_vacuum_compute_workers(indrels, nindexes,
274 : nrequested_workers,
275 : will_parallel_vacuum);
276 6709 : if (parallel_workers <= 0)
277 : {
278 : /* Can't perform vacuum in parallel -- return NULL */
279 6686 : pfree(will_parallel_vacuum);
280 6686 : return NULL;
281 : }
282 :
283 23 : pvs = palloc0_object(ParallelVacuumState);
284 23 : pvs->indrels = indrels;
285 23 : pvs->nindexes = nindexes;
286 23 : pvs->will_parallel_vacuum = will_parallel_vacuum;
287 23 : pvs->bstrategy = bstrategy;
288 23 : pvs->heaprel = rel;
289 :
290 23 : EnterParallelMode();
291 23 : pcxt = CreateParallelContext("postgres", "parallel_vacuum_main",
292 : parallel_workers);
293 : Assert(pcxt->nworkers > 0);
294 23 : pvs->pcxt = pcxt;
295 :
296 : /* Estimate size for index vacuum stats -- PARALLEL_VACUUM_KEY_INDEX_STATS */
297 23 : est_indstats_len = mul_size(sizeof(PVIndStats), nindexes);
298 23 : shm_toc_estimate_chunk(&pcxt->estimator, est_indstats_len);
299 23 : shm_toc_estimate_keys(&pcxt->estimator, 1);
300 :
301 : /* Estimate size for shared information -- PARALLEL_VACUUM_KEY_SHARED */
302 23 : est_shared_len = sizeof(PVShared);
303 23 : shm_toc_estimate_chunk(&pcxt->estimator, est_shared_len);
304 23 : shm_toc_estimate_keys(&pcxt->estimator, 1);
305 :
306 : /*
307 : * Estimate space for BufferUsage and WalUsage --
308 : * PARALLEL_VACUUM_KEY_BUFFER_USAGE and PARALLEL_VACUUM_KEY_WAL_USAGE.
309 : *
310 : * If there are no extensions loaded that care, we could skip this. We
311 : * have no way of knowing whether anyone's looking at pgBufferUsage or
312 : * pgWalUsage, so do it unconditionally.
313 : */
314 23 : shm_toc_estimate_chunk(&pcxt->estimator,
315 : mul_size(sizeof(BufferUsage), pcxt->nworkers));
316 23 : shm_toc_estimate_keys(&pcxt->estimator, 1);
317 23 : shm_toc_estimate_chunk(&pcxt->estimator,
318 : mul_size(sizeof(WalUsage), pcxt->nworkers));
319 23 : shm_toc_estimate_keys(&pcxt->estimator, 1);
320 :
321 : /* Finally, estimate PARALLEL_VACUUM_KEY_QUERY_TEXT space */
322 23 : if (debug_query_string)
323 : {
324 23 : querylen = strlen(debug_query_string);
325 23 : shm_toc_estimate_chunk(&pcxt->estimator, querylen + 1);
326 23 : shm_toc_estimate_keys(&pcxt->estimator, 1);
327 : }
328 : else
329 0 : querylen = 0; /* keep compiler quiet */
330 :
331 23 : InitializeParallelDSM(pcxt);
332 :
333 : /* Prepare index vacuum stats */
334 23 : indstats = (PVIndStats *) shm_toc_allocate(pcxt->toc, est_indstats_len);
335 515 : MemSet(indstats, 0, est_indstats_len);
336 105 : for (int i = 0; i < nindexes; i++)
337 : {
338 82 : Relation indrel = indrels[i];
339 82 : uint8 vacoptions = indrel->rd_indam->amparallelvacuumoptions;
340 :
341 : /*
342 : * Cleanup option should be either disabled, always performing in
343 : * parallel or conditionally performing in parallel.
344 : */
345 : Assert(((vacoptions & VACUUM_OPTION_PARALLEL_CLEANUP) == 0) ||
346 : ((vacoptions & VACUUM_OPTION_PARALLEL_COND_CLEANUP) == 0));
347 : Assert(vacoptions <= VACUUM_OPTION_MAX_VALID_VALUE);
348 :
349 82 : if (!will_parallel_vacuum[i])
350 4 : continue;
351 :
352 78 : if (indrel->rd_indam->amusemaintenanceworkmem)
353 8 : nindexes_mwm++;
354 :
355 : /*
356 : * Remember the number of indexes that support parallel operation for
357 : * each phase.
358 : */
359 78 : if ((vacoptions & VACUUM_OPTION_PARALLEL_BULKDEL) != 0)
360 70 : pvs->nindexes_parallel_bulkdel++;
361 78 : if ((vacoptions & VACUUM_OPTION_PARALLEL_CLEANUP) != 0)
362 16 : pvs->nindexes_parallel_cleanup++;
363 78 : if ((vacoptions & VACUUM_OPTION_PARALLEL_COND_CLEANUP) != 0)
364 54 : pvs->nindexes_parallel_condcleanup++;
365 : }
366 23 : shm_toc_insert(pcxt->toc, PARALLEL_VACUUM_KEY_INDEX_STATS, indstats);
367 23 : pvs->indstats = indstats;
368 :
369 : /* Prepare shared information */
370 23 : shared = (PVShared *) shm_toc_allocate(pcxt->toc, est_shared_len);
371 253 : MemSet(shared, 0, est_shared_len);
372 23 : shared->relid = RelationGetRelid(rel);
373 23 : shared->elevel = elevel;
374 23 : shared->queryid = pgstat_get_my_query_id();
375 23 : shared->maintenance_work_mem_worker =
376 : (nindexes_mwm > 0) ?
377 23 : maintenance_work_mem / Min(parallel_workers, nindexes_mwm) :
378 : maintenance_work_mem;
379 23 : shared->dead_items_info.max_bytes = vac_work_mem * (size_t) 1024;
380 :
381 : /* Prepare DSA space for dead items */
382 23 : dead_items = TidStoreCreateShared(shared->dead_items_info.max_bytes,
383 : LWTRANCHE_PARALLEL_VACUUM_DSA);
384 23 : pvs->dead_items = dead_items;
385 23 : shared->dead_items_handle = TidStoreGetHandle(dead_items);
386 23 : shared->dead_items_dsa_handle = dsa_get_handle(TidStoreGetDSA(dead_items));
387 :
388 : /* Use the same buffer size for all workers */
389 23 : shared->ring_nbuffers = GetAccessStrategyBufferCount(bstrategy);
390 :
391 23 : pg_atomic_init_u32(&(shared->cost_balance), 0);
392 23 : pg_atomic_init_u32(&(shared->active_nworkers), 0);
393 23 : pg_atomic_init_u32(&(shared->idx), 0);
394 :
395 23 : shm_toc_insert(pcxt->toc, PARALLEL_VACUUM_KEY_SHARED, shared);
396 23 : pvs->shared = shared;
397 :
398 : /*
399 : * Allocate space for each worker's BufferUsage and WalUsage; no need to
400 : * initialize
401 : */
402 23 : buffer_usage = shm_toc_allocate(pcxt->toc,
403 23 : mul_size(sizeof(BufferUsage), pcxt->nworkers));
404 23 : shm_toc_insert(pcxt->toc, PARALLEL_VACUUM_KEY_BUFFER_USAGE, buffer_usage);
405 23 : pvs->buffer_usage = buffer_usage;
406 23 : wal_usage = shm_toc_allocate(pcxt->toc,
407 23 : mul_size(sizeof(WalUsage), pcxt->nworkers));
408 23 : shm_toc_insert(pcxt->toc, PARALLEL_VACUUM_KEY_WAL_USAGE, wal_usage);
409 23 : pvs->wal_usage = wal_usage;
410 :
411 : /* Store query string for workers */
412 23 : if (debug_query_string)
413 : {
414 : char *sharedquery;
415 :
416 23 : sharedquery = (char *) shm_toc_allocate(pcxt->toc, querylen + 1);
417 23 : memcpy(sharedquery, debug_query_string, querylen + 1);
418 23 : sharedquery[querylen] = '\0';
419 23 : shm_toc_insert(pcxt->toc,
420 : PARALLEL_VACUUM_KEY_QUERY_TEXT, sharedquery);
421 : }
422 :
423 : /* Success -- return parallel vacuum state */
424 23 : return pvs;
425 : }
426 :
427 : /*
428 : * Destroy the parallel context, and end parallel mode.
429 : *
430 : * Since writes are not allowed during parallel mode, copy the
431 : * updated index statistics from DSM into local memory and then later use that
432 : * to update the index statistics. One might think that we can exit from
433 : * parallel mode, update the index statistics and then destroy parallel
434 : * context, but that won't be safe (see ExitParallelMode).
435 : */
436 : void
437 23 : parallel_vacuum_end(ParallelVacuumState *pvs, IndexBulkDeleteResult **istats)
438 : {
439 : Assert(!IsParallelWorker());
440 :
441 : /* Copy the updated statistics */
442 105 : for (int i = 0; i < pvs->nindexes; i++)
443 : {
444 82 : PVIndStats *indstats = &(pvs->indstats[i]);
445 :
446 82 : if (indstats->istat_updated)
447 : {
448 49 : istats[i] = palloc0_object(IndexBulkDeleteResult);
449 49 : memcpy(istats[i], &indstats->istat, sizeof(IndexBulkDeleteResult));
450 : }
451 : else
452 33 : istats[i] = NULL;
453 : }
454 :
455 23 : TidStoreDestroy(pvs->dead_items);
456 :
457 23 : DestroyParallelContext(pvs->pcxt);
458 23 : ExitParallelMode();
459 :
460 23 : pfree(pvs->will_parallel_vacuum);
461 23 : pfree(pvs);
462 23 : }
463 :
464 : /*
465 : * Returns the dead items space and dead items information.
466 : */
467 : TidStore *
468 38 : parallel_vacuum_get_dead_items(ParallelVacuumState *pvs, VacDeadItemsInfo **dead_items_info_p)
469 : {
470 38 : *dead_items_info_p = &(pvs->shared->dead_items_info);
471 38 : return pvs->dead_items;
472 : }
473 :
474 : /* Forget all items in dead_items */
475 : void
476 15 : parallel_vacuum_reset_dead_items(ParallelVacuumState *pvs)
477 : {
478 15 : VacDeadItemsInfo *dead_items_info = &(pvs->shared->dead_items_info);
479 :
480 : /*
481 : * Free the current tidstore and return allocated DSA segments to the
482 : * operating system. Then we recreate the tidstore with the same max_bytes
483 : * limitation we just used.
484 : */
485 15 : TidStoreDestroy(pvs->dead_items);
486 15 : pvs->dead_items = TidStoreCreateShared(dead_items_info->max_bytes,
487 : LWTRANCHE_PARALLEL_VACUUM_DSA);
488 :
489 : /* Update the DSA pointer for dead_items to the new one */
490 15 : pvs->shared->dead_items_dsa_handle = dsa_get_handle(TidStoreGetDSA(pvs->dead_items));
491 15 : pvs->shared->dead_items_handle = TidStoreGetHandle(pvs->dead_items);
492 :
493 : /* Reset the counter */
494 15 : dead_items_info->num_items = 0;
495 15 : }
496 :
497 : /*
498 : * Do parallel index bulk-deletion with parallel workers.
499 : */
500 : void
501 15 : parallel_vacuum_bulkdel_all_indexes(ParallelVacuumState *pvs, long num_table_tuples,
502 : int num_index_scans, PVWorkerStats *wstats)
503 : {
504 : Assert(!IsParallelWorker());
505 :
506 : /*
507 : * We can only provide an approximate value of num_heap_tuples, at least
508 : * for now.
509 : */
510 15 : pvs->shared->reltuples = num_table_tuples;
511 15 : pvs->shared->estimated_count = true;
512 :
513 15 : parallel_vacuum_process_all_indexes(pvs, num_index_scans, true, wstats);
514 15 : }
515 :
516 : /*
517 : * Do parallel index cleanup with parallel workers.
518 : */
519 : void
520 23 : parallel_vacuum_cleanup_all_indexes(ParallelVacuumState *pvs, long num_table_tuples,
521 : int num_index_scans, bool estimated_count,
522 : PVWorkerStats *wstats)
523 : {
524 : Assert(!IsParallelWorker());
525 :
526 : /*
527 : * We can provide a better estimate of total number of surviving tuples
528 : * (we assume indexes are more interested in that than in the number of
529 : * nominally live tuples).
530 : */
531 23 : pvs->shared->reltuples = num_table_tuples;
532 23 : pvs->shared->estimated_count = estimated_count;
533 :
534 23 : parallel_vacuum_process_all_indexes(pvs, num_index_scans, false, wstats);
535 23 : }
536 :
537 : /*
538 : * Compute the number of parallel worker processes to request. Both index
539 : * vacuum and index cleanup can be executed with parallel workers.
540 : * The index is eligible for parallel vacuum iff its size is greater than
541 : * min_parallel_index_scan_size as invoking workers for very small indexes
542 : * can hurt performance.
543 : *
544 : * nrequested is the number of parallel workers that user requested. If
545 : * nrequested is 0, we compute the parallel degree based on nindexes, that is
546 : * the number of indexes that support parallel vacuum. This function also
547 : * sets will_parallel_vacuum to remember indexes that participate in parallel
548 : * vacuum.
549 : */
550 : static int
551 6709 : parallel_vacuum_compute_workers(Relation *indrels, int nindexes, int nrequested,
552 : bool *will_parallel_vacuum)
553 : {
554 6709 : int nindexes_parallel = 0;
555 6709 : int nindexes_parallel_bulkdel = 0;
556 6709 : int nindexes_parallel_cleanup = 0;
557 : int parallel_workers;
558 :
559 : /*
560 : * We don't allow performing parallel operation in standalone backend or
561 : * when parallelism is disabled.
562 : */
563 6709 : if (!IsUnderPostmaster || max_parallel_maintenance_workers == 0)
564 2970 : return 0;
565 :
566 : /*
567 : * Compute the number of indexes that can participate in parallel vacuum.
568 : */
569 12178 : for (int i = 0; i < nindexes; i++)
570 : {
571 8439 : Relation indrel = indrels[i];
572 8439 : uint8 vacoptions = indrel->rd_indam->amparallelvacuumoptions;
573 :
574 : /* Skip index that is not a suitable target for parallel index vacuum */
575 8439 : if (vacoptions == VACUUM_OPTION_NO_PARALLEL ||
576 8439 : RelationGetNumberOfBlocks(indrel) < min_parallel_index_scan_size)
577 8349 : continue;
578 :
579 90 : will_parallel_vacuum[i] = true;
580 :
581 90 : if ((vacoptions & VACUUM_OPTION_PARALLEL_BULKDEL) != 0)
582 82 : nindexes_parallel_bulkdel++;
583 90 : if (((vacoptions & VACUUM_OPTION_PARALLEL_CLEANUP) != 0) ||
584 74 : ((vacoptions & VACUUM_OPTION_PARALLEL_COND_CLEANUP) != 0))
585 82 : nindexes_parallel_cleanup++;
586 : }
587 :
588 3739 : nindexes_parallel = Max(nindexes_parallel_bulkdel,
589 : nindexes_parallel_cleanup);
590 :
591 : /* The leader process takes one index */
592 3739 : nindexes_parallel--;
593 :
594 : /* No index supports parallel vacuum */
595 3739 : if (nindexes_parallel <= 0)
596 3716 : return 0;
597 :
598 : /* Compute the parallel degree */
599 23 : parallel_workers = (nrequested > 0) ?
600 23 : Min(nrequested, nindexes_parallel) : nindexes_parallel;
601 :
602 : /* Cap by max_parallel_maintenance_workers */
603 23 : parallel_workers = Min(parallel_workers, max_parallel_maintenance_workers);
604 :
605 23 : return parallel_workers;
606 : }
607 :
608 : /*
609 : * Perform index vacuum or index cleanup with parallel workers. This function
610 : * must be used by the parallel vacuum leader process.
611 : *
612 : * If wstats is not NULL, the parallel worker statistics are updated.
613 : */
614 : static void
615 38 : parallel_vacuum_process_all_indexes(ParallelVacuumState *pvs, int num_index_scans,
616 : bool vacuum, PVWorkerStats *wstats)
617 : {
618 : int nworkers;
619 : PVIndVacStatus new_status;
620 :
621 : Assert(!IsParallelWorker());
622 :
623 38 : if (vacuum)
624 : {
625 15 : new_status = PARALLEL_INDVAC_STATUS_NEED_BULKDELETE;
626 :
627 : /* Determine the number of parallel workers to launch */
628 15 : nworkers = pvs->nindexes_parallel_bulkdel;
629 : }
630 : else
631 : {
632 23 : new_status = PARALLEL_INDVAC_STATUS_NEED_CLEANUP;
633 :
634 : /* Determine the number of parallel workers to launch */
635 23 : nworkers = pvs->nindexes_parallel_cleanup;
636 :
637 : /* Add conditionally parallel-aware indexes if in the first time call */
638 23 : if (num_index_scans == 0)
639 16 : nworkers += pvs->nindexes_parallel_condcleanup;
640 : }
641 :
642 : /* The leader process will participate */
643 38 : nworkers--;
644 :
645 : /*
646 : * It is possible that parallel context is initialized with fewer workers
647 : * than the number of indexes that need a separate worker in the current
648 : * phase, so we need to consider it. See
649 : * parallel_vacuum_compute_workers().
650 : */
651 38 : nworkers = Min(nworkers, pvs->pcxt->nworkers);
652 :
653 : /* Update the statistics, if we asked to */
654 38 : if (wstats != NULL && nworkers > 0)
655 31 : wstats->nplanned += nworkers;
656 :
657 : /*
658 : * Set index vacuum status and mark whether parallel vacuum worker can
659 : * process it.
660 : */
661 153 : for (int i = 0; i < pvs->nindexes; i++)
662 : {
663 115 : PVIndStats *indstats = &(pvs->indstats[i]);
664 :
665 : Assert(indstats->status == PARALLEL_INDVAC_STATUS_INITIAL);
666 115 : indstats->status = new_status;
667 115 : indstats->parallel_workers_can_process =
668 223 : (pvs->will_parallel_vacuum[i] &&
669 108 : parallel_vacuum_index_is_parallel_safe(pvs->indrels[i],
670 : num_index_scans,
671 115 : vacuum));
672 : }
673 :
674 : /* Reset the parallel index processing and progress counters */
675 38 : pg_atomic_write_u32(&(pvs->shared->idx), 0);
676 :
677 : /* Setup the shared cost-based vacuum delay and launch workers */
678 38 : if (nworkers > 0)
679 : {
680 : /* Reinitialize parallel context to relaunch parallel workers */
681 31 : if (num_index_scans > 0)
682 8 : ReinitializeParallelDSM(pvs->pcxt);
683 :
684 : /*
685 : * Set up shared cost balance and the number of active workers for
686 : * vacuum delay. We need to do this before launching workers as
687 : * otherwise, they might not see the updated values for these
688 : * parameters.
689 : */
690 31 : pg_atomic_write_u32(&(pvs->shared->cost_balance), VacuumCostBalance);
691 31 : pg_atomic_write_u32(&(pvs->shared->active_nworkers), 0);
692 :
693 : /*
694 : * The number of workers can vary between bulkdelete and cleanup
695 : * phase.
696 : */
697 31 : ReinitializeParallelWorkers(pvs->pcxt, nworkers);
698 :
699 31 : LaunchParallelWorkers(pvs->pcxt);
700 :
701 31 : if (pvs->pcxt->nworkers_launched > 0)
702 : {
703 : /*
704 : * Reset the local cost values for leader backend as we have
705 : * already accumulated the remaining balance of heap.
706 : */
707 31 : VacuumCostBalance = 0;
708 31 : VacuumCostBalanceLocal = 0;
709 :
710 : /* Enable shared cost balance for leader backend */
711 31 : VacuumSharedCostBalance = &(pvs->shared->cost_balance);
712 31 : VacuumActiveNWorkers = &(pvs->shared->active_nworkers);
713 :
714 : /* Update the statistics, if we asked to */
715 31 : if (wstats != NULL)
716 31 : wstats->nlaunched += pvs->pcxt->nworkers_launched;
717 : }
718 :
719 31 : if (vacuum)
720 15 : ereport(pvs->shared->elevel,
721 : (errmsg(ngettext("launched %d parallel vacuum worker for index vacuuming (planned: %d)",
722 : "launched %d parallel vacuum workers for index vacuuming (planned: %d)",
723 : pvs->pcxt->nworkers_launched),
724 : pvs->pcxt->nworkers_launched, nworkers)));
725 : else
726 16 : ereport(pvs->shared->elevel,
727 : (errmsg(ngettext("launched %d parallel vacuum worker for index cleanup (planned: %d)",
728 : "launched %d parallel vacuum workers for index cleanup (planned: %d)",
729 : pvs->pcxt->nworkers_launched),
730 : pvs->pcxt->nworkers_launched, nworkers)));
731 : }
732 :
733 : /* Vacuum the indexes that can be processed by only leader process */
734 38 : parallel_vacuum_process_unsafe_indexes(pvs);
735 :
736 : /*
737 : * Join as a parallel worker. The leader vacuums alone processes all
738 : * parallel-safe indexes in the case where no workers are launched.
739 : */
740 38 : parallel_vacuum_process_safe_indexes(pvs);
741 :
742 : /*
743 : * Next, accumulate buffer and WAL usage. (This must wait for the workers
744 : * to finish, or we might get incomplete data.)
745 : */
746 38 : if (nworkers > 0)
747 : {
748 : /* Wait for all vacuum workers to finish */
749 31 : WaitForParallelWorkersToFinish(pvs->pcxt);
750 :
751 70 : for (int i = 0; i < pvs->pcxt->nworkers_launched; i++)
752 39 : InstrAccumParallelQuery(&pvs->buffer_usage[i], &pvs->wal_usage[i]);
753 : }
754 :
755 : /*
756 : * Reset all index status back to initial (while checking that we have
757 : * vacuumed all indexes).
758 : */
759 153 : for (int i = 0; i < pvs->nindexes; i++)
760 : {
761 115 : PVIndStats *indstats = &(pvs->indstats[i]);
762 :
763 115 : if (indstats->status != PARALLEL_INDVAC_STATUS_COMPLETED)
764 0 : elog(ERROR, "parallel index vacuum on index \"%s\" is not completed",
765 : RelationGetRelationName(pvs->indrels[i]));
766 :
767 115 : indstats->status = PARALLEL_INDVAC_STATUS_INITIAL;
768 : }
769 :
770 : /*
771 : * Carry the shared balance value to heap scan and disable shared costing
772 : */
773 38 : if (VacuumSharedCostBalance)
774 : {
775 31 : VacuumCostBalance = pg_atomic_read_u32(VacuumSharedCostBalance);
776 31 : VacuumSharedCostBalance = NULL;
777 31 : VacuumActiveNWorkers = NULL;
778 : }
779 38 : }
780 :
781 : /*
782 : * Index vacuum/cleanup routine used by the leader process and parallel
783 : * vacuum worker processes to vacuum the indexes in parallel.
784 : */
785 : static void
786 77 : parallel_vacuum_process_safe_indexes(ParallelVacuumState *pvs)
787 : {
788 : /*
789 : * Increment the active worker count if we are able to launch any worker.
790 : */
791 77 : if (VacuumActiveNWorkers)
792 70 : pg_atomic_add_fetch_u32(VacuumActiveNWorkers, 1);
793 :
794 : /* Loop until all indexes are vacuumed */
795 : for (;;)
796 115 : {
797 : int idx;
798 : PVIndStats *indstats;
799 :
800 : /* Get an index number to process */
801 192 : idx = pg_atomic_fetch_add_u32(&(pvs->shared->idx), 1);
802 :
803 : /* Done for all indexes? */
804 192 : if (idx >= pvs->nindexes)
805 77 : break;
806 :
807 115 : indstats = &(pvs->indstats[idx]);
808 :
809 : /*
810 : * Skip vacuuming index that is unsafe for workers or has an
811 : * unsuitable target for parallel index vacuum (this is vacuumed in
812 : * parallel_vacuum_process_unsafe_indexes() by the leader).
813 : */
814 115 : if (!indstats->parallel_workers_can_process)
815 29 : continue;
816 :
817 : /* Do vacuum or cleanup of the index */
818 86 : parallel_vacuum_process_one_index(pvs, pvs->indrels[idx], indstats);
819 : }
820 :
821 : /*
822 : * We have completed the index vacuum so decrement the active worker
823 : * count.
824 : */
825 77 : if (VacuumActiveNWorkers)
826 70 : pg_atomic_sub_fetch_u32(VacuumActiveNWorkers, 1);
827 77 : }
828 :
829 : /*
830 : * Perform parallel vacuuming of indexes in leader process.
831 : *
832 : * Handles index vacuuming (or index cleanup) for indexes that are not
833 : * parallel safe. It's possible that this will vary for a given index, based
834 : * on details like whether we're performing index cleanup right now.
835 : *
836 : * Also performs vacuuming of smaller indexes that fell under the size cutoff
837 : * enforced by parallel_vacuum_compute_workers().
838 : */
839 : static void
840 38 : parallel_vacuum_process_unsafe_indexes(ParallelVacuumState *pvs)
841 : {
842 : Assert(!IsParallelWorker());
843 :
844 : /*
845 : * Increment the active worker count if we are able to launch any worker.
846 : */
847 38 : if (VacuumActiveNWorkers)
848 31 : pg_atomic_add_fetch_u32(VacuumActiveNWorkers, 1);
849 :
850 153 : for (int i = 0; i < pvs->nindexes; i++)
851 : {
852 115 : PVIndStats *indstats = &(pvs->indstats[i]);
853 :
854 : /* Skip, indexes that are safe for workers */
855 115 : if (indstats->parallel_workers_can_process)
856 86 : continue;
857 :
858 : /* Do vacuum or cleanup of the index */
859 29 : parallel_vacuum_process_one_index(pvs, pvs->indrels[i], indstats);
860 : }
861 :
862 : /*
863 : * We have completed the index vacuum so decrement the active worker
864 : * count.
865 : */
866 38 : if (VacuumActiveNWorkers)
867 31 : pg_atomic_sub_fetch_u32(VacuumActiveNWorkers, 1);
868 38 : }
869 :
/*
 * Vacuum or cleanup index either by leader process or by one of the worker
 * process.  After vacuuming the index this function copies the index
 * statistics returned from ambulkdelete and amvacuumcleanup to the DSM
 * segment.
 *
 * indstats->status tells this function which operation (bulk-deletion or
 * cleanup) to perform; on return the status is set to COMPLETED.
 */
static void
parallel_vacuum_process_one_index(ParallelVacuumState *pvs, Relation indrel,
								  PVIndStats *indstats)
{
	IndexBulkDeleteResult *istat = NULL;
	IndexBulkDeleteResult *istat_res;
	IndexVacuumInfo ivinfo;

	/*
	 * Update the pointer to the corresponding bulk-deletion result if someone
	 * has already updated it (i.e. this index was vacuumed in an earlier
	 * pass, so its stats already live in the DSM slot).
	 */
	if (indstats->istat_updated)
		istat = &(indstats->istat);

	/* Fill in the per-call vacuum info from the shared state. */
	ivinfo.index = indrel;
	ivinfo.heaprel = pvs->heaprel;
	ivinfo.analyze_only = false;
	ivinfo.report_progress = false;
	ivinfo.message_level = DEBUG2;
	ivinfo.estimated_count = pvs->shared->estimated_count;
	ivinfo.num_heap_tuples = pvs->shared->reltuples;
	ivinfo.strategy = pvs->bstrategy;

	/*
	 * Update error traceback information, so the error context callback can
	 * name the index if the AM call below fails.
	 */
	pvs->indname = pstrdup(RelationGetRelationName(indrel));
	pvs->status = indstats->status;

	switch (indstats->status)
	{
		case PARALLEL_INDVAC_STATUS_NEED_BULKDELETE:
			istat_res = vac_bulkdel_one_index(&ivinfo, istat, pvs->dead_items,
											  &pvs->shared->dead_items_info);
			break;
		case PARALLEL_INDVAC_STATUS_NEED_CLEANUP:
			istat_res = vac_cleanup_one_index(&ivinfo, istat);
			break;
		default:
			elog(ERROR, "unexpected parallel vacuum index status %d for index \"%s\"",
				 indstats->status,
				 RelationGetRelationName(indrel));
	}

	/*
	 * Copy the index bulk-deletion result returned from ambulkdelete and
	 * amvacuumcleanup to the DSM segment if it's the first cycle because they
	 * allocate locally and it's possible that an index will be vacuumed by a
	 * different vacuum process the next cycle.  Copying the result normally
	 * happens only the first time an index is vacuumed.  For any additional
	 * vacuum pass, we directly point to the result on the DSM segment and
	 * pass it to vacuum index APIs so that workers can update it directly.
	 *
	 * Since all vacuum workers write the bulk-deletion result at different
	 * slots we can write them without locking.
	 *
	 * Note that istat_res may be NULL (the AM can return no stats); in that
	 * case there is nothing to copy.
	 */
	if (!indstats->istat_updated && istat_res != NULL)
	{
		memcpy(&(indstats->istat), istat_res, sizeof(IndexBulkDeleteResult));
		indstats->istat_updated = true;

		/* Free the locally-allocated bulk-deletion result */
		pfree(istat_res);
	}

	/*
	 * Update the status to completed.  No need to lock here since each worker
	 * touches different indexes.
	 */
	indstats->status = PARALLEL_INDVAC_STATUS_COMPLETED;

	/* Reset error traceback information */
	pvs->status = PARALLEL_INDVAC_STATUS_COMPLETED;
	pfree(pvs->indname);
	pvs->indname = NULL;

	/*
	 * Call the parallel variant of pgstat_progress_incr_param so workers can
	 * report progress of index vacuum to the leader.
	 */
	pgstat_progress_parallel_incr_param(PROGRESS_VACUUM_INDEXES_PROCESSED, 1);
}
957 :
958 : /*
959 : * Returns false, if the given index can't participate in the next execution of
960 : * parallel index vacuum or parallel index cleanup.
961 : */
962 : static bool
963 108 : parallel_vacuum_index_is_parallel_safe(Relation indrel, int num_index_scans,
964 : bool vacuum)
965 : {
966 : uint8 vacoptions;
967 :
968 108 : vacoptions = indrel->rd_indam->amparallelvacuumoptions;
969 :
970 : /* In parallel vacuum case, check if it supports parallel bulk-deletion */
971 108 : if (vacuum)
972 30 : return ((vacoptions & VACUUM_OPTION_PARALLEL_BULKDEL) != 0);
973 :
974 : /* Not safe, if the index does not support parallel cleanup */
975 78 : if (((vacoptions & VACUUM_OPTION_PARALLEL_CLEANUP) == 0) &&
976 62 : ((vacoptions & VACUUM_OPTION_PARALLEL_COND_CLEANUP) == 0))
977 8 : return false;
978 :
979 : /*
980 : * Not safe, if the index supports parallel cleanup conditionally, but we
981 : * have already processed the index (for bulkdelete). We do this to avoid
982 : * the need to invoke workers when parallel index cleanup doesn't need to
983 : * scan the index. See the comments for option
984 : * VACUUM_OPTION_PARALLEL_COND_CLEANUP to know when indexes support
985 : * parallel cleanup conditionally.
986 : */
987 70 : if (num_index_scans > 0 &&
988 14 : ((vacoptions & VACUUM_OPTION_PARALLEL_COND_CLEANUP) != 0))
989 14 : return false;
990 :
991 56 : return true;
992 : }
993 :
/*
 * Perform work within a launched parallel process.
 *
 * This is the entry point for a parallel vacuum worker: it attaches to the
 * shared state set up by the leader (via the shm_toc), opens the heap and
 * its indexes, processes the parallel-safe indexes, reports buffer/WAL
 * usage back to the leader, and cleans up.
 *
 * Since parallel vacuum workers perform only index vacuum or index cleanup,
 * we don't need to report progress information.
 */
void
parallel_vacuum_main(dsm_segment *seg, shm_toc *toc)
{
	ParallelVacuumState pvs;
	Relation	rel;
	Relation   *indrels;
	PVIndStats *indstats;
	PVShared   *shared;
	TidStore   *dead_items;
	BufferUsage *buffer_usage;
	WalUsage   *wal_usage;
	int			nindexes;
	char	   *sharedquery;
	ErrorContextCallback errcallback;

	/*
	 * A parallel vacuum worker must have only PROC_IN_VACUUM flag since we
	 * don't support parallel vacuum for autovacuum as of now.
	 */
	Assert(MyProc->statusFlags == PROC_IN_VACUUM);

	elog(DEBUG1, "starting parallel vacuum worker");

	shared = (PVShared *) shm_toc_lookup(toc, PARALLEL_VACUUM_KEY_SHARED, false);

	/* Set debug_query_string for individual workers */
	sharedquery = shm_toc_lookup(toc, PARALLEL_VACUUM_KEY_QUERY_TEXT, true);
	debug_query_string = sharedquery;
	pgstat_report_activity(STATE_RUNNING, debug_query_string);

	/* Track query ID */
	pgstat_report_query_id(shared->queryid, false);

	/*
	 * Open table.  The lock mode is the same as the leader process.  It's
	 * okay because the lock mode does not conflict among the parallel
	 * workers.
	 */
	rel = table_open(shared->relid, ShareUpdateExclusiveLock);

	/*
	 * Open all indexes.  indrels are sorted in order by OID, which should be
	 * matched to the leader's one.
	 */
	vac_open_indexes(rel, RowExclusiveLock, &nindexes, &indrels);
	Assert(nindexes > 0);

	/*
	 * Apply the desired value of maintenance_work_mem within this process.
	 * Really we should use SetConfigOption() to change a GUC, but since we're
	 * already in parallel mode guc.c would complain about that.  Fortunately,
	 * by the same token guc.c will not let any user-defined code change it.
	 * So just avert your eyes while we do this:
	 */
	if (shared->maintenance_work_mem_worker > 0)
		maintenance_work_mem = shared->maintenance_work_mem_worker;

	/* Set index statistics */
	indstats = (PVIndStats *) shm_toc_lookup(toc,
											 PARALLEL_VACUUM_KEY_INDEX_STATS,
											 false);

	/* Find dead_items in shared memory */
	dead_items = TidStoreAttach(shared->dead_items_dsa_handle,
								shared->dead_items_handle);

	/* Set cost-based vacuum delay */
	VacuumUpdateCosts();
	VacuumCostBalance = 0;
	VacuumCostBalanceLocal = 0;
	VacuumSharedCostBalance = &(shared->cost_balance);
	VacuumActiveNWorkers = &(shared->active_nworkers);

	/* Set parallel vacuum state */
	pvs.indrels = indrels;
	pvs.nindexes = nindexes;
	pvs.indstats = indstats;
	pvs.shared = shared;
	pvs.dead_items = dead_items;
	pvs.relnamespace = get_namespace_name(RelationGetNamespace(rel));
	pvs.relname = pstrdup(RelationGetRelationName(rel));
	pvs.heaprel = rel;

	/* These fields will be filled during index vacuum or cleanup */
	pvs.indname = NULL;
	pvs.status = PARALLEL_INDVAC_STATUS_INITIAL;

	/* Each parallel VACUUM worker gets its own access strategy. */
	pvs.bstrategy = GetAccessStrategyWithSize(BAS_VACUUM,
											  shared->ring_nbuffers * (BLCKSZ / 1024));

	/* Setup error traceback support for ereport() */
	errcallback.callback = parallel_vacuum_error_callback;
	errcallback.arg = &pvs;
	errcallback.previous = error_context_stack;
	error_context_stack = &errcallback;

	/* Prepare to track buffer usage during parallel execution */
	InstrStartParallelQuery();

	/* Process indexes to perform vacuum/cleanup */
	parallel_vacuum_process_safe_indexes(&pvs);

	/*
	 * Report buffer/WAL usage during parallel execution.  Each worker writes
	 * to its own slot, indexed by ParallelWorkerNumber.
	 */
	buffer_usage = shm_toc_lookup(toc, PARALLEL_VACUUM_KEY_BUFFER_USAGE, false);
	wal_usage = shm_toc_lookup(toc, PARALLEL_VACUUM_KEY_WAL_USAGE, false);
	InstrEndParallelQuery(&buffer_usage[ParallelWorkerNumber],
						  &wal_usage[ParallelWorkerNumber]);

	/* Report any remaining cost-based vacuum delay time */
	if (track_cost_delay_timing)
		pgstat_progress_parallel_incr_param(PROGRESS_VACUUM_DELAY_TIME,
											parallel_vacuum_worker_delay_ns);

	TidStoreDetach(dead_items);

	/* Pop the error context stack */
	error_context_stack = errcallback.previous;

	vac_close_indexes(nindexes, indrels, RowExclusiveLock);
	table_close(rel, ShareUpdateExclusiveLock);
	FreeAccessStrategy(pvs.bstrategy);
}
1123 :
1124 : /*
1125 : * Error context callback for errors occurring during parallel index vacuum.
1126 : * The error context messages should match the messages set in the lazy vacuum
1127 : * error context. If you change this function, change vacuum_error_callback()
1128 : * as well.
1129 : */
1130 : static void
1131 0 : parallel_vacuum_error_callback(void *arg)
1132 : {
1133 0 : ParallelVacuumState *errinfo = arg;
1134 :
1135 0 : switch (errinfo->status)
1136 : {
1137 0 : case PARALLEL_INDVAC_STATUS_NEED_BULKDELETE:
1138 0 : errcontext("while vacuuming index \"%s\" of relation \"%s.%s\"",
1139 : errinfo->indname,
1140 : errinfo->relnamespace,
1141 : errinfo->relname);
1142 0 : break;
1143 0 : case PARALLEL_INDVAC_STATUS_NEED_CLEANUP:
1144 0 : errcontext("while cleaning up index \"%s\" of relation \"%s.%s\"",
1145 : errinfo->indname,
1146 : errinfo->relnamespace,
1147 : errinfo->relname);
1148 0 : break;
1149 0 : case PARALLEL_INDVAC_STATUS_INITIAL:
1150 : case PARALLEL_INDVAC_STATUS_COMPLETED:
1151 : default:
1152 0 : return;
1153 : }
1154 : }
|