Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Pytest Tweaks, Part III - Test Isolation #277

Merged
merged 5 commits into from
Nov 30, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion compose.yml
Original file line number Diff line number Diff line change
Expand Up @@ -112,4 +112,4 @@ volumes:
hatchet_rabbitmq_data:
hatchet_rabbitmq.conf:
hatchet_config:
hatchet_certs:
hatchet_certs:
45 changes: 44 additions & 1 deletion conftest.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,11 @@
from typing import AsyncGenerator
import logging
import subprocess
import time
from io import BytesIO
from threading import Thread
from typing import AsyncGenerator, Callable, cast

import psutil
import pytest
import pytest_asyncio

Expand All @@ -14,3 +20,40 @@ async def aiohatchet() -> AsyncGenerator[Hatchet, None]:
@pytest.fixture(scope="session")
def hatchet() -> Hatchet:
return Hatchet(debug=True)


@pytest.fixture()
def worker(request: pytest.FixtureRequest):
example = cast(str, request.param)

command = ["poetry", "run", example]

logging.info(f"Starting background worker: {' '.join(command)}")
proc = subprocess.Popen(command, stdout=subprocess.PIPE, stderr=subprocess.PIPE)

# Check if the process is still running
if proc.poll() is not None:
raise Exception(f"Worker failed to start with return code {proc.returncode}")

time.sleep(5)

def log_output(pipe: BytesIO, log_func: Callable[[str], None]) -> None:
for line in iter(pipe.readline, b""):
log_func(line.decode().strip())

Thread(target=log_output, args=(proc.stdout, logging.info), daemon=True).start()
Thread(target=log_output, args=(proc.stderr, logging.error), daemon=True).start()

yield proc

logging.info("Cleaning up background worker")
parent = psutil.Process(proc.pid)
children = parent.children(recursive=True)
for child in children:
child.terminate()
parent.terminate()

_, alive = psutil.wait_procs([parent] + children, timeout=3)
for p in alive:
logging.warning(f"Force killing process {p.pid}")
p.kill()
Original file line number Diff line number Diff line change
Expand Up @@ -5,15 +5,13 @@

from hatchet_sdk import Hatchet
from hatchet_sdk.workflow_run import WorkflowRunRef
from tests.utils import fixture_bg_worker

worker = fixture_bg_worker(["poetry", "run", "concurrency_limit_rr"])


# requires scope module or higher for shared event loop
@pytest.mark.parametrize("worker", ["concurrency_limit_rr"], indirect=True)
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

parametrizing the fixture with the example to run

@pytest.mark.skip(reason="The timing for this test is not reliable")
@pytest.mark.asyncio(scope="session")
async def test_run(aiohatchet: Hatchet):
async def test_run(aiohatchet: Hatchet, worker):
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

we have to explicitly request the fixture now

num_groups = 2
runs: list[WorkflowRunRef] = []

Expand Down
7 changes: 3 additions & 4 deletions examples/api/test_api.py
Original file line number Diff line number Diff line change
@@ -1,13 +1,11 @@
import pytest

from hatchet_sdk import Hatchet
from tests.utils import fixture_bg_worker

## IMPORTANT: Worker needs to be set here to ensure at least one workflow exists
worker = fixture_bg_worker(["poetry", "run", "concurrency_limit_rr"])


# requires scope module or higher for shared event loop
## IMPORTANT: Worker needs to be set here to ensure at least one workflow exists
@pytest.mark.parametrize("worker", ["concurrency_limit_rr"], indirect=True)
@pytest.mark.asyncio(scope="session")
async def test_list_workflows(hatchet: Hatchet, worker):
workflows = hatchet.rest.workflow_list()
Expand All @@ -16,6 +14,7 @@ async def test_list_workflows(hatchet: Hatchet, worker):


# requires scope module or higher for shared event loop
@pytest.mark.parametrize("worker", ["concurrency_limit_rr"], indirect=True)
@pytest.mark.asyncio(scope="session")
async def test_async_list_workflows(aiohatchet: Hatchet, worker):
workflows = await aiohatchet.rest.aio.workflow_list()
Expand Down
9 changes: 4 additions & 5 deletions examples/async/test_async.py
Original file line number Diff line number Diff line change
@@ -1,22 +1,21 @@
import pytest

from hatchet_sdk import Hatchet
from tests.utils import fixture_bg_worker

worker = fixture_bg_worker(["poetry", "run", "async"])


# requires scope module or higher for shared event loop
@pytest.mark.asyncio(scope="session")
async def test_run(hatchet: Hatchet):
@pytest.mark.parametrize("worker", ["async"], indirect=True)
async def test_run(hatchet: Hatchet, worker):
run = hatchet.admin.run_workflow("AsyncWorkflow", {})
result = await run.result()
assert result["step1"]["test"] == "test"


@pytest.mark.parametrize("worker", ["async"], indirect=True)
@pytest.mark.skip(reason="Skipping this test until we can dedicate more time to debug")
@pytest.mark.asyncio(scope="session")
async def test_run_async(aiohatchet: Hatchet):
async def test_run_async(aiohatchet: Hatchet, worker):
run = await aiohatchet.admin.aio.run_workflow("AsyncWorkflow", {})
result = await run.result()
assert result["step1"]["test"] == "test"
9 changes: 4 additions & 5 deletions examples/bulk_fanout/test_bulk_fanout.py
Original file line number Diff line number Diff line change
@@ -1,22 +1,21 @@
import pytest

from hatchet_sdk import Hatchet
from tests.utils import fixture_bg_worker

worker = fixture_bg_worker(["poetry", "run", "bulk_fanout"])


# requires scope module or higher for shared event loop
@pytest.mark.asyncio(scope="session")
async def test_run(hatchet: Hatchet):
@pytest.mark.parametrize("worker", ["bulk_fanout"], indirect=True)
async def test_run(hatchet: Hatchet, worker):
run = hatchet.admin.run_workflow("BulkParent", {"n": 12})
result = await run.result()
assert len(result["spawn"]["results"]) == 12


# requires scope module or higher for shared event loop
@pytest.mark.asyncio(scope="session")
async def test_run2(hatchet: Hatchet):
@pytest.mark.parametrize("worker", ["bulk_fanout"], indirect=True)
async def test_run2(hatchet: Hatchet, worker):
run = hatchet.admin.run_workflow("BulkParent", {"n": 10})
result = await run.result()
assert len(result["spawn"]["results"]) == 10
6 changes: 2 additions & 4 deletions examples/cancellation/test_cancellation.py
Original file line number Diff line number Diff line change
@@ -1,14 +1,12 @@
import pytest

from hatchet_sdk import Hatchet
from tests.utils import fixture_bg_worker

worker = fixture_bg_worker(["poetry", "run", "cancellation"])


# requires scope module or higher for shared event loop
@pytest.mark.asyncio(scope="session")
async def test_run(hatchet: Hatchet):
@pytest.mark.parametrize("worker", ["cancellation"], indirect=True)
async def test_run(hatchet: Hatchet, worker):
run = hatchet.admin.run_workflow("CancelWorkflow", {})
result = await run.result()
# TODO is this the expected result for a timed out run...
Expand Down
10 changes: 3 additions & 7 deletions examples/concurrency_limit/test_concurrency_limit.py
Original file line number Diff line number Diff line change
@@ -1,18 +1,14 @@
import asyncio

import pytest

from hatchet_sdk import Hatchet
from hatchet_sdk.workflow_run import WorkflowRunRef
from tests.utils import fixture_bg_worker

worker = fixture_bg_worker(["poetry", "run", "concurrency_limit"])


# requires scope module or higher for shared event loop
@pytest.mark.skip(reason="The timing for this test is not reliable")
@pytest.mark.asyncio(scope="session")
async def test_run(hatchet: Hatchet):
@pytest.mark.skip(reason="The timing for this test is not reliable")
@pytest.mark.parametrize("worker", ["concurrency_limit"], indirect=True)
async def test_run(hatchet: Hatchet, worker):
num_runs = 6
runs: list[WorkflowRunRef] = []

Expand Down
7 changes: 2 additions & 5 deletions examples/concurrency_limit_rr/test_concurrency_limit_rr.py
Original file line number Diff line number Diff line change
@@ -1,19 +1,16 @@
import asyncio
import time

import pytest

from hatchet_sdk import Hatchet
from hatchet_sdk.workflow_run import WorkflowRunRef
from tests.utils import fixture_bg_worker

worker = fixture_bg_worker(["poetry", "run", "concurrency_limit_rr"])


# requires scope module or higher for shared event loop
@pytest.mark.skip(reason="The timing for this test is not reliable")
@pytest.mark.asyncio(scope="session")
async def test_run(hatchet: Hatchet):
@pytest.mark.parametrize("worker", ["concurrency_limit_rr"], indirect=True)
async def test_run(hatchet: Hatchet, worker):
num_groups = 2
runs: list[WorkflowRunRef] = []

Expand Down
6 changes: 2 additions & 4 deletions examples/dag/test_dag.py
Original file line number Diff line number Diff line change
@@ -1,14 +1,12 @@
import pytest

from hatchet_sdk import Hatchet
from tests.utils import fixture_bg_worker

worker = fixture_bg_worker(["poetry", "run", "dag"])


# requires scope module or higher for shared event loop
@pytest.mark.asyncio(scope="session")
async def test_run(hatchet: Hatchet):
@pytest.mark.parametrize("worker", ["dag"], indirect=True)
async def test_run(hatchet: Hatchet, worker):
run = hatchet.admin.run_workflow("DagWorkflow", {})
result = await run.result()

Expand Down
9 changes: 4 additions & 5 deletions examples/fanout/test_fanout.py
Original file line number Diff line number Diff line change
@@ -1,22 +1,21 @@
import pytest

from hatchet_sdk import Hatchet
from tests.utils import fixture_bg_worker

worker = fixture_bg_worker(["poetry", "run", "fanout"])


# requires scope module or higher for shared event loop
@pytest.mark.asyncio(scope="session")
async def test_run(hatchet: Hatchet):
@pytest.mark.parametrize("worker", ["fanout"], indirect=True)
async def test_run(hatchet: Hatchet, worker):
run = hatchet.admin.run_workflow("Parent", {"n": 2})
result = await run.result()
assert len(result["spawn"]["results"]) == 2


# requires scope module or higher for shared event loop
@pytest.mark.asyncio(scope="session")
async def test_run2(hatchet: Hatchet):
@pytest.mark.parametrize("worker", ["fanout"], indirect=True)
async def test_run2(hatchet: Hatchet, worker):
run = hatchet.admin.run_workflow("Parent", {"n": 2})
result = await run.result()
assert len(result["spawn"]["results"]) == 2
6 changes: 2 additions & 4 deletions examples/logger/test_logger.py
Original file line number Diff line number Diff line change
@@ -1,14 +1,12 @@
import pytest

from hatchet_sdk import Hatchet
from tests.utils import fixture_bg_worker

worker = fixture_bg_worker(["poetry", "run", "logger"])


# requires scope module or higher for shared event loop
@pytest.mark.asyncio(scope="session")
async def test_run(hatchet: Hatchet):
@pytest.mark.parametrize("worker", ["logger"], indirect=True)
async def test_run(hatchet: Hatchet, worker):
run = hatchet.admin.run_workflow("LoggingWorkflow", {})
result = await run.result()
assert result["step1"]["status"] == "success"
6 changes: 2 additions & 4 deletions examples/on_failure/test_on_failure.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,14 +4,12 @@

from hatchet_sdk import Hatchet
from hatchet_sdk.clients.rest.models.job_run_status import JobRunStatus
from tests.utils import fixture_bg_worker

worker = fixture_bg_worker(["poetry", "run", "on_failure"])


# requires scope module or higher for shared event loop
@pytest.mark.asyncio(scope="session")
async def test_run_timeout(hatchet: Hatchet):
@pytest.mark.parametrize("worker", ["on_failure"], indirect=True)
async def test_run_timeout(hatchet: Hatchet, worker):
run = hatchet.admin.run_workflow("OnFailureWorkflow", {})
try:
await run.result()
Expand Down
6 changes: 2 additions & 4 deletions examples/rate_limit/test_rate_limit.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,15 +4,13 @@
import pytest

from hatchet_sdk import Hatchet
from tests.utils import fixture_bg_worker

worker = fixture_bg_worker(["poetry", "run", "rate_limit"])


# requires scope module or higher for shared event loop
@pytest.mark.skip(reason="The timing for this test is not reliable")
@pytest.mark.asyncio(scope="session")
async def test_run(hatchet: Hatchet):
@pytest.mark.parametrize("worker", ["rate_limit"], indirect=True)
async def test_run(hatchet: Hatchet, worker):

run1 = hatchet.admin.run_workflow("RateLimitWorkflow", {})
run2 = hatchet.admin.run_workflow("RateLimitWorkflow", {})
Expand Down
9 changes: 4 additions & 5 deletions examples/timeout/test_timeout.py
Original file line number Diff line number Diff line change
@@ -1,14 +1,12 @@
import pytest

from hatchet_sdk import Hatchet
from tests.utils import fixture_bg_worker

worker = fixture_bg_worker(["poetry", "run", "timeout"])


# requires scope module or higher for shared event loop
@pytest.mark.asyncio(scope="session")
async def test_run_timeout(hatchet: Hatchet):
@pytest.mark.parametrize("worker", ["timeout"], indirect=True)
async def test_run_timeout(hatchet: Hatchet, worker):
run = hatchet.admin.run_workflow("TimeoutWorkflow", {})
try:
await run.result()
Expand All @@ -18,7 +16,8 @@ async def test_run_timeout(hatchet: Hatchet):


@pytest.mark.asyncio(scope="session")
async def test_run_refresh_timeout(hatchet: Hatchet):
@pytest.mark.parametrize("worker", ["timeout"], indirect=True)
async def test_run_refresh_timeout(hatchet: Hatchet, worker):
run = hatchet.admin.run_workflow("RefreshTimeoutWorkflow", {})
result = await run.result()
assert result["step1"]["status"] == "success"
Empty file removed tests/__init__.py
Empty file.
1 change: 0 additions & 1 deletion tests/utils/__init__.py

This file was deleted.

52 changes: 0 additions & 52 deletions tests/utils/bg_worker.py

This file was deleted.