Skip to content

Commit

Permalink
fix(subscriptions): use async_lru on latest data instead of TTLCache
Browse files Browse the repository at this point in the history
  • Loading branch information
CammilleCC committed Nov 4, 2024
1 parent 418a032 commit ca7a09e
Show file tree
Hide file tree
Showing 3 changed files with 67 additions and 73 deletions.
13 changes: 1 addition & 12 deletions api/poetry.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 0 additions & 1 deletion api/pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,6 @@ pydantic-settings = "^2.2.1"
async-lru = "^2.0.4"
ldap3 = "^2.9.1"
requests = "^2.32.3"
cachetools = "^5.5.0"

[tool.poetry.group.test.dependencies]
pytest = "^7.4.2"
Expand Down
126 changes: 66 additions & 60 deletions api/src/damnit_api/graphql/subscriptions.py
Original file line number Diff line number Diff line change
@@ -1,8 +1,7 @@
import asyncio
from collections import defaultdict
from typing import AsyncGenerator

from cachetools import TTLCache
from async_lru import alru_cache
import strawberry
from strawberry.types import Info
from strawberry.scalars import JSON
Expand All @@ -15,9 +14,66 @@

POLLING_INTERVAL = 1 # seconds

LATEST_DATA: dict[int, TTLCache] = defaultdict(
lambda: TTLCache(maxsize=3, ttl=POLLING_INTERVAL)
)

@alru_cache(ttl=POLLING_INTERVAL)
async def get_latest_data(proposal, timestamp, schema):
    """Collect the run data added to `proposal` starting at `timestamp`.

    The coroutine is wrapped in ``alru_cache`` with a TTL of
    ``POLLING_INTERVAL`` seconds, so repeated calls with the same
    arguments within that window reuse one result instead of querying
    again. NOTE(review): the cache keys on the arguments, so `proposal`,
    `timestamp` and `schema` presumably all need to be hashable -- confirm
    for the schema object.

    Side effects: updates the model returned by ``get_model(proposal)``
    (variables, timestamp, ``num_rows``) and, when the model reports a
    change, calls ``schema.update(model.stype)``.

    Args:
        proposal: Identifier passed through to the database helpers and
            to ``get_model``.
        timestamp: Lower bound handed to ``async_latest_rows`` as
            ``start_at`` for both tables.
        schema: Object whose ``update`` method is called with the model's
            ``stype`` when the model changed.

    Returns:
        A dict ``{"runs": ..., "metadata": ...}`` when there is something
        to report, otherwise ``None`` (no new ``run_variables`` rows, or
        no runs could be aggregated from them).
    """
    # Get latest data
    latest_rows = await async_latest_rows(
        proposal,
        table="run_variables",
        by="timestamp",
        start_at=timestamp,
    )
    if not len(latest_rows):
        # Nothing new since `timestamp`: skip the remaining queries.
        return None

    latest_data = LatestData.from_list(latest_rows)

    # Get latest runs
    latest_runs = await async_latest_rows(
        proposal,
        table="run_info",
        by="added_at",
        start_at=timestamp,
    )
    latest_runs = create_map(latest_runs, key="run")

    # Update model
    model = get_model(proposal)
    latest_variables = await async_variables(proposal)
    model_changed = model.update(
        latest_variables,
        timestamp=latest_data.timestamp,
    )
    if model_changed:
        # Update GraphQL schema
        schema.update(model.stype)

    # Refresh the row count on every call, whether or not the model changed.
    model.num_rows = await async_count(proposal, table="run_info")

    # Aggregate run values from latest data and runs
    runs = {}
    for run, variables in latest_data.runs.items():
        run_values = {name: data.value for name, data in variables.items()}
        run_values.setdefault("run", run)

        # Merge in the run_info row for this run, when one was fetched.
        if run_info := latest_runs.get(run):
            run_values.update(run_info)

        runs[run] = model.resolve(**run_values)

    # Return the latest values if any
    if not runs:
        return None

    metadata = {
        "rows": model.num_rows,
        "variables": model.variables,
        "timestamp": model.timestamp * 1000,  # deserialize to JS
    }
    return {"runs": runs, "metadata": metadata}


@strawberry.type
Expand All @@ -30,65 +86,15 @@ async def latest_data(
database: DatabaseInput,
timestamp: Timestamp,
) -> AsyncGenerator[JSON, None]:
proposal = database.proposal
model = get_model(proposal)

while True:
# Sleep first :)
await asyncio.sleep(POLLING_INTERVAL)

if cached := LATEST_DATA[proposal].get(timestamp):
yield cached

# Get latest data
latest_data = await async_latest_rows(
proposal,
table="run_variables",
by="timestamp",
start_at=timestamp,
)
if not len(latest_data):
continue
latest_data = LatestData.from_list(latest_data)

# Get latest runs
latest_runs = await async_latest_rows(
proposal, table="run_info", by="added_at", start_at=timestamp
result = await get_latest_data(
proposal=database.proposal,
timestamp=timestamp,
schema=info.context["schema"],
)
latest_runs = create_map(latest_runs, key="run")

# Update model
model_changed = model.update(
await async_variables(proposal),
timestamp=latest_data.timestamp,
)
if model_changed:
# Update GraphQL schema
info.context["schema"].update(model.stype)
model.num_rows = await async_count(proposal, table="run_info")

# Aggregate run values from latest data and runs
runs = {}
for run, variables in latest_data.runs.items():
run_values = {
name: data.value for name, data in variables.items()
}
run_values.setdefault("run", run)

if run_info := latest_runs.get(run):
run_values.update(run_info)

runs[run] = model.resolve(**run_values)

# Return the latest values if any
if len(runs):
metadata = {
"rows": model.num_rows,
"variables": model.variables,
"timestamp": model.timestamp * 1000, # deserialize to JS
}

result = {"runs": runs, "metadata": metadata}
LATEST_DATA[proposal][timestamp] = result

if result is not None:
yield result

0 comments on commit ca7a09e

Please sign in to comment.