AWS SDK for C++

AWS SDK for C++ Version 1.11.440

Loading...
Searching...
No Matches
DefaultRateLimiter.h
1
6#pragma once
7
8#include <aws/core/Core_EXPORTS.h>
9
10#include <aws/core/utils/ratelimiter/RateLimiterInterface.h>
11
12#include <algorithm>
13#include <mutex>
14#include <thread>
15#include <functional>
16
17namespace Aws
18{
19 namespace Utils
20 {
21 namespace RateLimits
22 {
26 template<typename CLOCK = std::chrono::high_resolution_clock, typename DUR = std::chrono::seconds, bool RENORMALIZE_RATE_CHANGES = true>
28 {
29 public:
31
32 using InternalTimePointType = std::chrono::time_point<CLOCK>;
34
38 DefaultRateLimiter(int64_t maxRate, ElapsedTimeFunctionType elapsedTimeFunction = CLOCK::now) :
39 m_elapsedTimeFunction(elapsedTimeFunction),
40 m_maxRate(0),
41 m_accumulatorLock(),
42 m_accumulator(0),
43 m_accumulatorFraction(0),
44 m_accumulatorUpdated(),
45 m_replenishNumerator(0),
46 m_replenishDenominator(0),
47 m_delayNumerator(0),
48 m_delayDenominator(0)
49 {
50 // verify we're not going to divide by zero due to goofy type parameterization
51 static_assert(DUR::period::num > 0, "Rate duration must have positive numerator");
52 static_assert(DUR::period::den > 0, "Rate duration must have positive denominator");
53 static_assert(CLOCK::duration::period::num > 0, "RateLimiter clock duration must have positive numerator");
54 static_assert(CLOCK::duration::period::den > 0, "RateLimiter clock duration must have positive denominator");
55
56 DefaultRateLimiter::SetRate(maxRate, true);
57 }
58
59 virtual ~DefaultRateLimiter() = default;
60
64 virtual DelayType ApplyCost(int64_t cost) override
65 {
66 std::lock_guard<std::recursive_mutex> lock(m_accumulatorLock);
67
68 auto now = m_elapsedTimeFunction();
69 auto elapsedTime = (now - m_accumulatorUpdated).count();
70
71 // replenish the accumulator based on how much time has passed
72 auto temp = elapsedTime * m_replenishNumerator + m_accumulatorFraction;
73 m_accumulator += temp / m_replenishDenominator;
74 m_accumulatorFraction = temp % m_replenishDenominator;
75
76 // the accumulator is capped based on the maximum rate
77 m_accumulator = (std::min)(m_accumulator, m_maxRate);
78 if (m_accumulator == m_maxRate)
79 {
80 m_accumulatorFraction = 0;
81 }
82
83 // if the accumulator is still negative, then we'll have to wait
84 DelayType delay(0);
85 if (m_accumulator < 0)
86 {
87 delay = DelayType(-m_accumulator * m_delayDenominator / m_delayNumerator);
88 }
89
90 // apply the cost to the accumulator after the delay has been calculated; the next call will end up paying for our cost
91 m_accumulator -= cost;
92 m_accumulatorUpdated = now;
93
94 return delay;
95 }
96
100 virtual void ApplyAndPayForCost(int64_t cost) override
101 {
102 auto costInMilliseconds = ApplyCost(cost);
103 if(costInMilliseconds.count() > 0)
104 {
105 std::this_thread::sleep_for(costInMilliseconds);
106 }
107 }
108
112 virtual void SetRate(int64_t rate, bool resetAccumulator = false) override
113 {
114 std::lock_guard<std::recursive_mutex> lock(m_accumulatorLock);
115
116 // rate must always be positive
117 rate = (std::max)(static_cast<int64_t>(1), rate);
118
119 if (resetAccumulator)
120 {
121 m_accumulator = rate;
122 m_accumulatorFraction = 0;
123 m_accumulatorUpdated = m_elapsedTimeFunction();
124 }
125 else
126 {
127 // sync the accumulator to current time
128 ApplyCost(0); // this call is why we need a recursive mutex
129
130 if (ShouldRenormalizeAccumulatorOnRateChange())
131 {
132 // now renormalize the accumulator and its fractional part against the new rate
133 // the idea here is we want to preserve the desired wait based on the previous rate
134 //
135 // As an example:
136 // Say we had a rate of 100/s and our accumulator was -500 (ie the next ApplyCost would incur a 5 second delay)
137 // If we change the rate to 1000/s and want to preserve that delay, we need to scale the accumulator to -5000
138 m_accumulator = m_accumulator * rate / m_maxRate;
139 m_accumulatorFraction = m_accumulatorFraction * rate / m_maxRate;
140 }
141 }
142
143 m_maxRate = rate;
144
145 // Helper constants that represent the amount replenished per CLOCK time period; use the gcd to reduce them in order to try and minimize the chance of integer overflow
146 m_replenishNumerator = m_maxRate * DUR::period::den * CLOCK::duration::period::num;
147 m_replenishDenominator = DUR::period::num * CLOCK::duration::period::den;
148 auto gcd = ComputeGCD(m_replenishNumerator, m_replenishDenominator);
149 m_replenishNumerator /= gcd;
150 m_replenishDenominator /= gcd;
151
152 // Helper constants that represent the delay per unit of costAccumulator; use the gcd to reduce them in order to try and minimize the chance of integer overflow
153 m_delayNumerator = m_maxRate * DelayType::period::num * DUR::period::den;
154 m_delayDenominator = DelayType::period::den * DUR::period::num;
155 gcd = ComputeGCD(m_delayNumerator, m_delayDenominator);
156 m_delayNumerator /= gcd;
157 m_delayDenominator /= gcd;
158 }
159
160 private:
161
/**
 * Greatest common divisor of two integers (Euclid's algorithm).
 *
 * Declared static because it uses no instance state. The result is normalised
 * to be non-negative so that dividing a ratio by it never flips the sign of
 * the numerator and denominator.
 *
 * @return the non-negative gcd; gcd(x, 0) is |x| and gcd(0, 0) is 0.
 */
static int64_t ComputeGCD(int64_t num1, int64_t num2)
{
    // Euclid's
    while (num2 != 0)
    {
        const int64_t rem = num1 % num2;
        num1 = num2;
        num2 = rem;
    }

    // operator% keeps the sign of the dividend, so negative inputs can leave a negative result
    return num1 < 0 ? -num1 : num1;
}
174
175 bool ShouldRenormalizeAccumulatorOnRateChange() const { return RENORMALIZE_RATE_CHANGES; }
176
// Callable that returns the current time point; invoked by ApplyCost and by SetRate on reset.
178 ElapsedTimeFunctionType m_elapsedTimeFunction;
179 
// Current rate; SetRate keeps it at 1 or above. Also the cap applied to m_accumulator.
181 int64_t m_maxRate;
182 
// Guards the accumulator state. Recursive because SetRate calls ApplyCost while holding it.
184 std::recursive_mutex m_accumulatorLock;
185 
// Remaining budget. Replenished over time, reduced by each applied cost;
// a negative value means the next caller is told to wait.
188 int64_t m_accumulator;
189 
// Remainder of the last replenishment, in units of 1/m_replenishDenominator.
191 int64_t m_accumulatorFraction;
192 
// Time point at which the accumulator was last brought up to date.
194 InternalTimePointType m_accumulatorUpdated;
195 
// gcd-reduced ratio: budget replenished per CLOCK tick (numerator / denominator).
197 int64_t m_replenishNumerator;
198 int64_t m_replenishDenominator;
// gcd-reduced ratio used by ApplyCost to turn a negative accumulator into a DelayType count
// (deficit * m_delayDenominator / m_delayNumerator).
199 int64_t m_delayNumerator;
200 int64_t m_delayDenominator;
201 };
202
203 } // namespace RateLimits
204 } // namespace Utils
205} // namespace Aws
virtual void SetRate(int64_t rate, bool resetAccumulator=false) override
virtual DelayType ApplyCost(int64_t cost) override
virtual void ApplyAndPayForCost(int64_t cost) override
std::function< InternalTimePointType() > ElapsedTimeFunctionType
DefaultRateLimiter(int64_t maxRate, ElapsedTimeFunctionType elapsedTimeFunction=CLOCK::now)
std::chrono::time_point< CLOCK > InternalTimePointType