Line data Source code
1 : /*-------------------------------------------------------------------------
2 : *
3 : * vacuumparallel.c
4 : * Support routines for parallel vacuum execution.
5 : *
6 : * This file contains routines that are intended to support setting up, using,
7 : * and tearing down a ParallelVacuumState.
8 : *
9 : * In a parallel vacuum, we perform both index bulk deletion and index cleanup
10 : * with parallel worker processes. Individual indexes are processed by one
11 : * vacuum process. ParallelVacuumState contains shared information as well as
12 : * the memory space for storing dead items allocated in the DSA area. We
13 : * launch parallel worker processes at the start of parallel index
14 : * bulk-deletion and index cleanup, and once all indexes are processed the
15 : * parallel worker processes exit. Each time we process indexes in parallel,
16 : * the parallel context is re-initialized so that the same DSM can be used for
17 : * multiple passes of index bulk-deletion and index cleanup.
18 : *
19 : * Portions Copyright (c) 1996-2024, PostgreSQL Global Development Group
20 : * Portions Copyright (c) 1994, Regents of the University of California
21 : *
22 : * IDENTIFICATION
23 : * src/backend/commands/vacuumparallel.c
24 : *
25 : *-------------------------------------------------------------------------
26 : */
27 : #include "postgres.h"
28 :
29 : #include "access/amapi.h"
30 : #include "access/table.h"
31 : #include "access/xact.h"
32 : #include "commands/progress.h"
33 : #include "commands/vacuum.h"
34 : #include "executor/instrument.h"
35 : #include "optimizer/paths.h"
36 : #include "pgstat.h"
37 : #include "storage/bufmgr.h"
38 : #include "tcop/tcopprot.h"
39 : #include "utils/lsyscache.h"
40 : #include "utils/rel.h"
41 :
42 : /*
43 : * DSM keys for parallel vacuum. Unlike other parallel execution code, we
44 : * don't need to worry about DSM keys conflicting with plan_node_id, so we
45 : * can use small integers.
46 : */
47 : #define PARALLEL_VACUUM_KEY_SHARED 1
48 : #define PARALLEL_VACUUM_KEY_QUERY_TEXT 2
49 : #define PARALLEL_VACUUM_KEY_BUFFER_USAGE 3
50 : #define PARALLEL_VACUUM_KEY_WAL_USAGE 4
51 : #define PARALLEL_VACUUM_KEY_INDEX_STATS 5
52 :
53 : /*
54 : * Shared information among parallel workers, so it is allocated in the DSM
55 : * segment.
56 : */
57 : typedef struct PVShared
58 : {
59 : /*
60 : * Target table relid, log level (for messages about parallel workers
61 : * launched during VACUUM VERBOSE) and query ID. These fields are not
62 : * modified during the parallel vacuum.
63 : */
64 : Oid relid;
65 : int elevel;
66 : uint64 queryid;
67 :
68 : /*
69 : * Fields for both index vacuum and cleanup.
70 : *
71 : * reltuples is the total number of input heap tuples. We set it to the
72 : * number of old live tuples in the index vacuum case, or to the number of
73 : * new live tuples in the index cleanup case.
74 : *
75 : * estimated_count is true if reltuples is an estimated value. (Note that
76 : * reltuples could be -1 in this case, indicating we have no idea.)
77 : */
78 : double reltuples;
79 : bool estimated_count;
80 :
81 : /*
82 : * In a single-process vacuum we can consume extra memory during index
83 : * vacuuming or cleanup beyond the memory used for heap scanning. In a
84 : * parallel vacuum, since each vacuum worker can consume memory equal to
85 : * maintenance_work_mem, the maintenance_work_mem for each worker is set
86 : * such that the parallel operation doesn't consume more memory than a
87 : * single-process vacuum would. (A worked example follows this struct.)
88 : */
89 : int maintenance_work_mem_worker;
90 :
91 : /*
92 : * The number of buffers each worker's Buffer Access Strategy ring should
93 : * contain.
94 : */
95 : int ring_nbuffers;
96 :
97 : /*
98 : * Shared vacuum cost balance. During parallel vacuum,
99 : * VacuumSharedCostBalance points to this value and it accumulates the
100 : * balance of each parallel vacuum worker.
101 : */
102 : pg_atomic_uint32 cost_balance;
103 :
104 : /*
105 : * Number of active parallel workers. This is used for computing the
106 : * minimum threshold of the vacuum cost balance before a worker sleeps for
107 : * cost-based delay.
108 : */
109 : pg_atomic_uint32 active_nworkers;
110 :
111 : /* Counter for vacuuming and cleanup */
112 : pg_atomic_uint32 idx;
113 :
114 : /* DSA handle where the TidStore lives */
115 : dsa_handle dead_items_dsa_handle;
116 :
117 : /* DSA pointer to the shared TidStore */
118 : dsa_pointer dead_items_handle;
119 :
120 : /* Statistics of shared dead items */
121 : VacDeadItemsInfo dead_items_info;
122 : } PVShared;
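/*
 * Worked example for maintenance_work_mem_worker (illustrative numbers,
 * added for clarity): with maintenance_work_mem = 256MB, parallel_workers =
 * 2, and three indexes whose access methods use maintenance_work_mem,
 * parallel_vacuum_init() sets maintenance_work_mem_worker to
 * 256MB / Min(2, 3) = 128MB, so the workers together stay within the
 * single-process budget.
 */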
123 :
124 : /* Status used during parallel index vacuum or cleanup */
125 : typedef enum PVIndVacStatus
126 : {
127 : PARALLEL_INDVAC_STATUS_INITIAL = 0,
128 : PARALLEL_INDVAC_STATUS_NEED_BULKDELETE,
129 : PARALLEL_INDVAC_STATUS_NEED_CLEANUP,
130 : PARALLEL_INDVAC_STATUS_COMPLETED,
131 : } PVIndVacStatus;
132 :
133 : /*
134 : * Per-index vacuum statistics used for parallel vacuum. This includes the
135 : * status of the parallel index vacuum as well as the index statistics.
136 : */
137 : typedef struct PVIndStats
138 : {
139 : /*
140 : * The following two fields are set by leader process before executing
141 : * parallel index vacuum or parallel index cleanup. These fields are not
142 : * fixed for the entire VACUUM operation. They are only fixed for an
143 : * individual parallel index vacuum and cleanup.
144 : *
145 : * parallel_workers_can_process is true if both the leader and workers can
146 : * process the index; otherwise only the leader can process it.
147 : */
148 : PVIndVacStatus status;
149 : bool parallel_workers_can_process;
150 :
151 : /*
152 : * Individual worker or leader stores the result of index vacuum or
153 : * cleanup.
154 : */
155 : bool istat_updated; /* are the stats updated? */
156 : IndexBulkDeleteResult istat;
157 : } PVIndStats;
158 :
159 : /*
160 : * Struct for maintaining a parallel vacuum state. typedef appears in vacuum.h.
161 : */
162 : struct ParallelVacuumState
163 : {
164 : /* NULL for worker processes */
165 : ParallelContext *pcxt;
166 :
167 : /* Parent Heap Relation */
168 : Relation heaprel;
169 :
170 : /* Target indexes */
171 : Relation *indrels;
172 : int nindexes;
173 :
174 : /* Shared information among parallel vacuum workers */
175 : PVShared *shared;
176 :
177 : /*
178 : * Shared index statistics among parallel vacuum workers. The array
179 : * element is allocated for every index, even those indexes where parallel
180 : * index vacuuming is unsafe or not worthwhile (e.g.,
181 : * will_parallel_vacuum[] is false). During parallel vacuum,
182 : * IndexBulkDeleteResult of each index is kept in DSM and is copied into
183 : * local memory at the end of parallel vacuum.
184 : */
185 : PVIndStats *indstats;
186 :
187 : /* Shared dead items space among parallel vacuum workers */
188 : TidStore *dead_items;
189 :
190 : /* Points to buffer usage area in DSM */
191 : BufferUsage *buffer_usage;
192 :
193 : /* Points to WAL usage area in DSM */
194 : WalUsage *wal_usage;
195 :
196 : /*
197 : * False if the index is a totally unsuitable target for all parallel
198 : * processing. For example, the index could be smaller than the
199 : * min_parallel_index_scan_size cutoff.
200 : */
201 : bool *will_parallel_vacuum;
202 :
203 : /*
204 : * The numbers of indexes that support parallel index bulk-deletion,
205 : * parallel index cleanup, and conditional parallel cleanup, respectively.
206 : */
207 : int nindexes_parallel_bulkdel;
208 : int nindexes_parallel_cleanup;
209 : int nindexes_parallel_condcleanup;
210 :
211 : /* Buffer access strategy used by leader process */
212 : BufferAccessStrategy bstrategy;
213 :
214 : /*
215 : * Error reporting state. The error callback is set only for worker
216 : * processes during parallel index vacuum.
217 : */
218 : char *relnamespace;
219 : char *relname;
220 : char *indname;
221 : PVIndVacStatus status;
222 : };
223 :
224 : static int parallel_vacuum_compute_workers(Relation *indrels, int nindexes, int nrequested,
225 : bool *will_parallel_vacuum);
226 : static void parallel_vacuum_process_all_indexes(ParallelVacuumState *pvs, int num_index_scans,
227 : bool vacuum);
228 : static void parallel_vacuum_process_safe_indexes(ParallelVacuumState *pvs);
229 : static void parallel_vacuum_process_unsafe_indexes(ParallelVacuumState *pvs);
230 : static void parallel_vacuum_process_one_index(ParallelVacuumState *pvs, Relation indrel,
231 : PVIndStats *indstats);
232 : static bool parallel_vacuum_index_is_parallel_safe(Relation indrel, int num_index_scans,
233 : bool vacuum);
234 : static void parallel_vacuum_error_callback(void *arg);
235 :
236 : /*
237 : * Try to enter parallel mode and create a parallel context. Then initialize
238 : * shared memory state.
239 : *
240 : * On success, return parallel vacuum state. Otherwise return NULL.
241 : */
242 : ParallelVacuumState *
243 7934 : parallel_vacuum_init(Relation rel, Relation *indrels, int nindexes,
244 : int nrequested_workers, int vac_work_mem,
245 : int elevel, BufferAccessStrategy bstrategy)
246 : {
247 : ParallelVacuumState *pvs;
248 : ParallelContext *pcxt;
249 : PVShared *shared;
250 : TidStore *dead_items;
251 : PVIndStats *indstats;
252 : BufferUsage *buffer_usage;
253 : WalUsage *wal_usage;
254 : bool *will_parallel_vacuum;
255 : Size est_indstats_len;
256 : Size est_shared_len;
257 7934 : int nindexes_mwm = 0;
258 7934 : int parallel_workers = 0;
259 : int querylen;
260 :
261 : /*
262 : * A parallel vacuum must be requested and there must be indexes on the
263 : * relation
264 : */
265 : Assert(nrequested_workers >= 0);
266 : Assert(nindexes > 0);
267 :
268 : /*
269 : * Compute the number of parallel vacuum workers to launch
270 : */
271 7934 : will_parallel_vacuum = (bool *) palloc0(sizeof(bool) * nindexes);
272 7934 : parallel_workers = parallel_vacuum_compute_workers(indrels, nindexes,
273 : nrequested_workers,
274 : will_parallel_vacuum);
275 7934 : if (parallel_workers <= 0)
276 : {
277 : /* Can't perform vacuum in parallel -- return NULL */
278 7912 : pfree(will_parallel_vacuum);
279 7912 : return NULL;
280 : }
281 :
282 22 : pvs = (ParallelVacuumState *) palloc0(sizeof(ParallelVacuumState));
283 22 : pvs->indrels = indrels;
284 22 : pvs->nindexes = nindexes;
285 22 : pvs->will_parallel_vacuum = will_parallel_vacuum;
286 22 : pvs->bstrategy = bstrategy;
287 22 : pvs->heaprel = rel;
288 :
289 22 : EnterParallelMode();
290 22 : pcxt = CreateParallelContext("postgres", "parallel_vacuum_main",
291 : parallel_workers);
292 : Assert(pcxt->nworkers > 0);
293 22 : pvs->pcxt = pcxt;
294 :
295 : /* Estimate size for index vacuum stats -- PARALLEL_VACUUM_KEY_INDEX_STATS */
296 22 : est_indstats_len = mul_size(sizeof(PVIndStats), nindexes);
297 22 : shm_toc_estimate_chunk(&pcxt->estimator, est_indstats_len);
298 22 : shm_toc_estimate_keys(&pcxt->estimator, 1);
299 :
300 : /* Estimate size for shared information -- PARALLEL_VACUUM_KEY_SHARED */
301 22 : est_shared_len = sizeof(PVShared);
302 22 : shm_toc_estimate_chunk(&pcxt->estimator, est_shared_len);
303 22 : shm_toc_estimate_keys(&pcxt->estimator, 1);
304 :
305 : /*
306 : * Estimate space for BufferUsage and WalUsage --
307 : * PARALLEL_VACUUM_KEY_BUFFER_USAGE and PARALLEL_VACUUM_KEY_WAL_USAGE.
308 : *
309 : * If there are no extensions loaded that care, we could skip this. We
310 : * have no way of knowing whether anyone's looking at pgBufferUsage or
311 : * pgWalUsage, so do it unconditionally.
312 : */
313 22 : shm_toc_estimate_chunk(&pcxt->estimator,
314 : mul_size(sizeof(BufferUsage), pcxt->nworkers));
315 22 : shm_toc_estimate_keys(&pcxt->estimator, 1);
316 22 : shm_toc_estimate_chunk(&pcxt->estimator,
317 : mul_size(sizeof(WalUsage), pcxt->nworkers));
318 22 : shm_toc_estimate_keys(&pcxt->estimator, 1);
319 :
320 : /* Finally, estimate PARALLEL_VACUUM_KEY_QUERY_TEXT space */
321 22 : if (debug_query_string)
322 : {
323 22 : querylen = strlen(debug_query_string);
324 22 : shm_toc_estimate_chunk(&pcxt->estimator, querylen + 1);
325 22 : shm_toc_estimate_keys(&pcxt->estimator, 1);
326 : }
327 : else
328 0 : querylen = 0; /* keep compiler quiet */
329 :
330 22 : InitializeParallelDSM(pcxt);
331 :
332 : /* Prepare index vacuum stats */
333 22 : indstats = (PVIndStats *) shm_toc_allocate(pcxt->toc, est_indstats_len);
334 610 : MemSet(indstats, 0, est_indstats_len);
335 120 : for (int i = 0; i < nindexes; i++)
336 : {
337 98 : Relation indrel = indrels[i];
338 98 : uint8 vacoptions = indrel->rd_indam->amparallelvacuumoptions;
339 :
340 : /*
341 : * The cleanup option should be either disabled, always performed in
342 : * parallel, or conditionally performed in parallel.
343 : */
344 : Assert(((vacoptions & VACUUM_OPTION_PARALLEL_CLEANUP) == 0) ||
345 : ((vacoptions & VACUUM_OPTION_PARALLEL_COND_CLEANUP) == 0));
346 : Assert(vacoptions <= VACUUM_OPTION_MAX_VALID_VALUE);
347 :
348 98 : if (!will_parallel_vacuum[i])
349 6 : continue;
350 :
351 92 : if (indrel->rd_indam->amusemaintenanceworkmem)
352 12 : nindexes_mwm++;
353 :
354 : /*
355 : * Remember the number of indexes that support parallel operation for
356 : * each phase.
357 : */
358 92 : if ((vacoptions & VACUUM_OPTION_PARALLEL_BULKDEL) != 0)
359 80 : pvs->nindexes_parallel_bulkdel++;
360 92 : if ((vacoptions & VACUUM_OPTION_PARALLEL_CLEANUP) != 0)
361 24 : pvs->nindexes_parallel_cleanup++;
362 92 : if ((vacoptions & VACUUM_OPTION_PARALLEL_COND_CLEANUP) != 0)
363 56 : pvs->nindexes_parallel_condcleanup++;
364 : }
365 22 : shm_toc_insert(pcxt->toc, PARALLEL_VACUUM_KEY_INDEX_STATS, indstats);
366 22 : pvs->indstats = indstats;
367 :
368 : /* Prepare shared information */
369 22 : shared = (PVShared *) shm_toc_allocate(pcxt->toc, est_shared_len);
370 242 : MemSet(shared, 0, est_shared_len);
371 22 : shared->relid = RelationGetRelid(rel);
372 22 : shared->elevel = elevel;
373 22 : shared->queryid = pgstat_get_my_query_id();
374 22 : shared->maintenance_work_mem_worker =
375 : (nindexes_mwm > 0) ?
376 22 : maintenance_work_mem / Min(parallel_workers, nindexes_mwm) :
377 : maintenance_work_mem;
378 22 : shared->dead_items_info.max_bytes = vac_work_mem * 1024L;
379 :
380 : /* Prepare DSA space for dead items */
381 22 : dead_items = TidStoreCreateShared(shared->dead_items_info.max_bytes,
382 : LWTRANCHE_PARALLEL_VACUUM_DSA);
383 22 : pvs->dead_items = dead_items;
384 22 : shared->dead_items_handle = TidStoreGetHandle(dead_items);
385 22 : shared->dead_items_dsa_handle = dsa_get_handle(TidStoreGetDSA(dead_items));
386 :
387 : /* Use the same buffer size for all workers */
388 22 : shared->ring_nbuffers = GetAccessStrategyBufferCount(bstrategy);
389 :
390 22 : pg_atomic_init_u32(&(shared->cost_balance), 0);
391 22 : pg_atomic_init_u32(&(shared->active_nworkers), 0);
392 22 : pg_atomic_init_u32(&(shared->idx), 0);
393 :
394 22 : shm_toc_insert(pcxt->toc, PARALLEL_VACUUM_KEY_SHARED, shared);
395 22 : pvs->shared = shared;
396 :
397 : /*
398 : * Allocate space for each worker's BufferUsage and WalUsage; no need to
399 : * initialize
400 : */
401 22 : buffer_usage = shm_toc_allocate(pcxt->toc,
402 22 : mul_size(sizeof(BufferUsage), pcxt->nworkers));
403 22 : shm_toc_insert(pcxt->toc, PARALLEL_VACUUM_KEY_BUFFER_USAGE, buffer_usage);
404 22 : pvs->buffer_usage = buffer_usage;
405 22 : wal_usage = shm_toc_allocate(pcxt->toc,
406 22 : mul_size(sizeof(WalUsage), pcxt->nworkers));
407 22 : shm_toc_insert(pcxt->toc, PARALLEL_VACUUM_KEY_WAL_USAGE, wal_usage);
408 22 : pvs->wal_usage = wal_usage;
409 :
410 : /* Store query string for workers */
411 22 : if (debug_query_string)
412 : {
413 : char *sharedquery;
414 :
415 22 : sharedquery = (char *) shm_toc_allocate(pcxt->toc, querylen + 1);
416 22 : memcpy(sharedquery, debug_query_string, querylen + 1);
417 22 : sharedquery[querylen] = '\0';
418 22 : shm_toc_insert(pcxt->toc,
419 : PARALLEL_VACUUM_KEY_QUERY_TEXT, sharedquery);
420 : }
421 :
422 : /* Success -- return parallel vacuum state */
423 22 : return pvs;
424 : }
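/*
 * Hypothetical caller-side sketch (added for illustration; not part of the
 * original file): how a heap-vacuum-like caller might drive the lifecycle
 * above. Only the parallel_vacuum_* calls are real APIs from this file; the
 * driver function, its literal argument values, and the omitted heap scan
 * that fills the dead-items TidStore (obtained via
 * parallel_vacuum_get_dead_items()) are illustrative assumptions.
 */
#ifdef NOT_USED
static void
example_parallel_vacuum_driver(Relation rel, Relation *indrels, int nindexes,
							   BufferAccessStrategy bstrategy)
{
	ParallelVacuumState *pvs;
	IndexBulkDeleteResult **istats;
	int			num_index_scans = 0;

	/* NULL means a parallel vacuum is not possible or not worthwhile */
	pvs = parallel_vacuum_init(rel, indrels, nindexes,
							   2 /* nrequested_workers */ ,
							   maintenance_work_mem /* vac_work_mem, in KB */ ,
							   INFO, bstrategy);
	if (pvs == NULL)
		return;

	/* One parallel bulk-deletion pass per batch of collected dead items */
	parallel_vacuum_bulkdel_all_indexes(pvs, 1000 /* num_table_tuples */ ,
										num_index_scans);
	num_index_scans++;
	parallel_vacuum_reset_dead_items(pvs);

	/* A single parallel index cleanup pass once the heap scan is done */
	parallel_vacuum_cleanup_all_indexes(pvs, 1000, num_index_scans, true);

	/* Copy index stats out of DSM and tear down the parallel context */
	istats = (IndexBulkDeleteResult **)
		palloc0(nindexes * sizeof(IndexBulkDeleteResult *));
	parallel_vacuum_end(pvs, istats);
}
#endif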
425 :
426 : /*
427 : * Destroy the parallel context, and end parallel mode.
428 : *
429 : * Since writes are not allowed during parallel mode, copy the
430 : * updated index statistics from DSM into local memory and then later use that
431 : * to update the index statistics. One might think that we can exit from
432 : * parallel mode, update the index statistics and then destroy parallel
433 : * context, but that won't be safe (see ExitParallelMode).
434 : */
435 : void
436 22 : parallel_vacuum_end(ParallelVacuumState *pvs, IndexBulkDeleteResult **istats)
437 : {
438 : Assert(!IsParallelWorker());
439 :
440 : /* Copy the updated statistics */
441 120 : for (int i = 0; i < pvs->nindexes; i++)
442 : {
443 98 : PVIndStats *indstats = &(pvs->indstats[i]);
444 :
445 98 : if (indstats->istat_updated)
446 : {
447 82 : istats[i] = (IndexBulkDeleteResult *) palloc0(sizeof(IndexBulkDeleteResult));
448 82 : memcpy(istats[i], &indstats->istat, sizeof(IndexBulkDeleteResult));
449 : }
450 : else
451 16 : istats[i] = NULL;
452 : }
453 :
454 22 : TidStoreDestroy(pvs->dead_items);
455 :
456 22 : DestroyParallelContext(pvs->pcxt);
457 22 : ExitParallelMode();
458 :
459 22 : pfree(pvs->will_parallel_vacuum);
460 22 : pfree(pvs);
461 22 : }
462 :
463 : /*
464 : * Returns the dead items space and dead items information.
465 : */
466 : TidStore *
467 22 : parallel_vacuum_get_dead_items(ParallelVacuumState *pvs, VacDeadItemsInfo **dead_items_info_p)
468 : {
469 22 : *dead_items_info_p = &(pvs->shared->dead_items_info);
470 22 : return pvs->dead_items;
471 : }
472 :
473 : /* Forget all items in dead_items */
474 : void
475 14 : parallel_vacuum_reset_dead_items(ParallelVacuumState *pvs)
476 : {
477 14 : TidStore *dead_items = pvs->dead_items;
478 14 : VacDeadItemsInfo *dead_items_info = &(pvs->shared->dead_items_info);
479 :
480 : /*
481 : * Free the current tidstore and return allocated DSA segments to the
482 : * operating system. Then we recreate the tidstore with the same max_bytes
483 : * limitation we just used, keeping the local dead_items pointer in sync.
484 : */
485 14 : TidStoreDestroy(dead_items);
486 14 : pvs->dead_items = dead_items = TidStoreCreateShared(dead_items_info->max_bytes,
487 : LWTRANCHE_PARALLEL_VACUUM_DSA);
488 :
489 : /* Update the DSA pointer for dead_items to the new one */
490 14 : pvs->shared->dead_items_dsa_handle = dsa_get_handle(TidStoreGetDSA(dead_items));
491 14 : pvs->shared->dead_items_handle = TidStoreGetHandle(dead_items);
492 :
493 : /* Reset the counter */
494 14 : dead_items_info->num_items = 0;
495 14 : }
496 :
497 : /*
498 : * Do parallel index bulk-deletion with parallel workers.
499 : */
500 : void
501 14 : parallel_vacuum_bulkdel_all_indexes(ParallelVacuumState *pvs, long num_table_tuples,
502 : int num_index_scans)
503 : {
504 : Assert(!IsParallelWorker());
505 :
506 : /*
507 : * We can only provide an approximate value of num_heap_tuples, at least
508 : * for now.
509 : */
510 14 : pvs->shared->reltuples = num_table_tuples;
511 14 : pvs->shared->estimated_count = true;
512 :
513 14 : parallel_vacuum_process_all_indexes(pvs, num_index_scans, true);
514 14 : }
515 :
516 : /*
517 : * Do parallel index cleanup with parallel workers.
518 : */
519 : void
520 22 : parallel_vacuum_cleanup_all_indexes(ParallelVacuumState *pvs, long num_table_tuples,
521 : int num_index_scans, bool estimated_count)
522 : {
523 : Assert(!IsParallelWorker());
524 :
525 : /*
526 : * We can provide a better estimate of total number of surviving tuples
527 : * (we assume indexes are more interested in that than in the number of
528 : * nominally live tuples).
529 : */
530 22 : pvs->shared->reltuples = num_table_tuples;
531 22 : pvs->shared->estimated_count = estimated_count;
532 :
533 22 : parallel_vacuum_process_all_indexes(pvs, num_index_scans, false);
534 22 : }
535 :
536 : /*
537 : * Compute the number of parallel worker processes to request. Both index
538 : * vacuum and index cleanup can be executed with parallel workers.
539 : * An index is eligible for parallel vacuum iff its size is greater than
540 : * min_parallel_index_scan_size, as invoking workers for very small indexes
541 : * can hurt performance.
542 : *
543 : * nrequested is the number of parallel workers that the user requested. If
544 : * nrequested is 0, we compute the parallel degree from the number of
545 : * indexes that support parallel vacuum. This function also sets
546 : * will_parallel_vacuum to remember the indexes that participate in
547 : * parallel vacuum.
548 : */
549 : static int
550 7934 : parallel_vacuum_compute_workers(Relation *indrels, int nindexes, int nrequested,
551 : bool *will_parallel_vacuum)
552 : {
553 7934 : int nindexes_parallel = 0;
554 7934 : int nindexes_parallel_bulkdel = 0;
555 7934 : int nindexes_parallel_cleanup = 0;
556 : int parallel_workers;
557 :
558 : /*
559 : * We don't allow performing parallel operation in standalone backend or
560 : * when parallelism is disabled.
561 : */
562 7934 : if (!IsUnderPostmaster || max_parallel_maintenance_workers == 0)
563 4214 : return 0;
564 :
565 : /*
566 : * Compute the number of indexes that can participate in parallel vacuum.
567 : */
568 12160 : for (int i = 0; i < nindexes; i++)
569 : {
570 8440 : Relation indrel = indrels[i];
571 8440 : uint8 vacoptions = indrel->rd_indam->amparallelvacuumoptions;
572 :
573 : /* Skip index that is not a suitable target for parallel index vacuum */
574 8440 : if (vacoptions == VACUUM_OPTION_NO_PARALLEL ||
575 8440 : RelationGetNumberOfBlocks(indrel) < min_parallel_index_scan_size)
576 8334 : continue;
577 :
578 106 : will_parallel_vacuum[i] = true;
579 :
580 106 : if ((vacoptions & VACUUM_OPTION_PARALLEL_BULKDEL) != 0)
581 94 : nindexes_parallel_bulkdel++;
582 106 : if (((vacoptions & VACUUM_OPTION_PARALLEL_CLEANUP) != 0) ||
583 82 : ((vacoptions & VACUUM_OPTION_PARALLEL_COND_CLEANUP) != 0))
584 94 : nindexes_parallel_cleanup++;
585 : }
586 :
587 3720 : nindexes_parallel = Max(nindexes_parallel_bulkdel,
588 : nindexes_parallel_cleanup);
589 :
590 : /* The leader process takes one index */
591 3720 : nindexes_parallel--;
592 :
593 : /* No index supports parallel vacuum */
594 3720 : if (nindexes_parallel <= 0)
595 3698 : return 0;
596 :
597 : /* Compute the parallel degree */
598 22 : parallel_workers = (nrequested > 0) ?
599 22 : Min(nrequested, nindexes_parallel) : nindexes_parallel;
600 :
601 : /* Cap by max_parallel_maintenance_workers */
602 22 : parallel_workers = Min(parallel_workers, max_parallel_maintenance_workers);
603 :
604 22 : return parallel_workers;
605 : }
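/*
 * Worked example (illustrative numbers): suppose 4 indexes support parallel
 * bulk-deletion and 3 support parallel cleanup (unconditionally or
 * conditionally). Then nindexes_parallel = Max(4, 3) - 1 = 3, since the
 * leader takes one index itself. With nrequested = 8 and
 * max_parallel_maintenance_workers = 2, the result is Min(Min(8, 3), 2) = 2
 * workers.
 */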
606 :
607 : /*
608 : * Perform index vacuum or index cleanup with parallel workers. This function
609 : * must be used by the parallel vacuum leader process.
610 : */
611 : static void
612 36 : parallel_vacuum_process_all_indexes(ParallelVacuumState *pvs, int num_index_scans,
613 : bool vacuum)
614 : {
615 : int nworkers;
616 : PVIndVacStatus new_status;
617 :
618 : Assert(!IsParallelWorker());
619 :
620 36 : if (vacuum)
621 : {
622 14 : new_status = PARALLEL_INDVAC_STATUS_NEED_BULKDELETE;
623 :
624 : /* Determine the number of parallel workers to launch */
625 14 : nworkers = pvs->nindexes_parallel_bulkdel;
626 : }
627 : else
628 : {
629 22 : new_status = PARALLEL_INDVAC_STATUS_NEED_CLEANUP;
630 :
631 : /* Determine the number of parallel workers to launch */
632 22 : nworkers = pvs->nindexes_parallel_cleanup;
633 :
634 : /* Add conditionally parallel-aware indexes on the first call */
635 22 : if (num_index_scans == 0)
636 8 : nworkers += pvs->nindexes_parallel_condcleanup;
637 : }
638 :
639 : /* The leader process will participate */
640 36 : nworkers--;
641 :
642 : /*
643 : * It is possible that parallel context is initialized with fewer workers
644 : * than the number of indexes that need a separate worker in the current
645 : * phase, so we need to consider it. See
646 : * parallel_vacuum_compute_workers().
647 : */
648 36 : nworkers = Min(nworkers, pvs->pcxt->nworkers);
649 :
650 : /*
651 : * Set index vacuum status and mark whether parallel vacuum worker can
652 : * process it.
653 : */
654 184 : for (int i = 0; i < pvs->nindexes; i++)
655 : {
656 148 : PVIndStats *indstats = &(pvs->indstats[i]);
657 :
658 : Assert(indstats->status == PARALLEL_INDVAC_STATUS_INITIAL);
659 148 : indstats->status = new_status;
660 148 : indstats->parallel_workers_can_process =
661 284 : (pvs->will_parallel_vacuum[i] &&
662 136 : parallel_vacuum_index_is_parallel_safe(pvs->indrels[i],
663 : num_index_scans,
664 : vacuum));
665 : }
666 :
667 : /* Reset the parallel index processing and progress counters */
668 36 : pg_atomic_write_u32(&(pvs->shared->idx), 0);
669 :
670 : /* Setup the shared cost-based vacuum delay and launch workers */
671 36 : if (nworkers > 0)
672 : {
673 : /* Reinitialize parallel context to relaunch parallel workers */
674 26 : if (num_index_scans > 0)
675 4 : ReinitializeParallelDSM(pvs->pcxt);
676 :
677 : /*
678 : * Set up shared cost balance and the number of active workers for
679 : * vacuum delay. We need to do this before launching workers, as
680 : * otherwise they might not see the updated values for these
681 : * parameters.
682 : */
683 26 : pg_atomic_write_u32(&(pvs->shared->cost_balance), VacuumCostBalance);
684 26 : pg_atomic_write_u32(&(pvs->shared->active_nworkers), 0);
685 :
686 : /*
687 : * The number of workers can vary between the bulkdelete and cleanup
688 : * phases.
689 : */
690 26 : ReinitializeParallelWorkers(pvs->pcxt, nworkers);
691 :
692 26 : LaunchParallelWorkers(pvs->pcxt);
693 :
694 26 : if (pvs->pcxt->nworkers_launched > 0)
695 : {
696 : /*
697 : * Reset the local cost values for the leader backend, as we have
698 : * already accumulated the remaining balance from the heap scan.
699 : */
700 26 : VacuumCostBalance = 0;
701 26 : VacuumCostBalanceLocal = 0;
702 :
703 : /* Enable shared cost balance for leader backend */
704 26 : VacuumSharedCostBalance = &(pvs->shared->cost_balance);
705 26 : VacuumActiveNWorkers = &(pvs->shared->active_nworkers);
706 : }
707 :
708 26 : if (vacuum)
709 14 : ereport(pvs->shared->elevel,
710 : (errmsg(ngettext("launched %d parallel vacuum worker for index vacuuming (planned: %d)",
711 : "launched %d parallel vacuum workers for index vacuuming (planned: %d)",
712 : pvs->pcxt->nworkers_launched),
713 : pvs->pcxt->nworkers_launched, nworkers)));
714 : else
715 12 : ereport(pvs->shared->elevel,
716 : (errmsg(ngettext("launched %d parallel vacuum worker for index cleanup (planned: %d)",
717 : "launched %d parallel vacuum workers for index cleanup (planned: %d)",
718 : pvs->pcxt->nworkers_launched),
719 : pvs->pcxt->nworkers_launched, nworkers)));
720 : }
721 :
722 : /* Vacuum the indexes that can be processed only by the leader process */
723 36 : parallel_vacuum_process_unsafe_indexes(pvs);
724 :
725 : /*
726 : * Join as a parallel worker. If no workers were launched, the leader
727 : * alone processes all parallel-safe indexes.
728 : */
729 36 : parallel_vacuum_process_safe_indexes(pvs);
730 :
731 : /*
732 : * Next, accumulate buffer and WAL usage. (This must wait for the workers
733 : * to finish, or we might get incomplete data.)
734 : */
735 36 : if (nworkers > 0)
736 : {
737 : /* Wait for all vacuum workers to finish */
738 26 : WaitForParallelWorkersToFinish(pvs->pcxt);
739 :
740 64 : for (int i = 0; i < pvs->pcxt->nworkers_launched; i++)
741 38 : InstrAccumParallelQuery(&pvs->buffer_usage[i], &pvs->wal_usage[i]);
742 : }
743 :
744 : /*
745 : * Reset all index statuses back to initial (while checking that we have
746 : * vacuumed all indexes).
747 : */
748 184 : for (int i = 0; i < pvs->nindexes; i++)
749 : {
750 148 : PVIndStats *indstats = &(pvs->indstats[i]);
751 :
752 148 : if (indstats->status != PARALLEL_INDVAC_STATUS_COMPLETED)
753 0 : elog(ERROR, "parallel index vacuum on index \"%s\" is not completed",
754 : RelationGetRelationName(pvs->indrels[i]));
755 :
756 148 : indstats->status = PARALLEL_INDVAC_STATUS_INITIAL;
757 : }
758 :
759 : /*
760 : * Carry the shared balance value over to the heap scan and disable shared costing
761 : */
762 36 : if (VacuumSharedCostBalance)
763 : {
764 26 : VacuumCostBalance = pg_atomic_read_u32(VacuumSharedCostBalance);
765 26 : VacuumSharedCostBalance = NULL;
766 26 : VacuumActiveNWorkers = NULL;
767 : }
768 36 : }
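/*
 * Worked example (illustrative numbers): with 5 bulk-deletion-capable
 * indexes, 1 unconditionally cleanup-capable index, 3 conditionally
 * cleanup-capable indexes, and pcxt->nworkers = 2: a bulk-deletion pass
 * requests Min(5 - 1, 2) = 2 workers; the first cleanup pass
 * (num_index_scans == 0) requests Min((1 + 3) - 1, 2) = 2 workers; and any
 * later cleanup pass requests Min(1 - 1, 2) = 0 workers, so the leader
 * processes the remaining indexes alone.
 */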
769 :
770 : /*
771 : * Index vacuum/cleanup routine used by the leader process and parallel
772 : * vacuum worker processes to vacuum the indexes in parallel.
773 : */
774 : static void
775 74 : parallel_vacuum_process_safe_indexes(ParallelVacuumState *pvs)
776 : {
777 : /*
778 : * Increment the active worker count if we are able to launch any worker.
779 : */
780 74 : if (VacuumActiveNWorkers)
781 64 : pg_atomic_add_fetch_u32(VacuumActiveNWorkers, 1);
782 :
783 : /* Loop until all indexes are vacuumed */
784 : for (;;)
785 148 : {
786 : int idx;
787 : PVIndStats *indstats;
788 :
789 : /* Get an index number to process */
790 222 : idx = pg_atomic_fetch_add_u32(&(pvs->shared->idx), 1);
791 :
792 : /* Done for all indexes? */
793 222 : if (idx >= pvs->nindexes)
794 74 : break;
795 :
796 148 : indstats = &(pvs->indstats[idx]);
797 :
798 : /*
799 : * Skip indexes that are unsafe for workers or are unsuitable targets
800 : * for parallel index vacuum (these are vacuumed in
801 : * parallel_vacuum_process_unsafe_indexes() by the leader).
802 : */
803 148 : if (!indstats->parallel_workers_can_process)
804 60 : continue;
805 :
806 : /* Do vacuum or cleanup of the index */
807 88 : parallel_vacuum_process_one_index(pvs, pvs->indrels[idx], indstats);
808 : }
809 :
810 : /*
811 : * We have completed the index vacuum so decrement the active worker
812 : * count.
813 : */
814 74 : if (VacuumActiveNWorkers)
815 64 : pg_atomic_sub_fetch_u32(VacuumActiveNWorkers, 1);
816 74 : }
817 :
818 : /*
819 : * Vacuum indexes that can be processed only by the leader process.
820 : *
821 : * Handles index vacuuming (or index cleanup) for indexes that are not
822 : * parallel safe. It's possible that this will vary for a given index, based
823 : * on details like whether we're performing index cleanup right now.
824 : *
825 : * Also performs vacuuming of smaller indexes that fell under the size cutoff
826 : * enforced by parallel_vacuum_compute_workers().
827 : */
828 : static void
829 36 : parallel_vacuum_process_unsafe_indexes(ParallelVacuumState *pvs)
830 : {
831 : Assert(!IsParallelWorker());
832 :
833 : /*
834 : * Increment the active worker count if we are able to launch any worker.
835 : */
836 36 : if (VacuumActiveNWorkers)
837 26 : pg_atomic_add_fetch_u32(VacuumActiveNWorkers, 1);
838 :
839 184 : for (int i = 0; i < pvs->nindexes; i++)
840 : {
841 148 : PVIndStats *indstats = &(pvs->indstats[i]);
842 :
843 : /* Skip indexes that are safe for workers */
844 148 : if (indstats->parallel_workers_can_process)
845 88 : continue;
846 :
847 : /* Do vacuum or cleanup of the index */
848 60 : parallel_vacuum_process_one_index(pvs, pvs->indrels[i], indstats);
849 : }
850 :
851 : /*
852 : * We have completed the index vacuum so decrement the active worker
853 : * count.
854 : */
855 36 : if (VacuumActiveNWorkers)
856 26 : pg_atomic_sub_fetch_u32(VacuumActiveNWorkers, 1);
857 36 : }
858 :
859 : /*
860 : * Vacuum or clean up an index, either by the leader process or by one of
861 : * the worker processes. After processing the index, this function copies
862 : * the index statistics returned from ambulkdelete or amvacuumcleanup to
863 : * the DSM segment.
864 : */
865 : static void
866 148 : parallel_vacuum_process_one_index(ParallelVacuumState *pvs, Relation indrel,
867 : PVIndStats *indstats)
868 : {
869 148 : IndexBulkDeleteResult *istat = NULL;
870 : IndexBulkDeleteResult *istat_res;
871 : IndexVacuumInfo ivinfo;
872 :
873 : /*
874 : * Update the pointer to the corresponding bulk-deletion result if someone
875 : * has already updated it
876 : */
877 148 : if (indstats->istat_updated)
878 50 : istat = &(indstats->istat);
879 :
880 148 : ivinfo.index = indrel;
881 148 : ivinfo.heaprel = pvs->heaprel;
882 148 : ivinfo.analyze_only = false;
883 148 : ivinfo.report_progress = false;
884 148 : ivinfo.message_level = DEBUG2;
885 148 : ivinfo.estimated_count = pvs->shared->estimated_count;
886 148 : ivinfo.num_heap_tuples = pvs->shared->reltuples;
887 148 : ivinfo.strategy = pvs->bstrategy;
888 :
889 : /* Update error traceback information */
890 148 : pvs->indname = pstrdup(RelationGetRelationName(indrel));
891 148 : pvs->status = indstats->status;
892 :
893 148 : switch (indstats->status)
894 : {
895 50 : case PARALLEL_INDVAC_STATUS_NEED_BULKDELETE:
896 50 : istat_res = vac_bulkdel_one_index(&ivinfo, istat, pvs->dead_items,
897 50 : &pvs->shared->dead_items_info);
898 50 : break;
899 98 : case PARALLEL_INDVAC_STATUS_NEED_CLEANUP:
900 98 : istat_res = vac_cleanup_one_index(&ivinfo, istat);
901 98 : break;
902 0 : default:
903 0 : elog(ERROR, "unexpected parallel vacuum index status %d for index \"%s\"",
904 : indstats->status,
905 : RelationGetRelationName(indrel));
906 : }
907 :
908 : /*
909 : * Copy the index bulk-deletion result returned from ambulkdelete or
910 : * amvacuumcleanup to the DSM segment if this is the first cycle, because
911 : * those APIs allocate the result locally and the index might be vacuumed
912 : * by a different vacuum process in the next cycle. Copying normally
913 : * happens only the first time an index is vacuumed. For any additional
914 : * vacuum pass, we point directly to the result on the DSM segment and
915 : * pass it to the vacuum index APIs so that workers can update it directly.
916 : *
917 : * Since all vacuum workers write their bulk-deletion results to different
918 : * slots, we can write them without locking.
919 : */
920 148 : if (!indstats->istat_updated && istat_res != NULL)
921 : {
922 82 : memcpy(&(indstats->istat), istat_res, sizeof(IndexBulkDeleteResult));
923 82 : indstats->istat_updated = true;
924 :
925 : /* Free the locally-allocated bulk-deletion result */
926 82 : pfree(istat_res);
927 : }
928 :
929 : /*
930 : * Update the status to completed. No need to lock here since each worker
931 : * touches different indexes.
932 : */
933 148 : indstats->status = PARALLEL_INDVAC_STATUS_COMPLETED;
934 :
935 : /* Reset error traceback information */
936 148 : pvs->status = PARALLEL_INDVAC_STATUS_COMPLETED;
937 148 : pfree(pvs->indname);
938 148 : pvs->indname = NULL;
939 :
940 : /*
941 : * Call the parallel variant of pgstat_progress_incr_param so workers can
942 : * report progress of index vacuum to the leader.
943 : */
944 148 : pgstat_progress_parallel_incr_param(PROGRESS_VACUUM_INDEXES_PROCESSED, 1);
945 148 : }
946 :
947 : /*
948 : * Returns false if the given index can't participate in the next execution
949 : * of parallel index vacuum or parallel index cleanup.
950 : */
951 : static bool
952 136 : parallel_vacuum_index_is_parallel_safe(Relation indrel, int num_index_scans,
953 : bool vacuum)
954 : {
955 : uint8 vacoptions;
956 :
957 136 : vacoptions = indrel->rd_indam->amparallelvacuumoptions;
958 :
959 : /* In parallel vacuum case, check if it supports parallel bulk-deletion */
960 136 : if (vacuum)
961 44 : return ((vacoptions & VACUUM_OPTION_PARALLEL_BULKDEL) != 0);
962 :
963 : /* Not safe, if the index does not support parallel cleanup */
964 92 : if (((vacoptions & VACUUM_OPTION_PARALLEL_CLEANUP) == 0) &&
965 68 : ((vacoptions & VACUUM_OPTION_PARALLEL_COND_CLEANUP) == 0))
966 12 : return false;
967 :
968 : /*
969 : * Not safe if the index supports parallel cleanup only conditionally and
970 : * we have already processed the index (for bulkdelete). We do this to
971 : * avoid invoking workers when parallel index cleanup doesn't need to
972 : * scan the index. See the comments for option
973 : * VACUUM_OPTION_PARALLEL_COND_CLEANUP for when indexes support
974 : * parallel cleanup conditionally.
975 : */
976 80 : if (num_index_scans > 0 &&
977 40 : ((vacoptions & VACUUM_OPTION_PARALLEL_COND_CLEANUP) != 0))
978 32 : return false;
979 :
980 48 : return true;
981 : }
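/*
 * For example, btree advertises VACUUM_OPTION_PARALLEL_BULKDEL |
 * VACUUM_OPTION_PARALLEL_COND_CLEANUP: it is parallel-safe for every
 * bulk-deletion pass, but for cleanup only until it has been bulk-deleted
 * once (num_index_scans > 0), since btree cleanup after a bulk-deletion
 * typically doesn't need to scan the index again.
 */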
982 :
983 : /*
984 : * Perform work within a launched parallel process.
985 : *
986 : * Since parallel vacuum workers perform only index vacuum or index cleanup,
987 : * we don't need to report progress information.
988 : */
989 : void
990 38 : parallel_vacuum_main(dsm_segment *seg, shm_toc *toc)
991 : {
992 : ParallelVacuumState pvs;
993 : Relation rel;
994 : Relation *indrels;
995 : PVIndStats *indstats;
996 : PVShared *shared;
997 : TidStore *dead_items;
998 : BufferUsage *buffer_usage;
999 : WalUsage *wal_usage;
1000 : int nindexes;
1001 : char *sharedquery;
1002 : ErrorContextCallback errcallback;
1003 :
1004 : /*
1005 : * A parallel vacuum worker must have only the PROC_IN_VACUUM flag, since
1006 : * we don't currently support parallel vacuum for autovacuum.
1007 : */
1008 : Assert(MyProc->statusFlags == PROC_IN_VACUUM);
1009 :
1010 38 : elog(DEBUG1, "starting parallel vacuum worker");
1011 :
1012 38 : shared = (PVShared *) shm_toc_lookup(toc, PARALLEL_VACUUM_KEY_SHARED, false);
1013 :
1014 : /* Set debug_query_string for individual workers */
1015 38 : sharedquery = shm_toc_lookup(toc, PARALLEL_VACUUM_KEY_QUERY_TEXT, true);
1016 38 : debug_query_string = sharedquery;
1017 38 : pgstat_report_activity(STATE_RUNNING, debug_query_string);
1018 :
1019 : /* Track query ID */
1020 38 : pgstat_report_query_id(shared->queryid, false);
1021 :
1022 : /*
1023 : * Open the table. The lock mode is the same as the leader process's.
1024 : * That's okay because this lock mode does not conflict among the
1025 : * parallel workers.
1026 : */
1027 38 : rel = table_open(shared->relid, ShareUpdateExclusiveLock);
1028 :
1029 : /*
1030 : * Open all indexes. indrels are sorted by OID, which should match the
1031 : * leader's ordering.
1032 : */
1033 38 : vac_open_indexes(rel, RowExclusiveLock, &nindexes, &indrels);
1034 : Assert(nindexes > 0);
1035 :
1036 38 : if (shared->maintenance_work_mem_worker > 0)
1037 38 : maintenance_work_mem = shared->maintenance_work_mem_worker;
1038 :
1039 : /* Set index statistics */
1040 38 : indstats = (PVIndStats *) shm_toc_lookup(toc,
1041 : PARALLEL_VACUUM_KEY_INDEX_STATS,
1042 : false);
1043 :
1044 : /* Find dead_items in shared memory */
1045 38 : dead_items = TidStoreAttach(shared->dead_items_dsa_handle,
1046 : shared->dead_items_handle);
1047 :
1048 : /* Set cost-based vacuum delay */
1049 38 : VacuumUpdateCosts();
1050 38 : VacuumCostBalance = 0;
1051 38 : VacuumCostBalanceLocal = 0;
1052 38 : VacuumSharedCostBalance = &(shared->cost_balance);
1053 38 : VacuumActiveNWorkers = &(shared->active_nworkers);
1054 :
1055 : /* Set parallel vacuum state */
1056 38 : pvs.indrels = indrels;
1057 38 : pvs.nindexes = nindexes;
1058 38 : pvs.indstats = indstats;
1059 38 : pvs.shared = shared;
1060 38 : pvs.dead_items = dead_items;
1061 38 : pvs.relnamespace = get_namespace_name(RelationGetNamespace(rel));
1062 38 : pvs.relname = pstrdup(RelationGetRelationName(rel));
1063 38 : pvs.heaprel = rel;
1064 :
1065 : /* These fields will be filled during index vacuum or cleanup */
1066 38 : pvs.indname = NULL;
1067 38 : pvs.status = PARALLEL_INDVAC_STATUS_INITIAL;
1068 :
1069 : /* Each parallel VACUUM worker gets its own access strategy. */
1070 76 : pvs.bstrategy = GetAccessStrategyWithSize(BAS_VACUUM,
1071 38 : shared->ring_nbuffers * (BLCKSZ / 1024));
1072 :
1073 : /* Setup error traceback support for ereport() */
1074 38 : errcallback.callback = parallel_vacuum_error_callback;
1075 38 : errcallback.arg = &pvs;
1076 38 : errcallback.previous = error_context_stack;
1077 38 : error_context_stack = &errcallback;
1078 :
1079 : /* Prepare to track buffer usage during parallel execution */
1080 38 : InstrStartParallelQuery();
1081 :
1082 : /* Process indexes to perform vacuum/cleanup */
1083 38 : parallel_vacuum_process_safe_indexes(&pvs);
1084 :
1085 : /* Report buffer/WAL usage during parallel execution */
1086 38 : buffer_usage = shm_toc_lookup(toc, PARALLEL_VACUUM_KEY_BUFFER_USAGE, false);
1087 38 : wal_usage = shm_toc_lookup(toc, PARALLEL_VACUUM_KEY_WAL_USAGE, false);
1088 38 : InstrEndParallelQuery(&buffer_usage[ParallelWorkerNumber],
1089 38 : &wal_usage[ParallelWorkerNumber]);
1090 :
1091 38 : TidStoreDetach(dead_items);
1092 :
1093 : /* Pop the error context stack */
1094 38 : error_context_stack = errcallback.previous;
1095 :
1096 38 : vac_close_indexes(nindexes, indrels, RowExclusiveLock);
1097 38 : table_close(rel, ShareUpdateExclusiveLock);
1098 38 : FreeAccessStrategy(pvs.bstrategy);
1099 38 : }
1100 :
1101 : /*
1102 : * Error context callback for errors occurring during parallel index vacuum.
1103 : * The error context messages should match the messages set in the lazy vacuum
1104 : * error context. If you change this function, change vacuum_error_callback()
1105 : * as well.
1106 : */
1107 : static void
1108 0 : parallel_vacuum_error_callback(void *arg)
1109 : {
1110 0 : ParallelVacuumState *errinfo = arg;
1111 :
1112 0 : switch (errinfo->status)
1113 : {
1114 0 : case PARALLEL_INDVAC_STATUS_NEED_BULKDELETE:
1115 0 : errcontext("while vacuuming index \"%s\" of relation \"%s.%s\"",
1116 : errinfo->indname,
1117 : errinfo->relnamespace,
1118 : errinfo->relname);
1119 0 : break;
1120 0 : case PARALLEL_INDVAC_STATUS_NEED_CLEANUP:
1121 0 : errcontext("while cleaning up index \"%s\" of relation \"%s.%s\"",
1122 : errinfo->indname,
1123 : errinfo->relnamespace,
1124 : errinfo->relname);
1125 0 : break;
1126 0 : case PARALLEL_INDVAC_STATUS_INITIAL:
1127 : case PARALLEL_INDVAC_STATUS_COMPLETED:
1128 : default:
1129 0 : return;
1130 : }
1131 : }