Skip to content

Commit

Permalink
feat(graphql): use TTLCache on the latest data subscription
Browse files Browse the repository at this point in the history
  • Loading branch information
CammilleCC committed Oct 26, 2024
1 parent 652b899 commit 418a032
Show file tree
Hide file tree
Showing 4 changed files with 155 additions and 10 deletions.
15 changes: 13 additions & 2 deletions api/poetry.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions api/pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ pydantic-settings = "^2.2.1"
async-lru = "^2.0.4"
ldap3 = "^2.9.1"
requests = "^2.32.3"
cachetools = "^5.5.0"

[tool.poetry.group.test.dependencies]
pytest = "^7.4.2"
Expand Down
19 changes: 17 additions & 2 deletions api/src/damnit_api/graphql/subscriptions.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
import asyncio
from collections import defaultdict
from typing import AsyncGenerator

from cachetools import TTLCache
import strawberry
from strawberry.types import Info
from strawberry.scalars import JSON
Expand All @@ -11,6 +13,13 @@
from ..utils import create_map


POLLING_INTERVAL = 1 # seconds

LATEST_DATA: dict[int, TTLCache] = defaultdict(
lambda: TTLCache(maxsize=3, ttl=POLLING_INTERVAL)
)


@strawberry.type
class Subscription:

Expand All @@ -26,7 +35,10 @@ async def latest_data(

while True:
# Sleep first :)
await asyncio.sleep(1)
await asyncio.sleep(POLLING_INTERVAL)

if cached := LATEST_DATA[proposal].get(timestamp):
yield cached

# Get latest data
latest_data = await async_latest_rows(
Expand Down Expand Up @@ -76,4 +88,7 @@ async def latest_data(
"timestamp": model.timestamp * 1000, # deserialize to JS
}

yield {"runs": runs, "metadata": metadata}
result = {"runs": runs, "metadata": metadata}
LATEST_DATA[proposal][timestamp] = result

yield result
130 changes: 124 additions & 6 deletions api/tests/graphql/test_subscriptions.py
Original file line number Diff line number Diff line change
@@ -1,9 +1,12 @@
import asyncio
from datetime import datetime
from unittest.mock import patch

import pytest
import pytest_asyncio

from damnit_api.graphql.models import DamnitRun, serialize
from damnit_api.graphql.subscriptions import LATEST_DATA, POLLING_INTERVAL

from .const import (
EXAMPLE_VARIABLES,
Expand All @@ -14,12 +17,14 @@
NUM_ROWS,
)
from .utils import (
assert_model,
create_run_info,
create_run_variables,
)


patched_sleep = patch.object(asyncio, "sleep", return_value=asyncio.sleep(0))


@pytest.fixture(scope="module")
def current_timestamp():
return datetime.now().timestamp()
Expand All @@ -38,28 +43,34 @@ def mocked_returns(*args, table, **kwargs):
elif table == "run_info":
return [create_run_info(**KNOWN_VALUES)]

mocker.patch(
return mocker.patch(
"damnit_api.graphql.subscriptions.async_latest_rows",
side_effect=mocked_returns,
)


@pytest.fixture
def mocked_variables(mocker):
mocker.patch(
return mocker.patch(
"damnit_api.graphql.subscriptions.async_variables",
return_value=EXAMPLE_VARIABLES,
)


@pytest.fixture
def mocked_new_count(mocker):
mocker.patch(
return mocker.patch(
"damnit_api.graphql.subscriptions.async_count",
return_value=NUM_ROWS + 1,
)


@pytest.fixture(autouse=True)
def clear_latest_data():
"""Automatically clears the latest data cache before every test."""
LATEST_DATA.clear()


@pytest.mark.asyncio
async def test_latest_data(
graphql_schema,
Expand All @@ -70,7 +81,7 @@ async def test_latest_data(
):
subscription = await graphql_schema.subscribe(
"""
subscription LatestDataSubcription(
subscription LatestDataSubscription(
$proposal: String,
$timestamp: Timestamp!) {
latest_data(database: { proposal: $proposal }, timestamp: $timestamp)
Expand All @@ -80,7 +91,9 @@ async def test_latest_data(
"proposal": "1234",
"timestamp": current_timestamp * 1000, # some arbitrary timestamp
},
context_value={"schema": graphql_schema},
context_value={
"schema": graphql_schema,
},
)

# Only test the first update
Expand Down Expand Up @@ -113,3 +126,108 @@ async def test_latest_data(

# Don't forget to break!
break


@pytest.mark.asyncio
async def test_latest_data_with_concurrent_subscriptions(
graphql_schema,
current_timestamp,
mocked_latest_rows,
mocked_new_count,
mocked_variables,
):
query = """
subscription LatestDataSubscription(
$proposal: String,
$timestamp: Timestamp!) {
latest_data(database: { proposal: $proposal }, timestamp: $timestamp)
}
"""
variables = {
"proposal": "1234",
"timestamp": current_timestamp * 1000, # some arbitrary timestamp
}
context = {
"schema": graphql_schema,
}

first_sub = await graphql_schema.subscribe(
query,
variable_values=variables,
context_value=context,
)
second_sub = await graphql_schema.subscribe(
query,
variable_values=variables,
context_value=context,
)

with patched_sleep:
async for result in first_sub:
# Only test the first update
assert not result.errors
mocked_latest_rows.assert_called()
break

mocked_latest_rows.reset_mock()

with patched_sleep:
async for result in second_sub:
# Only test the first update
assert not result.errors
mocked_latest_rows.assert_not_called()
break


@pytest.mark.asyncio
async def test_latest_data_with_nonconcurrent_subscriptions(
graphql_schema,
current_timestamp,
mocked_latest_rows,
mocked_new_count,
mocked_variables,
):
query = """
subscription LatestDataSubscription(
$proposal: String,
$timestamp: Timestamp!) {
latest_data(database: { proposal: $proposal }, timestamp: $timestamp)
}
"""
variables = {
"proposal": "1234",
"timestamp": current_timestamp * 1000, # some arbitrary timestamp
}
context = {
"schema": graphql_schema,
}

first_sub = await graphql_schema.subscribe(
query,
variable_values=variables,
context_value=context,
)
second_sub = await graphql_schema.subscribe(
query,
variable_values=variables,
context_value=context,
)

with patched_sleep:
async for result in first_sub:
# Only test the first update
assert not result.errors
mocked_latest_rows.assert_called()
break

await asyncio.sleep(
POLLING_INTERVAL * 3
) # give enough time to clear the cache
mocked_latest_rows.reset_mock()

with patched_sleep:
async for result in second_sub:
# Only test the first update
assert not result.errors
mocked_latest_rows.assert_called()
break

0 comments on commit 418a032

Please sign in to comment.