Line data Source code
1 : /*------------------------------------------------------------------------- 2 : * 3 : * basebackup_throttle.c 4 : * Basebackup sink implementing throttling. Data is forwarded to the 5 : * next base backup sink in the chain at a rate no greater than the 6 : * configured maximum. 7 : * 8 : * Portions Copyright (c) 2010-2025, PostgreSQL Global Development Group 9 : * 10 : * IDENTIFICATION 11 : * src/backend/backup/basebackup_throttle.c 12 : * 13 : *------------------------------------------------------------------------- 14 : */ 15 : #include "postgres.h" 16 : 17 : #include "backup/basebackup_sink.h" 18 : #include "miscadmin.h" 19 : #include "pgstat.h" 20 : #include "storage/latch.h" 21 : #include "utils/timestamp.h" 22 : 23 : typedef struct bbsink_throttle 24 : { 25 : /* Common information for all types of sink. */ 26 : bbsink base; 27 : 28 : /* The actual number of bytes, transfer of which may cause sleep. */ 29 : uint64 throttling_sample; 30 : 31 : /* Amount of data already transferred but not yet throttled. */ 32 : int64 throttling_counter; 33 : 34 : /* The minimum time required to transfer throttling_sample bytes. */ 35 : TimeOffset elapsed_min_unit; 36 : 37 : /* The last check of the transfer rate. */ 38 : TimestampTz throttled_last; 39 : } bbsink_throttle; 40 : 41 : static void bbsink_throttle_begin_backup(bbsink *sink); 42 : static void bbsink_throttle_archive_contents(bbsink *sink, size_t len); 43 : static void bbsink_throttle_manifest_contents(bbsink *sink, size_t len); 44 : static void throttle(bbsink_throttle *sink, size_t increment); 45 : 46 : static const bbsink_ops bbsink_throttle_ops = { 47 : .begin_backup = bbsink_throttle_begin_backup, 48 : .begin_archive = bbsink_forward_begin_archive, 49 : .archive_contents = bbsink_throttle_archive_contents, 50 : .end_archive = bbsink_forward_end_archive, 51 : .begin_manifest = bbsink_forward_begin_manifest, 52 : .manifest_contents = bbsink_throttle_manifest_contents, 53 : .end_manifest = bbsink_forward_end_manifest, 54 : .end_backup = bbsink_forward_end_backup, 55 : .cleanup = bbsink_forward_cleanup 56 : }; 57 : 58 : /* 59 : * How frequently to throttle, as a fraction of the specified rate-second. 60 : */ 61 : #define THROTTLING_FREQUENCY 8 62 : 63 : /* 64 : * Create a new basebackup sink that performs throttling and forwards data 65 : * to a successor sink. 66 : */ 67 : bbsink * 68 4 : bbsink_throttle_new(bbsink *next, uint32 maxrate) 69 : { 70 : bbsink_throttle *sink; 71 : 72 : Assert(next != NULL); 73 : Assert(maxrate > 0); 74 : 75 4 : sink = palloc0(sizeof(bbsink_throttle)); 76 4 : *((const bbsink_ops **) &sink->base.bbs_ops) = &bbsink_throttle_ops; 77 4 : sink->base.bbs_next = next; 78 : 79 4 : sink->throttling_sample = 80 4 : (int64) maxrate * (int64) 1024 / THROTTLING_FREQUENCY; 81 : 82 : /* 83 : * The minimum amount of time for throttling_sample bytes to be 84 : * transferred. 85 : */ 86 4 : sink->elapsed_min_unit = USECS_PER_SEC / THROTTLING_FREQUENCY; 87 : 88 4 : return &sink->base; 89 : } 90 : 91 : /* 92 : * There's no real work to do here, but we need to record the current time so 93 : * that it can be used for future calculations. 94 : */ 95 : static void 96 4 : bbsink_throttle_begin_backup(bbsink *sink) 97 : { 98 4 : bbsink_throttle *mysink = (bbsink_throttle *) sink; 99 : 100 4 : bbsink_forward_begin_backup(sink); 101 : 102 : /* The 'real data' starts now (header was ignored). */ 103 4 : mysink->throttled_last = GetCurrentTimestamp(); 104 4 : } 105 : 106 : /* 107 : * First throttle, and then pass archive contents to next sink. 108 : */ 109 : static void 110 62 : bbsink_throttle_archive_contents(bbsink *sink, size_t len) 111 : { 112 62 : throttle((bbsink_throttle *) sink, len); 113 : 114 60 : bbsink_forward_archive_contents(sink, len); 115 60 : } 116 : 117 : /* 118 : * First throttle, and then pass manifest contents to next sink. 119 : */ 120 : static void 121 0 : bbsink_throttle_manifest_contents(bbsink *sink, size_t len) 122 : { 123 0 : throttle((bbsink_throttle *) sink, len); 124 : 125 0 : bbsink_forward_manifest_contents(sink, len); 126 0 : } 127 : 128 : /* 129 : * Increment the network transfer counter by the given number of bytes, 130 : * and sleep if necessary to comply with the requested network transfer 131 : * rate. 132 : */ 133 : static void 134 62 : throttle(bbsink_throttle *sink, size_t increment) 135 : { 136 : TimeOffset elapsed_min; 137 : 138 : Assert(sink->throttling_counter >= 0); 139 : 140 62 : sink->throttling_counter += increment; 141 62 : if (sink->throttling_counter < sink->throttling_sample) 142 52 : return; 143 : 144 : /* How much time should have elapsed at minimum? */ 145 10 : elapsed_min = sink->elapsed_min_unit * 146 10 : (sink->throttling_counter / sink->throttling_sample); 147 : 148 : /* 149 : * Since the latch could be set repeatedly because of concurrently WAL 150 : * activity, sleep in a loop to ensure enough time has passed. 151 : */ 152 : for (;;) 153 0 : { 154 : TimeOffset elapsed, 155 : sleep; 156 : int wait_result; 157 : 158 : /* Time elapsed since the last measurement (and possible wake up). */ 159 10 : elapsed = GetCurrentTimestamp() - sink->throttled_last; 160 : 161 : /* sleep if the transfer is faster than it should be */ 162 10 : sleep = elapsed_min - elapsed; 163 10 : if (sleep <= 0) 164 0 : break; 165 : 166 10 : ResetLatch(MyLatch); 167 : 168 : /* We're eating a potentially set latch, so check for interrupts */ 169 10 : CHECK_FOR_INTERRUPTS(); 170 : 171 : /* 172 : * (TAR_SEND_SIZE / throttling_sample * elapsed_min_unit) should be 173 : * the maximum time to sleep. Thus the cast to long is safe. 174 : */ 175 10 : wait_result = WaitLatch(MyLatch, 176 : WL_LATCH_SET | WL_TIMEOUT | WL_EXIT_ON_PM_DEATH, 177 10 : (long) (sleep / 1000), 178 : WAIT_EVENT_BASE_BACKUP_THROTTLE); 179 : 180 10 : if (wait_result & WL_LATCH_SET) 181 2 : CHECK_FOR_INTERRUPTS(); 182 : 183 : /* Done waiting? */ 184 8 : if (wait_result & WL_TIMEOUT) 185 8 : break; 186 : } 187 : 188 : /* 189 : * As we work with integers, only whole multiple of throttling_sample was 190 : * processed. The rest will be done during the next call of this function. 191 : */ 192 8 : sink->throttling_counter %= sink->throttling_sample; 193 : 194 : /* 195 : * Time interval for the remaining amount and possible next increments 196 : * starts now. 197 : */ 198 8 : sink->throttled_last = GetCurrentTimestamp(); 199 : }