diff --git a/api/poetry.lock b/api/poetry.lock index 7279d73..ff5be6e 100644 --- a/api/poetry.lock +++ b/api/poetry.lock @@ -75,17 +75,6 @@ files = [ [package.dependencies] cryptography = "*" -[[package]] -name = "cachetools" -version = "5.5.0" -description = "Extensible memoizing collections and decorators" -optional = false -python-versions = ">=3.7" -files = [ - {file = "cachetools-5.5.0-py3-none-any.whl", hash = "sha256:02134e8439cdc2ffb62023ce1debca2944c3f289d66bb17ead3ab3dede74b292"}, - {file = "cachetools-5.5.0.tar.gz", hash = "sha256:2cc24fb4cbe39633fb7badd9db9ca6295d766d9c2995f245725a46715d050f2a"}, -] - [[package]] name = "certifi" version = "2024.8.30" @@ -2422,4 +2411,4 @@ type = ["pytest-mypy"] [metadata] lock-version = "2.0" python-versions = "^3.9" -content-hash = "c13f45ff1f43e9bf46699241c97ceae0efac4cfc6792829443269299b5d1e5c4" +content-hash = "542c2a8b2edf1f8f53296762cb37db3b9d1ffcebecc57e9c23bf9dcfd5a211ec" diff --git a/api/pyproject.toml b/api/pyproject.toml index 9cd0c96..c0d2f33 100644 --- a/api/pyproject.toml +++ b/api/pyproject.toml @@ -31,7 +31,6 @@ pydantic-settings = "^2.2.1" async-lru = "^2.0.4" ldap3 = "^2.9.1" requests = "^2.32.3" -cachetools = "^5.5.0" [tool.poetry.group.test.dependencies] pytest = "^7.4.2" diff --git a/api/src/damnit_api/graphql/subscriptions.py b/api/src/damnit_api/graphql/subscriptions.py index 1fcca4b..ba9e366 100644 --- a/api/src/damnit_api/graphql/subscriptions.py +++ b/api/src/damnit_api/graphql/subscriptions.py @@ -1,8 +1,7 @@ import asyncio -from collections import defaultdict from typing import AsyncGenerator -from cachetools import TTLCache +from async_lru import alru_cache import strawberry from strawberry.types import Info from strawberry.scalars import JSON @@ -15,9 +14,66 @@ POLLING_INTERVAL = 1 # seconds -LATEST_DATA: dict[int, TTLCache] = defaultdict( - lambda: TTLCache(maxsize=3, ttl=POLLING_INTERVAL) -) + +@alru_cache(ttl=POLLING_INTERVAL) +async def get_latest_data(proposal, timestamp, schema): + # Get latest data + latest_data = await async_latest_rows( + proposal, + table="run_variables", + by="timestamp", + start_at=timestamp, + ) + if not len(latest_data): + return + + latest_data = LatestData.from_list(latest_data) + + # Get latest runs + latest_runs = await async_latest_rows( + proposal, + table="run_info", + by="added_at", + start_at=timestamp, + ) + latest_runs = create_map(latest_runs, key="run") + + # Update model + model = get_model(proposal) + latest_variables = await async_variables(proposal) + model_changed = model.update( + latest_variables, + timestamp=latest_data.timestamp, + ) + if model_changed: + # Update GraphQL schema + schema.update(model.stype) + + latest_counts = await async_count(proposal, table="run_info") + model.num_rows = latest_counts + + # Aggregate run values from latest data and runs + runs = {} + for run, variables in latest_data.runs.items(): + run_values = {name: data.value for name, data in variables.items()} + run_values.setdefault("run", run) + + if run_info := latest_runs.get(run): + run_values.update(run_info) + + runs[run] = model.resolve(**run_values) + + # Return the latest values if any + if len(runs): + metadata = { + "rows": model.num_rows, + "variables": model.variables, + "timestamp": model.timestamp * 1000, # deserialize to JS + } + + result = {"runs": runs, "metadata": metadata} + + return result @strawberry.type @@ -30,65 +86,15 @@ async def latest_data( database: DatabaseInput, timestamp: Timestamp, ) -> AsyncGenerator[JSON, None]: - proposal = database.proposal - model = get_model(proposal) while True: # Sleep first :) await asyncio.sleep(POLLING_INTERVAL) - if cached := LATEST_DATA[proposal].get(timestamp): - yield cached - - # Get latest data - latest_data = await async_latest_rows( - proposal, - table="run_variables", - by="timestamp", - start_at=timestamp, - ) - if not len(latest_data): - continue - latest_data = LatestData.from_list(latest_data) - - # Get latest runs - latest_runs = await async_latest_rows( - proposal, table="run_info", by="added_at", start_at=timestamp + result = await get_latest_data( + proposal=database.proposal, + timestamp=timestamp, + schema=info.context["schema"], ) - latest_runs = create_map(latest_runs, key="run") - - # Update model - model_changed = model.update( - await async_variables(proposal), - timestamp=latest_data.timestamp, - ) - if model_changed: - # Update GraphQL schema - info.context["schema"].update(model.stype) - model.num_rows = await async_count(proposal, table="run_info") - - # Aggregate run values from latest data and runs - runs = {} - for run, variables in latest_data.runs.items(): - run_values = { - name: data.value for name, data in variables.items() - } - run_values.setdefault("run", run) - - if run_info := latest_runs.get(run): - run_values.update(run_info) - - runs[run] = model.resolve(**run_values) - - # Return the latest values if any - if len(runs): - metadata = { - "rows": model.num_rows, - "variables": model.variables, - "timestamp": model.timestamp * 1000, # deserialize to JS - } - - result = {"runs": runs, "metadata": metadata} - LATEST_DATA[proposal][timestamp] = result - + if result is not None: yield result