Skip to content

Commit

Permalink
Pytest Tweaks, Part III - Test Isolation (#277)
Browse files Browse the repository at this point in the history
* feat: isolate tests better

* fix: lint

* fix: unwind fixture, seemed to not work in CI for some reason

* Revert "fix: unwind fixture, seemed to not work in CI for some reason"

This reverts commit 0c694e9.

* fix: re-add manual wait
  • Loading branch information
hatchet-temporary authored Nov 30, 2024
1 parent 70b1c88 commit 26764a3
Show file tree
Hide file tree
Showing 18 changed files with 81 additions and 115 deletions.
2 changes: 1 addition & 1 deletion compose.yml
Original file line number Diff line number Diff line change
Expand Up @@ -112,4 +112,4 @@ volumes:
hatchet_rabbitmq_data:
hatchet_rabbitmq.conf:
hatchet_config:
hatchet_certs:
hatchet_certs:
45 changes: 44 additions & 1 deletion conftest.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,11 @@
from typing import AsyncGenerator
import logging
import subprocess
import time
from io import BytesIO
from threading import Thread
from typing import AsyncGenerator, Callable, cast

import psutil
import pytest
import pytest_asyncio

Expand All @@ -14,3 +20,40 @@ async def aiohatchet() -> AsyncGenerator[Hatchet, None]:
@pytest.fixture(scope="session")
def hatchet() -> Hatchet:
return Hatchet(debug=True)


@pytest.fixture()
def worker(request: pytest.FixtureRequest):
example = cast(str, request.param)

command = ["poetry", "run", example]

logging.info(f"Starting background worker: {' '.join(command)}")
proc = subprocess.Popen(command, stdout=subprocess.PIPE, stderr=subprocess.PIPE)

# Check if the process is still running
if proc.poll() is not None:
raise Exception(f"Worker failed to start with return code {proc.returncode}")

time.sleep(5)

def log_output(pipe: BytesIO, log_func: Callable[[str], None]) -> None:
for line in iter(pipe.readline, b""):
log_func(line.decode().strip())

Thread(target=log_output, args=(proc.stdout, logging.info), daemon=True).start()
Thread(target=log_output, args=(proc.stderr, logging.error), daemon=True).start()

yield proc

logging.info("Cleaning up background worker")
parent = psutil.Process(proc.pid)
children = parent.children(recursive=True)
for child in children:
child.terminate()
parent.terminate()

_, alive = psutil.wait_procs([parent] + children, timeout=3)
for p in alive:
logging.warning(f"Force killing process {p.pid}")
p.kill()
Original file line number Diff line number Diff line change
Expand Up @@ -5,15 +5,13 @@

from hatchet_sdk import Hatchet
from hatchet_sdk.workflow_run import WorkflowRunRef
from tests.utils import fixture_bg_worker

worker = fixture_bg_worker(["poetry", "run", "concurrency_limit_rr"])


# requires scope module or higher for shared event loop
@pytest.mark.parametrize("worker", ["concurrency_limit_rr"], indirect=True)
@pytest.mark.skip(reason="The timing for this test is not reliable")
@pytest.mark.asyncio(scope="session")
async def test_run(aiohatchet: Hatchet):
async def test_run(aiohatchet: Hatchet, worker):
num_groups = 2
runs: list[WorkflowRunRef] = []

Expand Down
7 changes: 3 additions & 4 deletions examples/api/test_api.py
Original file line number Diff line number Diff line change
@@ -1,13 +1,11 @@
import pytest

from hatchet_sdk import Hatchet
from tests.utils import fixture_bg_worker

## IMPORTANT: Worker needs to be set here to ensure at least one workflow exists
worker = fixture_bg_worker(["poetry", "run", "concurrency_limit_rr"])


# requires scope module or higher for shared event loop
## IMPORTANT: Worker needs to be set here to ensure at least one workflow exists
@pytest.mark.parametrize("worker", ["concurrency_limit_rr"], indirect=True)
@pytest.mark.asyncio(scope="session")
async def test_list_workflows(hatchet: Hatchet, worker):
workflows = hatchet.rest.workflow_list()
Expand All @@ -16,6 +14,7 @@ async def test_list_workflows(hatchet: Hatchet, worker):


# requires scope module or higher for shared event loop
@pytest.mark.parametrize("worker", ["concurrency_limit_rr"], indirect=True)
@pytest.mark.asyncio(scope="session")
async def test_async_list_workflows(aiohatchet: Hatchet, worker):
workflows = await aiohatchet.rest.aio.workflow_list()
Expand Down
9 changes: 4 additions & 5 deletions examples/async/test_async.py
Original file line number Diff line number Diff line change
@@ -1,22 +1,21 @@
import pytest

from hatchet_sdk import Hatchet
from tests.utils import fixture_bg_worker

worker = fixture_bg_worker(["poetry", "run", "async"])


# requires scope module or higher for shared event loop
@pytest.mark.asyncio(scope="session")
async def test_run(hatchet: Hatchet):
@pytest.mark.parametrize("worker", ["async"], indirect=True)
async def test_run(hatchet: Hatchet, worker):
run = hatchet.admin.run_workflow("AsyncWorkflow", {})
result = await run.result()
assert result["step1"]["test"] == "test"


@pytest.mark.parametrize("worker", ["async"], indirect=True)
@pytest.mark.skip(reason="Skipping this test until we can dedicate more time to debug")
@pytest.mark.asyncio(scope="session")
async def test_run_async(aiohatchet: Hatchet):
async def test_run_async(aiohatchet: Hatchet, worker):
run = await aiohatchet.admin.aio.run_workflow("AsyncWorkflow", {})
result = await run.result()
assert result["step1"]["test"] == "test"
9 changes: 4 additions & 5 deletions examples/bulk_fanout/test_bulk_fanout.py
Original file line number Diff line number Diff line change
@@ -1,22 +1,21 @@
import pytest

from hatchet_sdk import Hatchet
from tests.utils import fixture_bg_worker

worker = fixture_bg_worker(["poetry", "run", "bulk_fanout"])


# requires scope module or higher for shared event loop
@pytest.mark.asyncio(scope="session")
async def test_run(hatchet: Hatchet):
@pytest.mark.parametrize("worker", ["bulk_fanout"], indirect=True)
async def test_run(hatchet: Hatchet, worker):
run = hatchet.admin.run_workflow("BulkParent", {"n": 12})
result = await run.result()
assert len(result["spawn"]["results"]) == 12


# requires scope module or higher for shared event loop
@pytest.mark.asyncio(scope="session")
async def test_run2(hatchet: Hatchet):
@pytest.mark.parametrize("worker", ["bulk_fanout"], indirect=True)
async def test_run2(hatchet: Hatchet, worker):
run = hatchet.admin.run_workflow("BulkParent", {"n": 10})
result = await run.result()
assert len(result["spawn"]["results"]) == 10
6 changes: 2 additions & 4 deletions examples/cancellation/test_cancellation.py
Original file line number Diff line number Diff line change
@@ -1,14 +1,12 @@
import pytest

from hatchet_sdk import Hatchet
from tests.utils import fixture_bg_worker

worker = fixture_bg_worker(["poetry", "run", "cancellation"])


# requires scope module or higher for shared event loop
@pytest.mark.asyncio(scope="session")
async def test_run(hatchet: Hatchet):
@pytest.mark.parametrize("worker", ["cancellation"], indirect=True)
async def test_run(hatchet: Hatchet, worker):
run = hatchet.admin.run_workflow("CancelWorkflow", {})
result = await run.result()
# TODO is this the expected result for a timed out run...
Expand Down
10 changes: 3 additions & 7 deletions examples/concurrency_limit/test_concurrency_limit.py
Original file line number Diff line number Diff line change
@@ -1,18 +1,14 @@
import asyncio

import pytest

from hatchet_sdk import Hatchet
from hatchet_sdk.workflow_run import WorkflowRunRef
from tests.utils import fixture_bg_worker

worker = fixture_bg_worker(["poetry", "run", "concurrency_limit"])


# requires scope module or higher for shared event loop
@pytest.mark.skip(reason="The timing for this test is not reliable")
@pytest.mark.asyncio(scope="session")
async def test_run(hatchet: Hatchet):
@pytest.mark.skip(reason="The timing for this test is not reliable")
@pytest.mark.parametrize("worker", ["concurrency_limit"], indirect=True)
async def test_run(hatchet: Hatchet, worker):
num_runs = 6
runs: list[WorkflowRunRef] = []

Expand Down
7 changes: 2 additions & 5 deletions examples/concurrency_limit_rr/test_concurrency_limit_rr.py
Original file line number Diff line number Diff line change
@@ -1,19 +1,16 @@
import asyncio
import time

import pytest

from hatchet_sdk import Hatchet
from hatchet_sdk.workflow_run import WorkflowRunRef
from tests.utils import fixture_bg_worker

worker = fixture_bg_worker(["poetry", "run", "concurrency_limit_rr"])


# requires scope module or higher for shared event loop
@pytest.mark.skip(reason="The timing for this test is not reliable")
@pytest.mark.asyncio(scope="session")
async def test_run(hatchet: Hatchet):
@pytest.mark.parametrize("worker", ["concurrency_limit_rr"], indirect=True)
async def test_run(hatchet: Hatchet, worker):
num_groups = 2
runs: list[WorkflowRunRef] = []

Expand Down
6 changes: 2 additions & 4 deletions examples/dag/test_dag.py
Original file line number Diff line number Diff line change
@@ -1,14 +1,12 @@
import pytest

from hatchet_sdk import Hatchet
from tests.utils import fixture_bg_worker

worker = fixture_bg_worker(["poetry", "run", "dag"])


# requires scope module or higher for shared event loop
@pytest.mark.asyncio(scope="session")
async def test_run(hatchet: Hatchet):
@pytest.mark.parametrize("worker", ["dag"], indirect=True)
async def test_run(hatchet: Hatchet, worker):
run = hatchet.admin.run_workflow("DagWorkflow", {})
result = await run.result()

Expand Down
9 changes: 4 additions & 5 deletions examples/fanout/test_fanout.py
Original file line number Diff line number Diff line change
@@ -1,22 +1,21 @@
import pytest

from hatchet_sdk import Hatchet
from tests.utils import fixture_bg_worker

worker = fixture_bg_worker(["poetry", "run", "fanout"])


# requires scope module or higher for shared event loop
@pytest.mark.asyncio(scope="session")
async def test_run(hatchet: Hatchet):
@pytest.mark.parametrize("worker", ["fanout"], indirect=True)
async def test_run(hatchet: Hatchet, worker):
run = hatchet.admin.run_workflow("Parent", {"n": 2})
result = await run.result()
assert len(result["spawn"]["results"]) == 2


# requires scope module or higher for shared event loop
@pytest.mark.asyncio(scope="session")
async def test_run2(hatchet: Hatchet):
@pytest.mark.parametrize("worker", ["fanout"], indirect=True)
async def test_run2(hatchet: Hatchet, worker):
run = hatchet.admin.run_workflow("Parent", {"n": 2})
result = await run.result()
assert len(result["spawn"]["results"]) == 2
6 changes: 2 additions & 4 deletions examples/logger/test_logger.py
Original file line number Diff line number Diff line change
@@ -1,14 +1,12 @@
import pytest

from hatchet_sdk import Hatchet
from tests.utils import fixture_bg_worker

worker = fixture_bg_worker(["poetry", "run", "logger"])


# requires scope module or higher for shared event loop
@pytest.mark.asyncio(scope="session")
async def test_run(hatchet: Hatchet):
@pytest.mark.parametrize("worker", ["logger"], indirect=True)
async def test_run(hatchet: Hatchet, worker):
run = hatchet.admin.run_workflow("LoggingWorkflow", {})
result = await run.result()
assert result["step1"]["status"] == "success"
6 changes: 2 additions & 4 deletions examples/on_failure/test_on_failure.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,14 +4,12 @@

from hatchet_sdk import Hatchet
from hatchet_sdk.clients.rest.models.job_run_status import JobRunStatus
from tests.utils import fixture_bg_worker

worker = fixture_bg_worker(["poetry", "run", "on_failure"])


# requires scope module or higher for shared event loop
@pytest.mark.asyncio(scope="session")
async def test_run_timeout(hatchet: Hatchet):
@pytest.mark.parametrize("worker", ["on_failure"], indirect=True)
async def test_run_timeout(hatchet: Hatchet, worker):
run = hatchet.admin.run_workflow("OnFailureWorkflow", {})
try:
await run.result()
Expand Down
6 changes: 2 additions & 4 deletions examples/rate_limit/test_rate_limit.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,15 +4,13 @@
import pytest

from hatchet_sdk import Hatchet
from tests.utils import fixture_bg_worker

worker = fixture_bg_worker(["poetry", "run", "rate_limit"])


# requires scope module or higher for shared event loop
@pytest.mark.skip(reason="The timing for this test is not reliable")
@pytest.mark.asyncio(scope="session")
async def test_run(hatchet: Hatchet):
@pytest.mark.parametrize("worker", ["rate_limit"], indirect=True)
async def test_run(hatchet: Hatchet, worker):

run1 = hatchet.admin.run_workflow("RateLimitWorkflow", {})
run2 = hatchet.admin.run_workflow("RateLimitWorkflow", {})
Expand Down
9 changes: 4 additions & 5 deletions examples/timeout/test_timeout.py
Original file line number Diff line number Diff line change
@@ -1,14 +1,12 @@
import pytest

from hatchet_sdk import Hatchet
from tests.utils import fixture_bg_worker

worker = fixture_bg_worker(["poetry", "run", "timeout"])


# requires scope module or higher for shared event loop
@pytest.mark.asyncio(scope="session")
async def test_run_timeout(hatchet: Hatchet):
@pytest.mark.parametrize("worker", ["timeout"], indirect=True)
async def test_run_timeout(hatchet: Hatchet, worker):
run = hatchet.admin.run_workflow("TimeoutWorkflow", {})
try:
await run.result()
Expand All @@ -18,7 +16,8 @@ async def test_run_timeout(hatchet: Hatchet):


@pytest.mark.asyncio(scope="session")
async def test_run_refresh_timeout(hatchet: Hatchet):
@pytest.mark.parametrize("worker", ["timeout"], indirect=True)
async def test_run_refresh_timeout(hatchet: Hatchet, worker):
run = hatchet.admin.run_workflow("RefreshTimeoutWorkflow", {})
result = await run.result()
assert result["step1"]["status"] == "success"
Empty file removed tests/__init__.py
Empty file.
1 change: 0 additions & 1 deletion tests/utils/__init__.py

This file was deleted.

52 changes: 0 additions & 52 deletions tests/utils/bg_worker.py

This file was deleted.

0 comments on commit 26764a3

Please sign in to comment.