Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add token bucket strategy #234

Open
wants to merge 1 commit into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 11 additions & 0 deletions README.rst
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,17 @@ Supported Strategies
- Allow another request that arrives at ``00:01:00``
- Reject the request that arrives at ``00:01:01``

`Token Bucket <https://limits.readthedocs.io/en/latest/strategies.html#token-bucket>`_

The token bucket strategy allows bursts of traffic up to a fixed capacity,
while refilling the bucket at a steady rate over time.

For example, with a rate limit of 10 tokens with 1 token per second refill:

- Allow 10 requests at once if the bucket is full
- Refill the bucket with 1 token every second
- If the bucket is empty, further requests will be rejected until more tokens are available

Storage backends
================

Expand Down
12 changes: 12 additions & 0 deletions doc/source/strategies.rst
Original file line number Diff line number Diff line change
Expand Up @@ -42,3 +42,15 @@ rate limit as the window for each limit is not fixed at the start and end of eac
(i.e. N/second for a moving window means N in the last 1000 milliseconds). There is
however a higher memory cost associated with this strategy as it requires ``N`` items to
be maintained in memory per resource and rate limit.

Token Bucket
=============

The token bucket strategy allows bursts of traffic up to a fixed capacity,
while refilling the bucket at a steady rate over time.

For example, with a rate limit of 10 tokens with 1 token per second refill:

- Allow 10 requests at once if the bucket is full
- Refill the bucket with 1 token every second
- If the bucket is empty, further requests will be rejected until more tokens are available
98 changes: 98 additions & 0 deletions limits/strategies.py
Original file line number Diff line number Diff line change
Expand Up @@ -198,15 +198,113 @@ def hit(self, item: RateLimitItem, *identifiers: str, cost: int = 1) -> bool:
<= item.amount
)

class TokenBucketRateLimiter(RateLimiter):
"""
Reference: :ref:`strategies:token bucket`
"""

def hit(self, item: RateLimitItem, *identifiers: str, cost: int = 1) -> bool:
"""
Consume tokens from the bucket if available.

:param item: The rate limit item
:param identifiers: variable list of strings to uniquely identify this
instance of the limit
:param cost: The cost of this hit, default 1
:return: True if the request is allowed, False otherwise
"""
key = item.key_for(*identifiers)
tokens, last_refill = self._get_bucket_state(item, key)
refill_rate = item.amount / item.get_expiry() # tokens per second
current_time = self.storage.get_current_time()

# Refill tokens based on time since the last refill
time_since_last_refill = current_time - last_refill
new_tokens = min(item.amount, tokens + time_since_last_refill * refill_rate)

if new_tokens >= cost:
# Consume the tokens
new_tokens -= cost
self._set_bucket_state(item, key, new_tokens, current_time)
return True
else:
# Not enough tokens, reject the request
return False

def test(self, item: RateLimitItem, *identifiers: str, cost: int = 1) -> bool:
"""
Check if there are enough tokens available without consuming any.

:param item: The rate limit item
:param identifiers: variable list of strings to uniquely identify this
instance of the limit
:param cost: The expected cost to be consumed, default 1
:return: True if there are enough tokens, False otherwise
"""
key = item.key_for(*identifiers)
tokens, last_refill = self._get_bucket_state(item, key)
refill_rate = item.amount / item.get_expiry()
current_time = self.storage.get_current_time()

# Refill tokens based on time since the last refill
time_since_last_refill = current_time - last_refill
new_tokens = min(item.amount, tokens + time_since_last_refill * refill_rate)

return new_tokens >= cost

def get_window_stats(self, item: RateLimitItem, *identifiers: str) -> WindowStats:
"""
Returns the current token count and the next refill time.

:param item: The rate limit item
:param identifiers: variable list of strings to uniquely identify this
instance of the limit
:return: tuple (next refill time, tokens remaining)
"""
key = item.key_for(*identifiers)
tokens, last_refill = self._get_bucket_state(item, key)
refill_rate = item.amount / item.get_expiry()
current_time = self.storage.get_current_time()

time_since_last_refill = current_time - last_refill
new_tokens = min(item.amount, tokens + time_since_last_refill * refill_rate)
next_refill = last_refill + (1 / refill_rate if refill_rate > 0 else 0)

return WindowStats(next_refill, new_tokens)

def _get_bucket_state(self, item: RateLimitItem, key: str):
"""
Helper function to get the current state of the token bucket.

:param item: The rate limit item
:param key: The key representing the bucket
:return: A tuple of (current tokens, last refill timestamp)
"""
stored = self.storage.get(key) or (item.amount, self.storage.get_current_time())
tokens, last_refill = stored
return float(tokens), last_refill

def _set_bucket_state(self, item: RateLimitItem, key: str, tokens: float, timestamp: float):
"""
Helper function to update the token bucket state.

:param item: The rate limit item
:param key: The key representing the bucket
:param tokens: The number of tokens remaining
:param timestamp: The last refill timestamp
"""
self.storage.set(key, (tokens, timestamp), item.get_expiry())

KnownStrategy = Union[
Type[FixedWindowRateLimiter],
Type[FixedWindowElasticExpiryRateLimiter],
Type[MovingWindowRateLimiter],
Type[TokenBucketRateLimiter],
]

STRATEGIES: Dict[str, KnownStrategy] = {
"fixed-window": FixedWindowRateLimiter,
"fixed-window-elastic-expiry": FixedWindowElasticExpiryRateLimiter,
"moving-window": MovingWindowRateLimiter,
"token-bucket": TokenBucketRateLimiter,
}