From 26764a3a5183f92d55880cd034fc655d659af43d Mon Sep 17 00:00:00 2001
From: hatchet-temporary
Date: Sat, 30 Nov 2024 15:57:16 -0500
Subject: [PATCH] Pytest Tweaks, Part III - Test Isolation (#277)

* feat: isolate tests better

* fix: lint

* fix: unwind fixture, seemed to not work in CI for some reason

* Revert "fix: unwind fixture, seemed to not work in CI for some reason"

This reverts commit 0c694e9d83f2fe1e80a9c99d0f5b62cf90fe17e6.

* fix: re-add manual wait
---
 compose.yml                                |  2 +-
 conftest.py                                | 45 +++++++++++++++-
 .../test_dep_concurrency_limit_rr.py       |  6 +--
 examples/api/test_api.py                   |  7 ++-
 examples/async/test_async.py               |  9 ++--
 examples/bulk_fanout/test_bulk_fanout.py   |  9 ++--
 examples/cancellation/test_cancellation.py |  6 +--
 .../test_concurrency_limit.py              | 10 ++--
 .../test_concurrency_limit_rr.py           |  7 +--
 examples/dag/test_dag.py                   |  6 +--
 examples/fanout/test_fanout.py             |  9 ++--
 examples/logger/test_logger.py             |  6 +--
 examples/on_failure/test_on_failure.py     |  6 +--
 examples/rate_limit/test_rate_limit.py     |  6 +--
 examples/timeout/test_timeout.py           |  9 ++--
 tests/__init__.py                          |  0
 tests/utils/__init__.py                    |  1 -
 tests/utils/bg_worker.py                   | 52 -------------------
 18 files changed, 81 insertions(+), 115 deletions(-)
 delete mode 100644 tests/__init__.py
 delete mode 100644 tests/utils/__init__.py
 delete mode 100644 tests/utils/bg_worker.py

diff --git a/compose.yml b/compose.yml
index 7fbd8306..e96cbc52 100644
--- a/compose.yml
+++ b/compose.yml
@@ -112,4 +112,4 @@ volumes:
   hatchet_rabbitmq_data:
   hatchet_rabbitmq.conf:
   hatchet_config:
-  hatchet_certs:
\ No newline at end of file
+  hatchet_certs:
diff --git a/conftest.py b/conftest.py
index 581c1da2..2aff5cd3 100644
--- a/conftest.py
+++ b/conftest.py
@@ -1,5 +1,11 @@
-from typing import AsyncGenerator
+import logging
+import subprocess
+import time
+from io import BytesIO
+from threading import Thread
+from typing import AsyncGenerator, Callable, cast
 
+import psutil
 import pytest
 import pytest_asyncio
 
@@ -14,3 +20,40 @@ async def aiohatchet() -> AsyncGenerator[Hatchet, None]:
 @pytest.fixture(scope="session")
 def hatchet() -> Hatchet:
     return Hatchet(debug=True)
+
+
+@pytest.fixture()
+def worker(request: pytest.FixtureRequest):
+    example = cast(str, request.param)
+
+    command = ["poetry", "run", example]
+
+    logging.info(f"Starting background worker: {' '.join(command)}")
+    proc = subprocess.Popen(command, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
+
+    # Check if the process is still running
+    if proc.poll() is not None:
+        raise Exception(f"Worker failed to start with return code {proc.returncode}")
+
+    time.sleep(5)
+
+    def log_output(pipe: BytesIO, log_func: Callable[[str], None]) -> None:
+        for line in iter(pipe.readline, b""):
+            log_func(line.decode().strip())
+
+    Thread(target=log_output, args=(proc.stdout, logging.info), daemon=True).start()
+    Thread(target=log_output, args=(proc.stderr, logging.error), daemon=True).start()
+
+    yield proc
+
+    logging.info("Cleaning up background worker")
+    parent = psutil.Process(proc.pid)
+    children = parent.children(recursive=True)
+    for child in children:
+        child.terminate()
+    parent.terminate()
+
+    _, alive = psutil.wait_procs([parent] + children, timeout=3)
+    for p in alive:
+        logging.warning(f"Force killing process {p.pid}")
+        p.kill()
diff --git a/examples/_deprecated/concurrency_limit_rr/test_dep_concurrency_limit_rr.py b/examples/_deprecated/concurrency_limit_rr/test_dep_concurrency_limit_rr.py
index ed6718c4..fa5176e8 100644
--- a/examples/_deprecated/concurrency_limit_rr/test_dep_concurrency_limit_rr.py
+++ b/examples/_deprecated/concurrency_limit_rr/test_dep_concurrency_limit_rr.py
@@ -5,15 +5,13 @@
 
 from hatchet_sdk import Hatchet
 from hatchet_sdk.workflow_run import WorkflowRunRef
-from tests.utils import fixture_bg_worker
-
-worker = fixture_bg_worker(["poetry", "run", "concurrency_limit_rr"])
 
 
 # requires scope module or higher for shared event loop
+@pytest.mark.parametrize("worker", ["concurrency_limit_rr"], indirect=True)
 @pytest.mark.skip(reason="The timing for this test is not reliable")
 @pytest.mark.asyncio(scope="session")
-async def test_run(aiohatchet: Hatchet):
+async def test_run(aiohatchet: Hatchet, worker):
     num_groups = 2
 
     runs: list[WorkflowRunRef] = []
diff --git a/examples/api/test_api.py b/examples/api/test_api.py
index 1634948d..fa1a6f8c 100644
--- a/examples/api/test_api.py
+++ b/examples/api/test_api.py
@@ -1,13 +1,11 @@
 import pytest
 
 from hatchet_sdk import Hatchet
-from tests.utils import fixture_bg_worker
-
-## IMPORTANT: Worker needs to be set here to ensure at least one workflow exists
-worker = fixture_bg_worker(["poetry", "run", "concurrency_limit_rr"])
 
 
 # requires scope module or higher for shared event loop
+## IMPORTANT: Worker needs to be set here to ensure at least one workflow exists
+@pytest.mark.parametrize("worker", ["concurrency_limit_rr"], indirect=True)
 @pytest.mark.asyncio(scope="session")
 async def test_list_workflows(hatchet: Hatchet, worker):
     workflows = hatchet.rest.workflow_list()
@@ -16,6 +14,7 @@ async def test_list_workflows(hatchet: Hatchet, worker):
 
 
 # requires scope module or higher for shared event loop
+@pytest.mark.parametrize("worker", ["concurrency_limit_rr"], indirect=True)
 @pytest.mark.asyncio(scope="session")
 async def test_async_list_workflows(aiohatchet: Hatchet, worker):
     workflows = await aiohatchet.rest.aio.workflow_list()
diff --git a/examples/async/test_async.py b/examples/async/test_async.py
index 26aab346..419a875f 100644
--- a/examples/async/test_async.py
+++ b/examples/async/test_async.py
@@ -1,22 +1,21 @@
 import pytest
 
 from hatchet_sdk import Hatchet
-from tests.utils import fixture_bg_worker
-
-worker = fixture_bg_worker(["poetry", "run", "async"])
 
 
 # requires scope module or higher for shared event loop
 @pytest.mark.asyncio(scope="session")
-async def test_run(hatchet: Hatchet):
+@pytest.mark.parametrize("worker", ["async"], indirect=True)
+async def test_run(hatchet: Hatchet, worker):
     run = hatchet.admin.run_workflow("AsyncWorkflow", {})
     result = await run.result()
     assert result["step1"]["test"] == "test"
 
 
+@pytest.mark.parametrize("worker", ["async"], indirect=True)
 @pytest.mark.skip(reason="Skipping this test until we can dedicate more time to debug")
 @pytest.mark.asyncio(scope="session")
-async def test_run_async(aiohatchet: Hatchet):
+async def test_run_async(aiohatchet: Hatchet, worker):
     run = await aiohatchet.admin.aio.run_workflow("AsyncWorkflow", {})
     result = await run.result()
     assert result["step1"]["test"] == "test"
diff --git a/examples/bulk_fanout/test_bulk_fanout.py b/examples/bulk_fanout/test_bulk_fanout.py
index e0cd5fa8..153e717c 100644
--- a/examples/bulk_fanout/test_bulk_fanout.py
+++ b/examples/bulk_fanout/test_bulk_fanout.py
@@ -1,14 +1,12 @@
 import pytest
 
 from hatchet_sdk import Hatchet
-from tests.utils import fixture_bg_worker
-
-worker = fixture_bg_worker(["poetry", "run", "bulk_fanout"])
 
 
 # requires scope module or higher for shared event loop
 @pytest.mark.asyncio(scope="session")
-async def test_run(hatchet: Hatchet):
+@pytest.mark.parametrize("worker", ["bulk_fanout"], indirect=True)
+async def test_run(hatchet: Hatchet, worker):
     run = hatchet.admin.run_workflow("BulkParent", {"n": 12})
     result = await run.result()
     assert len(result["spawn"]["results"]) == 12
@@ -16,7 +14,8 @@ async def test_run(hatchet: Hatchet):
 
 # requires scope module or higher for shared event loop
 @pytest.mark.asyncio(scope="session")
-async def test_run2(hatchet: Hatchet):
+@pytest.mark.parametrize("worker", ["bulk_fanout"], indirect=True)
+async def test_run2(hatchet: Hatchet, worker):
     run = hatchet.admin.run_workflow("BulkParent", {"n": 10})
     result = await run.result()
     assert len(result["spawn"]["results"]) == 10
diff --git a/examples/cancellation/test_cancellation.py b/examples/cancellation/test_cancellation.py
index 3f5bf210..1c282a43 100644
--- a/examples/cancellation/test_cancellation.py
+++ b/examples/cancellation/test_cancellation.py
@@ -1,14 +1,12 @@
 import pytest
 
 from hatchet_sdk import Hatchet
-from tests.utils import fixture_bg_worker
-
-worker = fixture_bg_worker(["poetry", "run", "cancellation"])
 
 
 # requires scope module or higher for shared event loop
 @pytest.mark.asyncio(scope="session")
-async def test_run(hatchet: Hatchet):
+@pytest.mark.parametrize("worker", ["cancellation"], indirect=True)
+async def test_run(hatchet: Hatchet, worker):
     run = hatchet.admin.run_workflow("CancelWorkflow", {})
     result = await run.result()
     # TODO is this the expected result for a timed out run...
diff --git a/examples/concurrency_limit/test_concurrency_limit.py b/examples/concurrency_limit/test_concurrency_limit.py
index d28d7fe9..5eaff381 100644
--- a/examples/concurrency_limit/test_concurrency_limit.py
+++ b/examples/concurrency_limit/test_concurrency_limit.py
@@ -1,18 +1,14 @@
-import asyncio
-
 import pytest
 
 from hatchet_sdk import Hatchet
 from hatchet_sdk.workflow_run import WorkflowRunRef
-from tests.utils import fixture_bg_worker
-
-worker = fixture_bg_worker(["poetry", "run", "concurrency_limit"])
 
 
 # requires scope module or higher for shared event loop
-@pytest.mark.skip(reason="The timing for this test is not reliable")
 @pytest.mark.asyncio(scope="session")
-async def test_run(hatchet: Hatchet):
+@pytest.mark.skip(reason="The timing for this test is not reliable")
+@pytest.mark.parametrize("worker", ["concurrency_limit"], indirect=True)
+async def test_run(hatchet: Hatchet, worker):
     num_runs = 6
 
     runs: list[WorkflowRunRef] = []
diff --git a/examples/concurrency_limit_rr/test_concurrency_limit_rr.py b/examples/concurrency_limit_rr/test_concurrency_limit_rr.py
index 7565722d..2f9969a5 100644
--- a/examples/concurrency_limit_rr/test_concurrency_limit_rr.py
+++ b/examples/concurrency_limit_rr/test_concurrency_limit_rr.py
@@ -1,19 +1,16 @@
-import asyncio
 import time
 
 import pytest
 
 from hatchet_sdk import Hatchet
 from hatchet_sdk.workflow_run import WorkflowRunRef
-from tests.utils import fixture_bg_worker
-
-worker = fixture_bg_worker(["poetry", "run", "concurrency_limit_rr"])
 
 
 # requires scope module or higher for shared event loop
 @pytest.mark.skip(reason="The timing for this test is not reliable")
 @pytest.mark.asyncio(scope="session")
-async def test_run(hatchet: Hatchet):
+@pytest.mark.parametrize("worker", ["concurrency_limit_rr"], indirect=True)
+async def test_run(hatchet: Hatchet, worker):
     num_groups = 2
 
     runs: list[WorkflowRunRef] = []
diff --git a/examples/dag/test_dag.py b/examples/dag/test_dag.py
index 81b42932..e0620837 100644
--- a/examples/dag/test_dag.py
+++ b/examples/dag/test_dag.py
@@ -1,14 +1,12 @@
 import pytest
 
 from hatchet_sdk import Hatchet
-from tests.utils import fixture_bg_worker
-
-worker = fixture_bg_worker(["poetry", "run", "dag"])
 
 
 # requires scope module or higher for shared event loop
 @pytest.mark.asyncio(scope="session")
-async def test_run(hatchet: Hatchet):
+@pytest.mark.parametrize("worker", ["dag"], indirect=True)
+async def test_run(hatchet: Hatchet, worker):
     run = hatchet.admin.run_workflow("DagWorkflow", {})
     result = await run.result()
 
diff --git a/examples/fanout/test_fanout.py b/examples/fanout/test_fanout.py
index f4d2735d..1243c130 100644
--- a/examples/fanout/test_fanout.py
+++ b/examples/fanout/test_fanout.py
@@ -1,14 +1,12 @@
 import pytest
 
 from hatchet_sdk import Hatchet
-from tests.utils import fixture_bg_worker
-
-worker = fixture_bg_worker(["poetry", "run", "fanout"])
 
 
 # requires scope module or higher for shared event loop
 @pytest.mark.asyncio(scope="session")
-async def test_run(hatchet: Hatchet):
+@pytest.mark.parametrize("worker", ["fanout"], indirect=True)
+async def test_run(hatchet: Hatchet, worker):
     run = hatchet.admin.run_workflow("Parent", {"n": 2})
     result = await run.result()
     assert len(result["spawn"]["results"]) == 2
@@ -16,7 +14,8 @@ async def test_run(hatchet: Hatchet):
 
 # requires scope module or higher for shared event loop
 @pytest.mark.asyncio(scope="session")
-async def test_run2(hatchet: Hatchet):
+@pytest.mark.parametrize("worker", ["fanout"], indirect=True)
+async def test_run2(hatchet: Hatchet, worker):
     run = hatchet.admin.run_workflow("Parent", {"n": 2})
     result = await run.result()
     assert len(result["spawn"]["results"]) == 2
diff --git a/examples/logger/test_logger.py b/examples/logger/test_logger.py
index 59a4be95..b89cde10 100644
--- a/examples/logger/test_logger.py
+++ b/examples/logger/test_logger.py
@@ -1,14 +1,12 @@
 import pytest
 
 from hatchet_sdk import Hatchet
-from tests.utils import fixture_bg_worker
-
-worker = fixture_bg_worker(["poetry", "run", "logger"])
 
 
 # requires scope module or higher for shared event loop
 @pytest.mark.asyncio(scope="session")
-async def test_run(hatchet: Hatchet):
+@pytest.mark.parametrize("worker", ["logger"], indirect=True)
+async def test_run(hatchet: Hatchet, worker):
     run = hatchet.admin.run_workflow("LoggingWorkflow", {})
     result = await run.result()
     assert result["step1"]["status"] == "success"
diff --git a/examples/on_failure/test_on_failure.py b/examples/on_failure/test_on_failure.py
index e9947954..3020d792 100644
--- a/examples/on_failure/test_on_failure.py
+++ b/examples/on_failure/test_on_failure.py
@@ -4,14 +4,12 @@
 
 from hatchet_sdk import Hatchet
 from hatchet_sdk.clients.rest.models.job_run_status import JobRunStatus
-from tests.utils import fixture_bg_worker
-
-worker = fixture_bg_worker(["poetry", "run", "on_failure"])
 
 
 # requires scope module or higher for shared event loop
 @pytest.mark.asyncio(scope="session")
-async def test_run_timeout(hatchet: Hatchet):
+@pytest.mark.parametrize("worker", ["on_failure"], indirect=True)
+async def test_run_timeout(hatchet: Hatchet, worker):
     run = hatchet.admin.run_workflow("OnFailureWorkflow", {})
     try:
         await run.result()
diff --git a/examples/rate_limit/test_rate_limit.py b/examples/rate_limit/test_rate_limit.py
index af11c012..a7e9aabe 100644
--- a/examples/rate_limit/test_rate_limit.py
+++ b/examples/rate_limit/test_rate_limit.py
@@ -4,15 +4,13 @@
 import pytest
 
 from hatchet_sdk import Hatchet
-from tests.utils import fixture_bg_worker
-
-worker = fixture_bg_worker(["poetry", "run", "rate_limit"])
 
 
 # requires scope module or higher for shared event loop
 @pytest.mark.skip(reason="The timing for this test is not reliable")
 @pytest.mark.asyncio(scope="session")
-async def test_run(hatchet: Hatchet):
+@pytest.mark.parametrize("worker", ["rate_limit"], indirect=True)
+async def test_run(hatchet: Hatchet, worker):
     run1 = hatchet.admin.run_workflow("RateLimitWorkflow", {})
     run2 = hatchet.admin.run_workflow("RateLimitWorkflow", {})
 
diff --git a/examples/timeout/test_timeout.py b/examples/timeout/test_timeout.py
index e97a1af7..d18881cb 100644
--- a/examples/timeout/test_timeout.py
+++ b/examples/timeout/test_timeout.py
@@ -1,14 +1,12 @@
 import pytest
 
 from hatchet_sdk import Hatchet
-from tests.utils import fixture_bg_worker
-
-worker = fixture_bg_worker(["poetry", "run", "timeout"])
 
 
 # requires scope module or higher for shared event loop
 @pytest.mark.asyncio(scope="session")
-async def test_run_timeout(hatchet: Hatchet):
+@pytest.mark.parametrize("worker", ["timeout"], indirect=True)
+async def test_run_timeout(hatchet: Hatchet, worker):
     run = hatchet.admin.run_workflow("TimeoutWorkflow", {})
     try:
         await run.result()
@@ -18,7 +16,8 @@ async def test_run_timeout(hatchet: Hatchet):
 
 
 @pytest.mark.asyncio(scope="session")
-async def test_run_refresh_timeout(hatchet: Hatchet):
+@pytest.mark.parametrize("worker", ["timeout"], indirect=True)
+async def test_run_refresh_timeout(hatchet: Hatchet, worker):
     run = hatchet.admin.run_workflow("RefreshTimeoutWorkflow", {})
     result = await run.result()
     assert result["step1"]["status"] == "success"
diff --git a/tests/__init__.py b/tests/__init__.py
deleted file mode 100644
index e69de29b..00000000
diff --git a/tests/utils/__init__.py b/tests/utils/__init__.py
deleted file mode 100644
index 32ce81e2..00000000
--- a/tests/utils/__init__.py
+++ /dev/null
@@ -1 +0,0 @@
-from .bg_worker import fixture_bg_worker
diff --git a/tests/utils/bg_worker.py b/tests/utils/bg_worker.py
deleted file mode 100644
index 97619db2..00000000
--- a/tests/utils/bg_worker.py
+++ /dev/null
@@ -1,52 +0,0 @@
-import logging
-import subprocess
-import time
-
-import psutil
-import pytest
-
-
-def fixture_bg_worker(command, startup_time=5):
-    @pytest.fixture(scope="session", autouse=True)
-    def fixture_background_hatchet_worker(request):
-        logging.info(f"Starting background worker: {' '.join(command)}")
-        proc = subprocess.Popen(command, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
-
-        # Check if the process is still running
-        if proc.poll() is not None:
-            raise Exception(
-                f"Worker failed to start with return code {proc.returncode}"
-            )
-
-        # Wait for startup
-        time.sleep(startup_time)
-
-        # Log stdout and stderr
-        def log_output(pipe, log_func):
-            for line in iter(pipe.readline, b""):
-                log_func(line.decode().strip())
-
-        import threading
-
-        threading.Thread(
-            target=log_output, args=(proc.stdout, logging.info), daemon=True
-        ).start()
-        threading.Thread(
-            target=log_output, args=(proc.stderr, logging.error), daemon=True
-        ).start()
-
-        yield proc
-
-        logging.info("Cleaning up background worker")
-        parent = psutil.Process(proc.pid)
-        children = parent.children(recursive=True)
-        for child in children:
-            child.terminate()
-        parent.terminate()
-
-        _, alive = psutil.wait_procs([parent] + children, timeout=3)
-        for p in alive:
-            logging.warning(f"Force killing process {p.pid}")
-            p.kill()
-
-    return fixture_background_hatchet_worker
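
Usage sketch (not part of the patch above): this is how a new example test would opt into the per-test `worker` fixture added in conftest.py. The "my_example" entry point and "MyExampleWorkflow" workflow name are hypothetical placeholders; the decorator pattern itself is the one the tests in this patch use.

    import pytest

    from hatchet_sdk import Hatchet


    # "my_example" is assumed to exist as a poetry script (like "fanout" or "dag"),
    # so the conftest.py fixture can launch it with `poetry run my_example`.
    @pytest.mark.parametrize("worker", ["my_example"], indirect=True)
    @pytest.mark.asyncio(scope="session")
    async def test_my_example(hatchet: Hatchet, worker):
        # "MyExampleWorkflow" is a placeholder for a workflow registered by that worker.
        run = hatchet.admin.run_workflow("MyExampleWorkflow", {})
        result = await run.result()
        assert result is not None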