Line data Source code
1 : /* ---------- 2 : * backend_progress.c 3 : * 4 : * Command progress reporting infrastructure. 5 : * 6 : * Copyright (c) 2001-2024, PostgreSQL Global Development Group 7 : * 8 : * src/backend/utils/activity/backend_progress.c 9 : * ---------- 10 : */ 11 : #include "postgres.h" 12 : 13 : #include "access/parallel.h" 14 : #include "libpq/pqformat.h" 15 : #include "port/atomics.h" /* for memory barriers */ 16 : #include "utils/backend_progress.h" 17 : #include "utils/backend_status.h" 18 : 19 : 20 : /*----------- 21 : * pgstat_progress_start_command() - 22 : * 23 : * Set st_progress_command (and st_progress_command_target) in own backend 24 : * entry. Also, zero-initialize st_progress_param array. 25 : *----------- 26 : */ 27 : void 28 70700 : pgstat_progress_start_command(ProgressCommandType cmdtype, Oid relid) 29 : { 30 70700 : volatile PgBackendStatus *beentry = MyBEEntry; 31 : 32 70700 : if (!beentry || !pgstat_track_activities) 33 0 : return; 34 : 35 70700 : PGSTAT_BEGIN_WRITE_ACTIVITY(beentry); 36 70700 : beentry->st_progress_command = cmdtype; 37 70700 : beentry->st_progress_command_target = relid; 38 1484700 : MemSet(&beentry->st_progress_param, 0, sizeof(beentry->st_progress_param)); 39 70700 : PGSTAT_END_WRITE_ACTIVITY(beentry); 40 : } 41 : 42 : /*----------- 43 : * pgstat_progress_update_param() - 44 : * 45 : * Update index'th member in st_progress_param[] of own backend entry. 46 : *----------- 47 : */ 48 : void 49 21471568 : pgstat_progress_update_param(int index, int64 val) 50 : { 51 21471568 : volatile PgBackendStatus *beentry = MyBEEntry; 52 : 53 : Assert(index >= 0 && index < PGSTAT_NUM_PROGRESS_PARAM); 54 : 55 21471568 : if (!beentry || !pgstat_track_activities) 56 0 : return; 57 : 58 21471568 : PGSTAT_BEGIN_WRITE_ACTIVITY(beentry); 59 21471568 : beentry->st_progress_param[index] = val; 60 21471568 : PGSTAT_END_WRITE_ACTIVITY(beentry); 61 : } 62 : 63 : /*----------- 64 : * pgstat_progress_incr_param() - 65 : * 66 : * Increment index'th member in st_progress_param[] of own backend entry. 67 : *----------- 68 : */ 69 : void 70 2922 : pgstat_progress_incr_param(int index, int64 incr) 71 : { 72 2922 : volatile PgBackendStatus *beentry = MyBEEntry; 73 : 74 : Assert(index >= 0 && index < PGSTAT_NUM_PROGRESS_PARAM); 75 : 76 2922 : if (!beentry || !pgstat_track_activities) 77 0 : return; 78 : 79 2922 : PGSTAT_BEGIN_WRITE_ACTIVITY(beentry); 80 2922 : beentry->st_progress_param[index] += incr; 81 2922 : PGSTAT_END_WRITE_ACTIVITY(beentry); 82 : } 83 : 84 : /*----------- 85 : * pgstat_progress_parallel_incr_param() - 86 : * 87 : * A variant of pgstat_progress_incr_param to allow a worker to poke at 88 : * a leader to do an incremental progress update. 89 : *----------- 90 : */ 91 : void 92 108 : pgstat_progress_parallel_incr_param(int index, int64 incr) 93 : { 94 : /* 95 : * Parallel workers notify a leader through a 'P' protocol message to 96 : * update progress, passing the progress index and incremented value. 97 : * Leaders can just call pgstat_progress_incr_param directly. 98 : */ 99 108 : if (IsParallelWorker()) 100 : { 101 : static StringInfoData progress_message; 102 : 103 0 : initStringInfo(&progress_message); 104 : 105 0 : pq_beginmessage(&progress_message, 'P'); 106 0 : pq_sendint32(&progress_message, index); 107 0 : pq_sendint64(&progress_message, incr); 108 0 : pq_endmessage(&progress_message); 109 : } 110 : else 111 108 : pgstat_progress_incr_param(index, incr); 112 108 : } 113 : 114 : /*----------- 115 : * pgstat_progress_update_multi_param() - 116 : * 117 : * Update multiple members in st_progress_param[] of own backend entry. 118 : * This is atomic; readers won't see intermediate states. 119 : *----------- 120 : */ 121 : void 122 947124 : pgstat_progress_update_multi_param(int nparam, const int *index, 123 : const int64 *val) 124 : { 125 947124 : volatile PgBackendStatus *beentry = MyBEEntry; 126 : int i; 127 : 128 947124 : if (!beentry || !pgstat_track_activities || nparam == 0) 129 0 : return; 130 : 131 947124 : PGSTAT_BEGIN_WRITE_ACTIVITY(beentry); 132 : 133 2501686 : for (i = 0; i < nparam; ++i) 134 : { 135 : Assert(index[i] >= 0 && index[i] < PGSTAT_NUM_PROGRESS_PARAM); 136 : 137 1554562 : beentry->st_progress_param[index[i]] = val[i]; 138 : } 139 : 140 947124 : PGSTAT_END_WRITE_ACTIVITY(beentry); 141 : } 142 : 143 : /*----------- 144 : * pgstat_progress_end_command() - 145 : * 146 : * Reset st_progress_command (and st_progress_command_target) in own backend 147 : * entry. This signals the end of the command. 148 : *----------- 149 : */ 150 : void 151 122576 : pgstat_progress_end_command(void) 152 : { 153 122576 : volatile PgBackendStatus *beentry = MyBEEntry; 154 : 155 122576 : if (!beentry || !pgstat_track_activities) 156 0 : return; 157 : 158 122576 : if (beentry->st_progress_command == PROGRESS_COMMAND_INVALID) 159 53268 : return; 160 : 161 69308 : PGSTAT_BEGIN_WRITE_ACTIVITY(beentry); 162 69308 : beentry->st_progress_command = PROGRESS_COMMAND_INVALID; 163 69308 : beentry->st_progress_command_target = InvalidOid; 164 69308 : PGSTAT_END_WRITE_ACTIVITY(beentry); 165 : }