Implement ConcurrencyLimitRateLimiter strategy #233

Open · wants to merge 1 commit into base: master
12 changes: 12 additions & 0 deletions README.rst
@@ -44,6 +44,18 @@ Supported Strategies
- Allow another request that arrives at ``00:01:00``
- Reject the request that arrives at ``00:01:01``

`Concurrency limit`

Limits the number of simultaneous executions (or tokens in use) rather than
the rate of requests over time. It enforces a maximum number of tasks allowed
to run in parallel.

For example, with a concurrency limit of 5:

- Allow up to 5 tasks or requests to be processed concurrently
- Reject any further requests until one of the 5 tasks completes and releases a slot
- Once a task finishes, the concurrency count is reduced, allowing new tasks to be executed

Storage backends
================

12 changes: 12 additions & 0 deletions doc/source/strategies.rst
@@ -42,3 +42,15 @@ rate limit as the window for each limit is not fixed at the start and end of each
(i.e. N/second for a moving window means N in the last 1000 milliseconds). There is
however a higher memory cost associated with this strategy as it requires ``N`` items to
be maintained in memory per resource and rate limit.

Concurrency limit
-----------------

Limits the number of simultaneous executions (or tokens in use) rather than
the rate of requests over time. It enforces a maximum number of tasks allowed
to run in parallel.

For example, with a concurrency limit of 5:

- Allow up to 5 tasks or requests to be processed concurrently
- Reject any further requests until one of the 5 tasks completes and releases a slot
- Once a task finishes, the concurrency count is reduced, allowing new tasks to be executed
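
To illustrate the documented behavior, here is a minimal sketch exercising the ``ConcurrencyLimitRateLimiter`` added in this PR against the library's existing in-memory backend. ``MemoryStorage`` and ``RateLimitItemPerMinute`` are existing ``limits`` names; note that ``release`` as implemented relies on the storage backend exposing the ``set`` operation used by the new method.

from limits import RateLimitItemPerMinute
from limits.storage import MemoryStorage
from limits.strategies import ConcurrencyLimitRateLimiter

limiter = ConcurrencyLimitRateLimiter(MemoryStorage())
slots = RateLimitItemPerMinute(5)  # item.amount == 5 -> at most 5 concurrent tasks

assert all(limiter.hit(slots, "jobs") for _ in range(5))  # 5 slots acquired
assert not limiter.hit(slots, "jobs")                     # the 6th request is rejected

limiter.release(slots, "jobs")      # a finished task frees its slot (assumes storage.set)
assert limiter.test(slots, "jobs")  # room again, and the check consumes nothing
assert limiter.hit(slots, "jobs")   # ...so the freed slot can be acquired

reset_time, remaining = limiter.get_window_stats(slots, "jobs")
assert remaining == 0               # all 5 slots are in use again
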
13 changes: 2 additions & 11 deletions limits/storage/memcached.py
@@ -7,17 +7,8 @@

from limits.errors import ConfigurationError
from limits.storage.base import Storage
from limits.typing import (
Callable,
List,
MemcachedClientP,
Optional,
P,
R,
Tuple,
Type,
Union,
)
from limits.typing import (Callable, List, MemcachedClientP, Optional, P, R,
Tuple, Type, Union)
from limits.util import get_dependency


85 changes: 85 additions & 0 deletions limits/strategies.py
@@ -198,15 +198,100 @@ def hit(self, item: RateLimitItem, *identifiers: str, cost: int = 1) -> bool:
<= item.amount
)

class ConcurrencyLimitRateLimiter(RateLimiter):
"""
Reference: :ref:`strategies:concurrency limit`
"""

def hit(self, item: RateLimitItem, *identifiers: str, cost: int = 1) -> bool:
"""
Attempt to consume tokens if the concurrency limit has not been reached.

:param item: The rate limit item
:param identifiers: variable list of strings to uniquely identify this
instance of the limit
:param cost: The cost of this hit, default 1
:return: True if the request is allowed (tokens consumed), False otherwise
"""
key = item.key_for(*identifiers)
current_concurrency = self._get_concurrency(key)

# Proceed only if consuming `cost` more slots keeps the total within the limit
if current_concurrency + cost <= item.amount:
# Increment the concurrency count
self.storage.incr(key, item.get_expiry(), amount=cost)
return True
else:
# Limit reached, reject the request
return False

def release(self, item: RateLimitItem, *identifiers: str, cost: int = 1) -> None:
"""
Release tokens (i.e., reduce concurrency) when a task/request is finished.

:param item: The rate limit item
:param identifiers: variable list of strings to uniquely identify this
instance of the limit
:param cost: The cost to release, default 1
"""
key = item.key_for(*identifiers)
current_concurrency = self._get_concurrency(key)

# Decrement the concurrency count
new_concurrency = max(0, current_concurrency - cost)
self.storage.set(key, new_concurrency, item.get_expiry())

def test(self, item: RateLimitItem, *identifiers: str, cost: int = 1) -> bool:
"""
Check if there is room for more concurrent tasks without consuming tokens.

:param item: The rate limit item
:param identifiers: variable list of strings to uniquely identify this
instance of the limit
:param cost: The expected cost to be consumed, default 1
:return: True if there is room for more concurrent tasks, False otherwise
"""
key = item.key_for(*identifiers)
current_concurrency = self._get_concurrency(key)

return current_concurrency + cost <= item.amount

def get_window_stats(self, item: RateLimitItem, *identifiers: str) -> WindowStats:
"""
Returns the reset time and the remaining concurrency capacity.

:param item: The rate limit item
:param identifiers: variable list of strings to uniquely identify this
instance of the limit
:return: tuple (reset time, remaining capacity)
"""
key = item.key_for(*identifiers)
current_concurrency = self._get_concurrency(key)

remaining_capacity = max(0, item.amount - current_concurrency)
reset_time = self.storage.get_expiry(key)

return WindowStats(reset_time, remaining_capacity)

def _get_concurrency(self, key: str) -> int:
"""
Helper function to retrieve the current number of concurrent tasks.

:param key: The key representing the concurrency state
:return: The current concurrency level
"""
return self.storage.get(key) or 0

KnownStrategy = Union[
Type[FixedWindowRateLimiter],
Type[FixedWindowElasticExpiryRateLimiter],
Type[MovingWindowRateLimiter],
Type[ConcurrencyLimitRateLimiter],
]

STRATEGIES: Dict[str, KnownStrategy] = {
"fixed-window": FixedWindowRateLimiter,
"fixed-window-elastic-expiry": FixedWindowElasticExpiryRateLimiter,
"moving-window": MovingWindowRateLimiter,
"concurrency-limit": ConcurrencyLimitRateLimiter,
}
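
As a follow-up sketch of end-to-end usage once this PR lands: the strategy can be resolved through the ``concurrency-limit`` registry entry added above and wrapped in an acquire/release pattern so a slot is always returned. ``MemoryStorage`` and ``RateLimitItemPerMinute`` are existing ``limits`` APIs; the ``run_task`` helper is purely illustrative, and the earlier caveat about ``release`` requiring backend ``set`` support applies.

from limits import RateLimitItemPerMinute
from limits.storage import MemoryStorage
from limits.strategies import STRATEGIES

# Resolve the strategy by the name registered in this PR
limiter = STRATEGIES["concurrency-limit"](MemoryStorage())
slots = RateLimitItemPerMinute(5)  # at most 5 tasks in flight per identifier


def run_task(work, identifier="background-jobs"):
    """Run ``work()`` only if a concurrency slot is available, releasing it afterwards."""
    if not limiter.hit(slots, identifier):  # acquire a slot
        raise RuntimeError("concurrency limit reached, try again later")
    try:
        return work()
    finally:
        limiter.release(slots, identifier)  # always free the slot, even on failure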