Line data Source code
1 : /*-------------------------------------------------------------------------
2 : *
3 : * basebackup_throttle.c
4 : * Basebackup sink implementing throttling. Data is forwarded to the
5 : * next base backup sink in the chain at a rate no greater than the
6 : * configured maximum.
7 : *
8 : * Portions Copyright (c) 2010-2026, PostgreSQL Global Development Group
9 : *
10 : * IDENTIFICATION
11 : * src/backend/backup/basebackup_throttle.c
12 : *
13 : *-------------------------------------------------------------------------
14 : */
15 : #include "postgres.h"
16 :
17 : #include "backup/basebackup_sink.h"
18 : #include "miscadmin.h"
19 : #include "pgstat.h"
20 : #include "storage/latch.h"
21 : #include "utils/timestamp.h"
22 : #include "utils/wait_event.h"
23 :
24 : typedef struct bbsink_throttle
25 : {
26 : /* Common information for all types of sink. */
27 : bbsink base;
28 :
29 : /* The actual number of bytes, transfer of which may cause sleep. */
30 : uint64 throttling_sample;
31 :
32 : /* Amount of data already transferred but not yet throttled. */
33 : int64 throttling_counter;
34 :
35 : /* The minimum time required to transfer throttling_sample bytes. */
36 : TimeOffset elapsed_min_unit;
37 :
38 : /* The last check of the transfer rate. */
39 : TimestampTz throttled_last;
40 : } bbsink_throttle;
41 :
42 : static void bbsink_throttle_begin_backup(bbsink *sink);
43 : static void bbsink_throttle_archive_contents(bbsink *sink, size_t len);
44 : static void bbsink_throttle_manifest_contents(bbsink *sink, size_t len);
45 : static void throttle(bbsink_throttle *sink, size_t increment);
46 :
47 : static const bbsink_ops bbsink_throttle_ops = {
48 : .begin_backup = bbsink_throttle_begin_backup,
49 : .begin_archive = bbsink_forward_begin_archive,
50 : .archive_contents = bbsink_throttle_archive_contents,
51 : .end_archive = bbsink_forward_end_archive,
52 : .begin_manifest = bbsink_forward_begin_manifest,
53 : .manifest_contents = bbsink_throttle_manifest_contents,
54 : .end_manifest = bbsink_forward_end_manifest,
55 : .end_backup = bbsink_forward_end_backup,
56 : .cleanup = bbsink_forward_cleanup
57 : };
58 :
/*
 * Number of throttling samples per second.  Both the per-second byte budget
 * and USECS_PER_SEC are divided by this value when a sink is created, so the
 * transfer rate is checked (and a sleep possibly taken) once per sample
 * rather than once per second.
 */
#define THROTTLING_FREQUENCY	8
63 :
64 : /*
65 : * Create a new basebackup sink that performs throttling and forwards data
66 : * to a successor sink.
67 : */
68 : bbsink *
69 1 : bbsink_throttle_new(bbsink *next, uint32 maxrate)
70 : {
71 : bbsink_throttle *sink;
72 :
73 : Assert(next != NULL);
74 : Assert(maxrate > 0);
75 :
76 1 : sink = palloc0_object(bbsink_throttle);
77 1 : *((const bbsink_ops **) &sink->base.bbs_ops) = &bbsink_throttle_ops;
78 1 : sink->base.bbs_next = next;
79 :
80 1 : sink->throttling_sample =
81 1 : (int64) maxrate * (int64) 1024 / THROTTLING_FREQUENCY;
82 :
83 : /*
84 : * The minimum amount of time for throttling_sample bytes to be
85 : * transferred.
86 : */
87 1 : sink->elapsed_min_unit = USECS_PER_SEC / THROTTLING_FREQUENCY;
88 :
89 1 : return &sink->base;
90 : }
91 :
92 : /*
93 : * There's no real work to do here, but we need to record the current time so
94 : * that it can be used for future calculations.
95 : */
96 : static void
97 1 : bbsink_throttle_begin_backup(bbsink *sink)
98 : {
99 1 : bbsink_throttle *mysink = (bbsink_throttle *) sink;
100 :
101 1 : bbsink_forward_begin_backup(sink);
102 :
103 : /* The 'real data' starts now (header was ignored). */
104 1 : mysink->throttled_last = GetCurrentTimestamp();
105 1 : }
106 :
107 : /*
108 : * First throttle, and then pass archive contents to next sink.
109 : */
110 : static void
111 21 : bbsink_throttle_archive_contents(bbsink *sink, size_t len)
112 : {
113 21 : throttle((bbsink_throttle *) sink, len);
114 :
115 21 : bbsink_forward_archive_contents(sink, len);
116 21 : }
117 :
118 : /*
119 : * First throttle, and then pass manifest contents to next sink.
120 : */
121 : static void
122 0 : bbsink_throttle_manifest_contents(bbsink *sink, size_t len)
123 : {
124 0 : throttle((bbsink_throttle *) sink, len);
125 :
126 0 : bbsink_forward_manifest_contents(sink, len);
127 0 : }
128 :
129 : /*
130 : * Increment the network transfer counter by the given number of bytes,
131 : * and sleep if necessary to comply with the requested network transfer
132 : * rate.
133 : */
134 : static void
135 21 : throttle(bbsink_throttle *sink, size_t increment)
136 : {
137 : TimeOffset elapsed_min;
138 :
139 : Assert(sink->throttling_counter >= 0);
140 :
141 21 : sink->throttling_counter += increment;
142 21 : if (sink->throttling_counter < sink->throttling_sample)
143 17 : return;
144 :
145 : /* How much time should have elapsed at minimum? */
146 4 : elapsed_min = sink->elapsed_min_unit *
147 4 : (sink->throttling_counter / sink->throttling_sample);
148 :
149 : /*
150 : * Since the latch could be set repeatedly because of concurrently WAL
151 : * activity, sleep in a loop to ensure enough time has passed.
152 : */
153 : for (;;)
154 0 : {
155 : TimeOffset elapsed,
156 : sleep;
157 : int wait_result;
158 :
159 : /* Time elapsed since the last measurement (and possible wake up). */
160 4 : elapsed = GetCurrentTimestamp() - sink->throttled_last;
161 :
162 : /* sleep if the transfer is faster than it should be */
163 4 : sleep = elapsed_min - elapsed;
164 4 : if (sleep <= 0)
165 0 : break;
166 :
167 4 : ResetLatch(MyLatch);
168 :
169 : /* We're eating a potentially set latch, so check for interrupts */
170 4 : CHECK_FOR_INTERRUPTS();
171 :
172 : /*
173 : * (TAR_SEND_SIZE / throttling_sample * elapsed_min_unit) should be
174 : * the maximum time to sleep. Thus the cast to long is safe.
175 : */
176 4 : wait_result = WaitLatch(MyLatch,
177 : WL_LATCH_SET | WL_TIMEOUT | WL_EXIT_ON_PM_DEATH,
178 4 : (long) (sleep / 1000),
179 : WAIT_EVENT_BASE_BACKUP_THROTTLE);
180 :
181 4 : if (wait_result & WL_LATCH_SET)
182 0 : CHECK_FOR_INTERRUPTS();
183 :
184 : /* Done waiting? */
185 4 : if (wait_result & WL_TIMEOUT)
186 4 : break;
187 : }
188 :
189 : /*
190 : * As we work with integers, only whole multiple of throttling_sample was
191 : * processed. The rest will be done during the next call of this function.
192 : */
193 4 : sink->throttling_counter %= sink->throttling_sample;
194 :
195 : /*
196 : * Time interval for the remaining amount and possible next increments
197 : * starts now.
198 : */
199 4 : sink->throttled_last = GetCurrentTimestamp();
200 : }
|