diff --git a/weave_query/requirements.legacy.txt b/weave_query/requirements.legacy.txt index 1b761cee89c8..0cb9be4996aa 100644 --- a/weave_query/requirements.legacy.txt +++ b/weave_query/requirements.legacy.txt @@ -22,7 +22,7 @@ aiohttp>=3.8.3 aiofiles>=22.1.0 aioprocessing>=2.0.1 Werkzeug>=3.0.3 # CVE 2024-34069 -janus>=1.0.0 +janus==1.0.0 # 1.2.0 breaks ThreadQueue unit tests and possibly ThreadQueue # we use this for logger, could probably skip it python-json-logger>=2.0.4 diff --git a/weave_query/tests/test_async_queue.py b/weave_query/tests/test_async_queue.py index 321c6e9d1cdb..e5e8d145a389 100644 --- a/weave_query/tests/test_async_queue.py +++ b/weave_query/tests/test_async_queue.py @@ -1,6 +1,5 @@ import asyncio import threading -from typing import Optional import aioprocessing import pytest @@ -18,7 +17,7 @@ async def process_producer(queue: Queue) -> None: await asyncio.wait(tasks) -def process_consumer(queue: Queue, loop: Optional[asyncio.AbstractEventLoop] = None) -> None: +def process_consumer(queue: Queue) -> None: async def _consume(): for _ in range(3): if isinstance(queue, ThreadQueue): @@ -28,10 +27,7 @@ async def _consume(): print(f"Consumer: {item}", flush=True) queue.task_done() - if isinstance(queue, ThreadQueue): - asyncio.run_coroutine_threadsafe(_consume(), loop).result() - else: - asyncio.run(_consume()) + asyncio.run(_consume()) @pytest.mark.timeout(10) @@ -52,10 +48,9 @@ async def test_async_process_queue_shared() -> None: @pytest.mark.asyncio async def test_async_thread_queue_shared() -> None: queue: Queue = ThreadQueue() - loop = asyncio.get_event_loop() - consumer_thread = threading.Thread(target=process_consumer, args=(queue, loop)) + consumer_thread = threading.Thread(target=process_consumer, args=(queue,)) consumer_thread.start() await process_producer(queue) queue.join() - consumer_thread.join() \ No newline at end of file + consumer_thread.join()