diff --git a/README.rst b/README.rst index 920bdc8..6c3b14d 100644 --- a/README.rst +++ b/README.rst @@ -44,6 +44,17 @@ Supported Strategies - Allow another request that arrives at ``00:01:00`` - Reject the request that arrives at ``00:01:01`` +`Token Bucket `_ + + The token bucket strategy allows bursts of traffic up to a fixed capacity, + while refilling the bucket at a steady rate over time. + + For example, with a rate limit of 10 tokens with 1 token per second refill: + + - Allow 10 requests at once if the bucket is full + - Refill the bucket with 1 token every second + - If the bucket is empty, further requests will be rejected until more tokens are available + Storage backends ================ diff --git a/doc/source/strategies.rst b/doc/source/strategies.rst index 35b86a2..34c7bd6 100644 --- a/doc/source/strategies.rst +++ b/doc/source/strategies.rst @@ -42,3 +42,15 @@ rate limit as the window for each limit is not fixed at the start and end of eac (i.e. N/second for a moving window means N in the last 1000 milliseconds). There is however a higher memory cost associated with this strategy as it requires ``N`` items to be maintained in memory per resource and rate limit. + +Token Bucket +============= + +The token bucket strategy allows bursts of traffic up to a fixed capacity, +while refilling the bucket at a steady rate over time. + +For example, with a rate limit of 10 tokens with 1 token per second refill: + +- Allow 10 requests at once if the bucket is full +- Refill the bucket with 1 token every second +- If the bucket is empty, further requests will be rejected until more tokens are available \ No newline at end of file diff --git a/limits/strategies.py b/limits/strategies.py index d5072f7..f7fcc05 100644 --- a/limits/strategies.py +++ b/limits/strategies.py @@ -198,15 +198,113 @@ def hit(self, item: RateLimitItem, *identifiers: str, cost: int = 1) -> bool: <= item.amount ) +class TokenBucketRateLimiter(RateLimiter): + """ + Reference: :ref:`strategies:token bucket` + """ + + def hit(self, item: RateLimitItem, *identifiers: str, cost: int = 1) -> bool: + """ + Consume tokens from the bucket if available. + + :param item: The rate limit item + :param identifiers: variable list of strings to uniquely identify this + instance of the limit + :param cost: The cost of this hit, default 1 + :return: True if the request is allowed, False otherwise + """ + key = item.key_for(*identifiers) + tokens, last_refill = self._get_bucket_state(item, key) + refill_rate = item.amount / item.get_expiry() # tokens per second + current_time = self.storage.get_current_time() + + # Refill tokens based on time since the last refill + time_since_last_refill = current_time - last_refill + new_tokens = min(item.amount, tokens + time_since_last_refill * refill_rate) + + if new_tokens >= cost: + # Consume the tokens + new_tokens -= cost + self._set_bucket_state(item, key, new_tokens, current_time) + return True + else: + # Not enough tokens, reject the request + return False + + def test(self, item: RateLimitItem, *identifiers: str, cost: int = 1) -> bool: + """ + Check if there are enough tokens available without consuming any. + + :param item: The rate limit item + :param identifiers: variable list of strings to uniquely identify this + instance of the limit + :param cost: The expected cost to be consumed, default 1 + :return: True if there are enough tokens, False otherwise + """ + key = item.key_for(*identifiers) + tokens, last_refill = self._get_bucket_state(item, key) + refill_rate = item.amount / item.get_expiry() + current_time = self.storage.get_current_time() + + # Refill tokens based on time since the last refill + time_since_last_refill = current_time - last_refill + new_tokens = min(item.amount, tokens + time_since_last_refill * refill_rate) + + return new_tokens >= cost + + def get_window_stats(self, item: RateLimitItem, *identifiers: str) -> WindowStats: + """ + Returns the current token count and the next refill time. + + :param item: The rate limit item + :param identifiers: variable list of strings to uniquely identify this + instance of the limit + :return: tuple (next refill time, tokens remaining) + """ + key = item.key_for(*identifiers) + tokens, last_refill = self._get_bucket_state(item, key) + refill_rate = item.amount / item.get_expiry() + current_time = self.storage.get_current_time() + + time_since_last_refill = current_time - last_refill + new_tokens = min(item.amount, tokens + time_since_last_refill * refill_rate) + next_refill = last_refill + (1 / refill_rate if refill_rate > 0 else 0) + + return WindowStats(next_refill, new_tokens) + + def _get_bucket_state(self, item: RateLimitItem, key: str): + """ + Helper function to get the current state of the token bucket. + + :param item: The rate limit item + :param key: The key representing the bucket + :return: A tuple of (current tokens, last refill timestamp) + """ + stored = self.storage.get(key) or (item.amount, self.storage.get_current_time()) + tokens, last_refill = stored + return float(tokens), last_refill + + def _set_bucket_state(self, item: RateLimitItem, key: str, tokens: float, timestamp: float): + """ + Helper function to update the token bucket state. + + :param item: The rate limit item + :param key: The key representing the bucket + :param tokens: The number of tokens remaining + :param timestamp: The last refill timestamp + """ + self.storage.set(key, (tokens, timestamp), item.get_expiry()) KnownStrategy = Union[ Type[FixedWindowRateLimiter], Type[FixedWindowElasticExpiryRateLimiter], Type[MovingWindowRateLimiter], + Type[TokenBucketRateLimiter], ] STRATEGIES: Dict[str, KnownStrategy] = { "fixed-window": FixedWindowRateLimiter, "fixed-window-elastic-expiry": FixedWindowElasticExpiryRateLimiter, "moving-window": MovingWindowRateLimiter, + "token-bucket": TokenBucketRateLimiter, }