Line data Source code
1 : /* ---------- 2 : * backend_progress.c 3 : * 4 : * Command progress reporting infrastructure. 5 : * 6 : * Copyright (c) 2001-2023, PostgreSQL Global Development Group 7 : * 8 : * src/backend/utils/activity/backend_progress.c 9 : * ---------- 10 : */ 11 : #include "postgres.h" 12 : 13 : #include "access/parallel.h" 14 : #include "libpq/pqformat.h" 15 : #include "port/atomics.h" /* for memory barriers */ 16 : #include "utils/backend_progress.h" 17 : #include "utils/backend_status.h" 18 : 19 : 20 : /*----------- 21 : * pgstat_progress_start_command() - 22 : * 23 : * Set st_progress_command (and st_progress_command_target) in own backend 24 : * entry. Also, zero-initialize st_progress_param array. 25 : *----------- 26 : */ 27 : void 28 60784 : pgstat_progress_start_command(ProgressCommandType cmdtype, Oid relid) 29 : { 30 60784 : volatile PgBackendStatus *beentry = MyBEEntry; 31 : 32 60784 : if (!beentry || !pgstat_track_activities) 33 0 : return; 34 : 35 60784 : PGSTAT_BEGIN_WRITE_ACTIVITY(beentry); 36 60784 : beentry->st_progress_command = cmdtype; 37 60784 : beentry->st_progress_command_target = relid; 38 1276464 : MemSet(&beentry->st_progress_param, 0, sizeof(beentry->st_progress_param)); 39 60784 : PGSTAT_END_WRITE_ACTIVITY(beentry); 40 : } 41 : 42 : /*----------- 43 : * pgstat_progress_update_param() - 44 : * 45 : * Update index'th member in st_progress_param[] of own backend entry. 46 : *----------- 47 : */ 48 : void 49 20627450 : pgstat_progress_update_param(int index, int64 val) 50 : { 51 20627450 : volatile PgBackendStatus *beentry = MyBEEntry; 52 : 53 : Assert(index >= 0 && index < PGSTAT_NUM_PROGRESS_PARAM); 54 : 55 20627450 : if (!beentry || !pgstat_track_activities) 56 0 : return; 57 : 58 20627450 : PGSTAT_BEGIN_WRITE_ACTIVITY(beentry); 59 20627450 : beentry->st_progress_param[index] = val; 60 20627450 : PGSTAT_END_WRITE_ACTIVITY(beentry); 61 : } 62 : 63 : /*----------- 64 : * pgstat_progress_incr_param() - 65 : * 66 : * Increment index'th member in st_progress_param[] of own backend entry. 67 : *----------- 68 : */ 69 : void 70 2326 : pgstat_progress_incr_param(int index, int64 incr) 71 : { 72 2326 : volatile PgBackendStatus *beentry = MyBEEntry; 73 : 74 : Assert(index >= 0 && index < PGSTAT_NUM_PROGRESS_PARAM); 75 : 76 2326 : if (!beentry || !pgstat_track_activities) 77 0 : return; 78 : 79 2326 : PGSTAT_BEGIN_WRITE_ACTIVITY(beentry); 80 2326 : beentry->st_progress_param[index] += incr; 81 2326 : PGSTAT_END_WRITE_ACTIVITY(beentry); 82 : } 83 : 84 : /*----------- 85 : * pgstat_progress_parallel_incr_param() - 86 : * 87 : * A variant of pgstat_progress_incr_param to allow a worker to poke at 88 : * a leader to do an incremental progress update. 89 : *----------- 90 : */ 91 : void 92 120 : pgstat_progress_parallel_incr_param(int index, int64 incr) 93 : { 94 : /* 95 : * Parallel workers notify a leader through a 'P' protocol message to 96 : * update progress, passing the progress index and incremented value. 97 : * Leaders can just call pgstat_progress_incr_param directly. 98 : */ 99 120 : if (IsParallelWorker()) 100 : { 101 : static StringInfoData progress_message; 102 : 103 0 : initStringInfo(&progress_message); 104 : 105 0 : pq_beginmessage(&progress_message, 'P'); 106 0 : pq_sendint32(&progress_message, index); 107 0 : pq_sendint64(&progress_message, incr); 108 0 : pq_endmessage(&progress_message); 109 : } 110 : else 111 120 : pgstat_progress_incr_param(index, incr); 112 120 : } 113 : 114 : /*----------- 115 : * pgstat_progress_update_multi_param() - 116 : * 117 : * Update multiple members in st_progress_param[] of own backend entry. 118 : * This is atomic; readers won't see intermediate states. 119 : *----------- 120 : */ 121 : void 122 841230 : pgstat_progress_update_multi_param(int nparam, const int *index, 123 : const int64 *val) 124 : { 125 841230 : volatile PgBackendStatus *beentry = MyBEEntry; 126 : int i; 127 : 128 841230 : if (!beentry || !pgstat_track_activities || nparam == 0) 129 0 : return; 130 : 131 841230 : PGSTAT_BEGIN_WRITE_ACTIVITY(beentry); 132 : 133 2250276 : for (i = 0; i < nparam; ++i) 134 : { 135 : Assert(index[i] >= 0 && index[i] < PGSTAT_NUM_PROGRESS_PARAM); 136 : 137 1409046 : beentry->st_progress_param[index[i]] = val[i]; 138 : } 139 : 140 841230 : PGSTAT_END_WRITE_ACTIVITY(beentry); 141 : } 142 : 143 : /*----------- 144 : * pgstat_progress_end_command() - 145 : * 146 : * Reset st_progress_command (and st_progress_command_target) in own backend 147 : * entry. This signals the end of the command. 148 : *----------- 149 : */ 150 : void 151 109110 : pgstat_progress_end_command(void) 152 : { 153 109110 : volatile PgBackendStatus *beentry = MyBEEntry; 154 : 155 109110 : if (!beentry || !pgstat_track_activities) 156 0 : return; 157 : 158 109110 : if (beentry->st_progress_command == PROGRESS_COMMAND_INVALID) 159 49650 : return; 160 : 161 59460 : PGSTAT_BEGIN_WRITE_ACTIVITY(beentry); 162 59460 : beentry->st_progress_command = PROGRESS_COMMAND_INVALID; 163 59460 : beentry->st_progress_command_target = InvalidOid; 164 59460 : PGSTAT_END_WRITE_ACTIVITY(beentry); 165 : }