-
In an |
Beta Was this translation helpful? Give feedback.
Replies: 2 comments 1 reply
-
I found out that PyrateLimiter supports this use case as well, so there is no need for a Dagster integration. It offers an option to use a Redis server for synchronization in a distributed system. There seems to be an issue with multi-node setups, but I think the author is working on it. Hope this helps someone.
Beta Was this translation helpful? Give feedback.
-
For those looking for a ready-to-use solution:
#!/usr/bin/env python3
from contextlib import contextmanager
import sqlite3
from time import sleep
from typing import Dict

from dagster import AssetExecutionContext, ConfigurableResource, Definitions, asset
from pydantic import PrivateAttr
from pyrate_limiter import (
    BucketFullException,
    Duration,
    Limiter,
    Rate,
    SQLiteBucket,
    SQLiteQueries as Queries,
)
def _ensure_bucket_table(conn: sqlite3.Connection, table_name: str) -> None:
    """Create the bucket table *table_name* and its timestamp index if the table is missing."""
    exists = conn.execute(
        "SELECT name FROM sqlite_master WHERE type='table' AND name=?;",
        (table_name,),
    ).fetchone()
    if exists:
        return
    index_name = table_name + "__timestamp_index"
    # Clear any stale objects with these names before (re)creating them.
    conn.execute(Queries.DROP_TABLE.format(table=table_name))
    conn.execute(Queries.DROP_INDEX.format(index=index_name))
    conn.execute(Queries.CREATE_BUCKET_TABLE.format(table=table_name))
    conn.execute(
        Queries.CREATE_INDEX_ON_TIMESTAMP.format(
            index_name=index_name,
            table_name=table_name,
        )
    )


@contextmanager
def get_limiter(rate: Dict, table_name: str, db_path: str = "/path/to/db.sqlite"):
    """Yield a pyrate_limiter ``Limiter`` backed by a table in a SQLite file.

    Args:
        rate: Mapping with ``"items"`` (maximum acquisitions per window) and
            ``"seconds"`` (window length in seconds).
        table_name: SQLite table that stores the bucket's items. It is created,
            together with a timestamp index, if it does not exist yet.
        db_path: SQLite database file. It defaults to the placeholder path the
            snippet originally hard-coded. Point it at a file that every
            process sharing the limit can reach.

    Yields:
        A ``Limiter`` over a single ``SQLiteBucket``. The SQLite connection is
        closed when the ``with`` block exits. This also happens when setting
        up the table, bucket or limiter fails.

    Raises:
        KeyError: If ``rate`` lacks ``"items"`` or ``"seconds"``.
    """
    limiter_rate = Rate(rate["items"], Duration.SECOND * rate["seconds"])
    conn = sqlite3.connect(
        db_path,
        isolation_level="EXCLUSIVE",
        # Allow the connection to be used from threads other than the creator.
        check_same_thread=False,
    )
    # Everything after connect() is inside try/finally so that a failure
    # during setup cannot leak the open connection.
    try:
        _ensure_bucket_table(conn, table_name)
        bucket = SQLiteBucket([limiter_rate], conn, table_name)
        yield Limiter(bucket)
    finally:
        conn.close()
class RateLimiterResource(ConfigurableResource):
    """Dagster resource that exposes a SQLite-backed pyrate_limiter ``Limiter``.

    Attributes:
        bucket_name: Name of the SQLite table used as the rate-limit bucket.
        rate: Mapping with ``"items"`` (allowed acquisitions) and ``"seconds"``
            (window length), forwarded to ``get_limiter``.
    """

    bucket_name: str
    rate: Dict[str, int]

    # Set in yield_for_execution; not part of the resource's configuration.
    _limiter = PrivateAttr()

    @contextmanager
    def yield_for_execution(self, context):
        """Open the limiter for the duration of a run and yield this resource.

        ``get_limiter`` keeps its SQLite connection open while execution is in
        progress and closes it when this context exits.
        """
        with get_limiter(self.rate, self.bucket_name) as limiter:
            self._limiter = limiter
            yield self

    def try_acquire(
        self,
        key: str,
        retries_before_failing: int,
        context: AssetExecutionContext,
        delay: int,
    ) -> None:
        """Acquire one limiter slot for *key*, retrying with a fixed delay.

        Args:
            key: Item name passed to ``Limiter.try_acquire``.
            retries_before_failing: Total number of attempts to make; must be >= 1.
            context: Execution context whose logger reports failed attempts.
            delay: Seconds to sleep between consecutive failed attempts.

        Raises:
            ValueError: If ``retries_before_failing`` is less than 1.
                Previously such a value returned without acquiring anything.
            BucketFullException: The exception from the final failed attempt.
        """
        if retries_before_failing < 1:
            raise ValueError(
                f"retries_before_failing must be >= 1, got {retries_before_failing}"
            )
        for attempt in range(1, retries_before_failing + 1):
            try:
                self._limiter.try_acquire(key)
                return
            except BucketFullException as err:
                if attempt == retries_before_failing:
                    context.log.error(
                        f"Failed to acquire rate limiter allowance after {attempt} attempts, raising error: {err.meta_info}"
                    )
                    # Re-raise the original exception rather than building a new
                    # one: pyrate_limiter's BucketFullException requires
                    # constructor arguments, so `BucketFullException()` would
                    # surface as a TypeError instead.
                    raise
                context.log.warning(
                    f"Attempt {attempt}/{retries_before_failing} to acquire rate limiter allowance failed, retrying in {delay}s; error: {err.meta_info}"
                )
                sleep(delay)
@asset
def my_daily_partitioned_asset(
    context: AssetExecutionContext, rate_limiter: RateLimiterResource
) -> None:
    """Wait for a rate-limiter slot keyed by the run's partition key.

    Up to 10 attempts are made, 30 seconds apart, before BucketFullException
    propagates. The function produces no value, so it is annotated ``-> None``.
    The previous ``-> str`` annotation made Dagster type-check the returned
    ``None`` against ``str``.
    """
    # NOTE(review): no partitions_def is declared on @asset, yet
    # context.partition_key is read here. This presumably needs a partitioned
    # asset (e.g. a DailyPartitionsDefinition) to work — confirm.
    rate_limiter.try_acquire(
        context=context, key=context.partition_key, retries_before_failing=10, delay=30
    )
# NOTE(review): all_assets, hackernews_job and io_manager are not defined in
# this snippet; they are expected to come from the surrounding project.
defs = Definitions(
    assets=all_assets,
    jobs=[hackernews_job],
    resources={
        "io_manager": io_manager,
        # Shared limit: 10 acquisitions per 70-second window, tracked in the
        # SQLite table "my-bucket-table".
        "rate_limiter": RateLimiterResource(
            rate={"items": 10, "seconds": 70}, bucket_name="my-bucket-table"
        ),
    },
)

# Test it by spawning a separate HTTP server:
from fastapi import FastAPI, Request
# slowapi's Limiter is imported under an alias: a plain `Limiter` import here
# would rebind the module-level pyrate_limiter `Limiter` that get_limiter
# looks up when it is called.
from slowapi import Limiter as SlowApiLimiter, _rate_limit_exceeded_handler
from slowapi.util import get_remote_address
from slowapi.errors import RateLimitExceeded
from fastapi.responses import PlainTextResponse

app = FastAPI()
# Rate-limit incoming requests per client address.
limiter = SlowApiLimiter(key_func=get_remote_address)
# Attach the limiter to the app state (slowapi's documented setup).
app.state.limiter = limiter
# Use slowapi's default handler when a request exceeds its limit.
app.add_exception_handler(RateLimitExceeded, _rate_limit_exceeded_handler)
@app.route("/")
@limiter.limit("30/minute")
async def catch_all(request: Request):
    """Echo the request's path parameters and query string as plain text.

    The endpoint is limited to 30 requests per minute.

    NOTE(review): the route is "/" with no path parameters, so ``path_params``
    is always empty here despite the name ``catch_all``. Use a path route
    such as "/{path:path}" if a true catch-all was intended.
    """
    params_seen = dict(request.query_params)
    body = f"Path: {request.path_params}, Query Params: {params_seen}"
    return PlainTextResponse(body)
Beta Was this translation helpful? Give feedback.
I found out that PyrateLimiter supports this use case as well, so there is no need for a Dagster integration. It offers an option to use a Redis server for synchronization in a distributed system. There seems to be an issue with multi-node setups, but I think the author is working on it. Hope this helps someone.