8#include <aws/core/Core_EXPORTS.h>
10#include <aws/core/utils/ratelimiter/RateLimiterInterface.h>
26 template<
typename CLOCK = std::chrono::high_resolution_clock,
typename DUR = std::chrono::seconds,
bool RENORMALIZE_RATE_CHANGES = true>
39 m_elapsedTimeFunction(elapsedTimeFunction),
43 m_accumulatorFraction(0),
44 m_accumulatorUpdated(),
45 m_replenishNumerator(0),
46 m_replenishDenominator(0),
51 static_assert(DUR::period::num > 0,
"Rate duration must have positive numerator");
52 static_assert(DUR::period::den > 0,
"Rate duration must have positive denominator");
53 static_assert(CLOCK::duration::period::num > 0,
"RateLimiter clock duration must have positive numerator");
54 static_assert(CLOCK::duration::period::den > 0,
"RateLimiter clock duration must have positive denominator");
66 std::lock_guard<std::recursive_mutex> lock(m_accumulatorLock);
68 auto now = m_elapsedTimeFunction();
69 auto elapsedTime = (now - m_accumulatorUpdated).count();
72 auto temp = elapsedTime * m_replenishNumerator + m_accumulatorFraction;
73 m_accumulator += temp / m_replenishDenominator;
74 m_accumulatorFraction = temp % m_replenishDenominator;
77 m_accumulator = (std::min)(m_accumulator, m_maxRate);
78 if (m_accumulator == m_maxRate)
80 m_accumulatorFraction = 0;
85 if (m_accumulator < 0)
87 delay =
DelayType(-m_accumulator * m_delayDenominator / m_delayNumerator);
91 m_accumulator -= cost;
92 m_accumulatorUpdated = now;
102 auto costInMilliseconds =
ApplyCost(cost);
103 if(costInMilliseconds.count() > 0)
105 std::this_thread::sleep_for(costInMilliseconds);
112 virtual void SetRate(int64_t rate,
bool resetAccumulator =
false)
override
114 std::lock_guard<std::recursive_mutex> lock(m_accumulatorLock);
117 rate = (std::max)(
static_cast<int64_t
>(1), rate);
119 if (resetAccumulator)
121 m_accumulator = rate;
122 m_accumulatorFraction = 0;
123 m_accumulatorUpdated = m_elapsedTimeFunction();
130 if (ShouldRenormalizeAccumulatorOnRateChange())
138 m_accumulator = m_accumulator * rate / m_maxRate;
139 m_accumulatorFraction = m_accumulatorFraction * rate / m_maxRate;
146 m_replenishNumerator = m_maxRate * DUR::period::den * CLOCK::duration::period::num;
147 m_replenishDenominator = DUR::period::num * CLOCK::duration::period::den;
148 auto gcd = ComputeGCD(m_replenishNumerator, m_replenishDenominator);
149 m_replenishNumerator /= gcd;
150 m_replenishDenominator /= gcd;
153 m_delayNumerator = m_maxRate * DelayType::period::num * DUR::period::den;
154 m_delayDenominator = DelayType::period::den * DUR::period::num;
155 gcd = ComputeGCD(m_delayNumerator, m_delayDenominator);
156 m_delayNumerator /= gcd;
157 m_delayDenominator /= gcd;
162 int64_t ComputeGCD(int64_t num1, int64_t num2)
const
167 int64_t rem = num1 % num2;
175 bool ShouldRenormalizeAccumulatorOnRateChange()
const {
return RENORMALIZE_RATE_CHANGES; }
184 std::recursive_mutex m_accumulatorLock;
188 int64_t m_accumulator;
191 int64_t m_accumulatorFraction;
197 int64_t m_replenishNumerator;
198 int64_t m_replenishDenominator;
199 int64_t m_delayNumerator;
200 int64_t m_delayDenominator;
virtual void SetRate(int64_t rate, bool resetAccumulator=false) override
virtual ~DefaultRateLimiter()=default
virtual DelayType ApplyCost(int64_t cost) override
virtual void ApplyAndPayForCost(int64_t cost) override
std::function< InternalTimePointType() > ElapsedTimeFunctionType
DefaultRateLimiter(int64_t maxRate, ElapsedTimeFunctionType elapsedTimeFunction=CLOCK::now)
std::chrono::time_point< CLOCK > InternalTimePointType
std::chrono::milliseconds DelayType