Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Pytest Tweaks, Part III - Test Isolation #277

Merged
merged 5 commits into from
Nov 30, 2024
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
51 changes: 51 additions & 0 deletions conftest.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,11 @@
import logging
import subprocess
from io import BytesIO
from threading import Thread
from typing import AsyncGenerator
from typing import cast, Callable

import psutil
import pytest
import pytest_asyncio

Expand All @@ -14,3 +20,48 @@ async def aiohatchet() -> AsyncGenerator[Hatchet, None]:
@pytest.fixture(scope="session")
def hatchet() -> Hatchet:
return Hatchet(debug=True)


@pytest.fixture()
def worker(request: pytest.FixtureRequest):
example = cast(str, request.param)

command = ["poetry", "run", example]

logging.info(f"Starting background worker: {' '.join(command)}")
proc = subprocess.Popen(command, stdout=subprocess.PIPE, stderr=subprocess.PIPE)

# Check if the process is still running
if proc.poll() is not None:
raise Exception(f"Worker failed to start with return code {proc.returncode}")

def wait_for_log_message(pipe: BytesIO) -> None:
for line in iter(pipe.readline, b""):
log_line = line.decode().strip()
logging.info(log_line) # Log the output
if "sending heartbeat" in log_line:
logging.info(f"Found target log message: {log_line}")
break

Thread(target=wait_for_log_message, args=(proc.stdout, ), daemon=True).start()

def log_output(pipe: BytesIO, log_func: Callable[[str], None]) -> None:
for line in iter(pipe.readline, b""):
log_func(line.decode().strip())

Thread(target=log_output, args=(proc.stdout, logging.info), daemon=True).start()
Thread(target=log_output, args=(proc.stderr, logging.error), daemon=True).start()

yield proc

logging.info("Cleaning up background worker")
parent = psutil.Process(proc.pid)
children = parent.children(recursive=True)
for child in children:
child.terminate()
parent.terminate()

_, alive = psutil.wait_procs([parent] + children, timeout=3)
for p in alive:
logging.warning(f"Force killing process {p.pid}")
p.kill()
Original file line number Diff line number Diff line change
Expand Up @@ -5,15 +5,13 @@

from hatchet_sdk import Hatchet
from hatchet_sdk.workflow_run import WorkflowRunRef
from tests.utils import fixture_bg_worker

worker = fixture_bg_worker(["poetry", "run", "concurrency_limit_rr"])


# requires scope module or higher for shared event loop
@pytest.mark.parametrize("worker", ["concurrency_limit_rr"], indirect=True)
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

parametrizing the fixture with the example to run

@pytest.mark.skip(reason="The timing for this test is not reliable")
@pytest.mark.asyncio(scope="session")
async def test_run(aiohatchet: Hatchet):
async def test_run(aiohatchet: Hatchet, worker):
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

we have to explicitly request the fixture now

num_groups = 2
runs: list[WorkflowRunRef] = []

Expand Down
7 changes: 3 additions & 4 deletions examples/api/test_api.py
Original file line number Diff line number Diff line change
@@ -1,13 +1,11 @@
import pytest

from hatchet_sdk import Hatchet
from tests.utils import fixture_bg_worker

## IMPORTANT: Worker needs to be set here to ensure at least one workflow exists
worker = fixture_bg_worker(["poetry", "run", "concurrency_limit_rr"])


# requires scope module or higher for shared event loop
## IMPORTANT: Worker needs to be set here to ensure at least one workflow exists
@pytest.mark.parametrize("worker", ["concurrency_limit_rr"], indirect=True)
@pytest.mark.asyncio(scope="session")
async def test_list_workflows(hatchet: Hatchet, worker):
workflows = hatchet.rest.workflow_list()
Expand All @@ -16,6 +14,7 @@ async def test_list_workflows(hatchet: Hatchet, worker):


# requires scope module or higher for shared event loop
@pytest.mark.parametrize("worker", ["concurrency_limit_rr"], indirect=True)
@pytest.mark.asyncio(scope="session")
async def test_async_list_workflows(aiohatchet: Hatchet, worker):
workflows = await aiohatchet.rest.aio.workflow_list()
Expand Down
9 changes: 4 additions & 5 deletions examples/async/test_async.py
Original file line number Diff line number Diff line change
@@ -1,22 +1,21 @@
import pytest

from hatchet_sdk import Hatchet
from tests.utils import fixture_bg_worker

worker = fixture_bg_worker(["poetry", "run", "async"])


# requires scope module or higher for shared event loop
@pytest.mark.asyncio(scope="session")
async def test_run(hatchet: Hatchet):
@pytest.mark.parametrize("worker", ["async"], indirect=True)
async def test_run(hatchet: Hatchet, worker):
run = hatchet.admin.run_workflow("AsyncWorkflow", {})
result = await run.result()
assert result["step1"]["test"] == "test"


@pytest.mark.parametrize("worker", ["async"], indirect=True)
@pytest.mark.skip(reason="Skipping this test until we can dedicate more time to debug")
@pytest.mark.asyncio(scope="session")
async def test_run_async(aiohatchet: Hatchet):
async def test_run_async(aiohatchet: Hatchet, worker):
run = await aiohatchet.admin.aio.run_workflow("AsyncWorkflow", {})
result = await run.result()
assert result["step1"]["test"] == "test"
9 changes: 4 additions & 5 deletions examples/bulk_fanout/test_bulk_fanout.py
Original file line number Diff line number Diff line change
@@ -1,22 +1,21 @@
import pytest

from hatchet_sdk import Hatchet
from tests.utils import fixture_bg_worker

worker = fixture_bg_worker(["poetry", "run", "bulk_fanout"])


# requires scope module or higher for shared event loop
@pytest.mark.asyncio(scope="session")
async def test_run(hatchet: Hatchet):
@pytest.mark.parametrize("worker", ["bulk_fanout"], indirect=True)
async def test_run(hatchet: Hatchet, worker):
run = hatchet.admin.run_workflow("BulkParent", {"n": 12})
result = await run.result()
assert len(result["spawn"]["results"]) == 12


# requires scope module or higher for shared event loop
@pytest.mark.asyncio(scope="session")
async def test_run2(hatchet: Hatchet):
@pytest.mark.parametrize("worker", ["bulk_fanout"], indirect=True)
async def test_run2(hatchet: Hatchet, worker):
run = hatchet.admin.run_workflow("BulkParent", {"n": 10})
result = await run.result()
assert len(result["spawn"]["results"]) == 10
6 changes: 2 additions & 4 deletions examples/cancellation/test_cancellation.py
Original file line number Diff line number Diff line change
@@ -1,14 +1,12 @@
import pytest

from hatchet_sdk import Hatchet
from tests.utils import fixture_bg_worker

worker = fixture_bg_worker(["poetry", "run", "cancellation"])


# requires scope module or higher for shared event loop
@pytest.mark.asyncio(scope="session")
async def test_run(hatchet: Hatchet):
@pytest.mark.parametrize("worker", ["cancellation"], indirect=True)
async def test_run(hatchet: Hatchet, worker):
run = hatchet.admin.run_workflow("CancelWorkflow", {})
result = await run.result()
# TODO is this the expected result for a timed out run...
Expand Down
10 changes: 3 additions & 7 deletions examples/concurrency_limit/test_concurrency_limit.py
Original file line number Diff line number Diff line change
@@ -1,18 +1,14 @@
import asyncio

import pytest

from hatchet_sdk import Hatchet
from hatchet_sdk.workflow_run import WorkflowRunRef
from tests.utils import fixture_bg_worker

worker = fixture_bg_worker(["poetry", "run", "concurrency_limit"])


# requires scope module or higher for shared event loop
@pytest.mark.skip(reason="The timing for this test is not reliable")
@pytest.mark.asyncio(scope="session")
async def test_run(hatchet: Hatchet):
@pytest.mark.skip(reason="The timing for this test is not reliable")
@pytest.mark.parametrize("worker", ["concurrency_limit"], indirect=True)
async def test_run(hatchet: Hatchet, worker):
num_runs = 6
runs: list[WorkflowRunRef] = []

Expand Down
7 changes: 2 additions & 5 deletions examples/concurrency_limit_rr/test_concurrency_limit_rr.py
Original file line number Diff line number Diff line change
@@ -1,19 +1,16 @@
import asyncio
import time

import pytest

from hatchet_sdk import Hatchet
from hatchet_sdk.workflow_run import WorkflowRunRef
from tests.utils import fixture_bg_worker

worker = fixture_bg_worker(["poetry", "run", "concurrency_limit_rr"])


# requires scope module or higher for shared event loop
@pytest.mark.skip(reason="The timing for this test is not reliable")
@pytest.mark.asyncio(scope="session")
async def test_run(hatchet: Hatchet):
@pytest.mark.parametrize("worker", ["concurrency_limit_rr"], indirect=True)
async def test_run(hatchet: Hatchet, worker):
num_groups = 2
runs: list[WorkflowRunRef] = []

Expand Down
6 changes: 2 additions & 4 deletions examples/dag/test_dag.py
Original file line number Diff line number Diff line change
@@ -1,14 +1,12 @@
import pytest

from hatchet_sdk import Hatchet
from tests.utils import fixture_bg_worker

worker = fixture_bg_worker(["poetry", "run", "dag"])


# requires scope module or higher for shared event loop
@pytest.mark.asyncio(scope="session")
async def test_run(hatchet: Hatchet):
@pytest.mark.parametrize("worker", ["dag"], indirect=True)
async def test_run(hatchet: Hatchet, worker):
run = hatchet.admin.run_workflow("DagWorkflow", {})
result = await run.result()

Expand Down
9 changes: 4 additions & 5 deletions examples/fanout/test_fanout.py
Original file line number Diff line number Diff line change
@@ -1,22 +1,21 @@
import pytest

from hatchet_sdk import Hatchet
from tests.utils import fixture_bg_worker

worker = fixture_bg_worker(["poetry", "run", "fanout"])


# requires scope module or higher for shared event loop
@pytest.mark.asyncio(scope="session")
async def test_run(hatchet: Hatchet):
@pytest.mark.parametrize("worker", ["fanout"], indirect=True)
async def test_run(hatchet: Hatchet, worker):
run = hatchet.admin.run_workflow("Parent", {"n": 2})
result = await run.result()
assert len(result["spawn"]["results"]) == 2


# requires scope module or higher for shared event loop
@pytest.mark.asyncio(scope="session")
async def test_run2(hatchet: Hatchet):
@pytest.mark.parametrize("worker", ["fanout"], indirect=True)
async def test_run2(hatchet: Hatchet, worker):
run = hatchet.admin.run_workflow("Parent", {"n": 2})
result = await run.result()
assert len(result["spawn"]["results"]) == 2
6 changes: 2 additions & 4 deletions examples/logger/test_logger.py
Original file line number Diff line number Diff line change
@@ -1,14 +1,12 @@
import pytest

from hatchet_sdk import Hatchet
from tests.utils import fixture_bg_worker

worker = fixture_bg_worker(["poetry", "run", "logger"])


# requires scope module or higher for shared event loop
@pytest.mark.asyncio(scope="session")
async def test_run(hatchet: Hatchet):
@pytest.mark.parametrize("worker", ["logger"], indirect=True)
async def test_run(hatchet: Hatchet, worker):
run = hatchet.admin.run_workflow("LoggingWorkflow", {})
result = await run.result()
assert result["step1"]["status"] == "success"
6 changes: 2 additions & 4 deletions examples/on_failure/test_on_failure.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,14 +4,12 @@

from hatchet_sdk import Hatchet
from hatchet_sdk.clients.rest.models.job_run_status import JobRunStatus
from tests.utils import fixture_bg_worker

worker = fixture_bg_worker(["poetry", "run", "on_failure"])


# requires scope module or higher for shared event loop
@pytest.mark.asyncio(scope="session")
async def test_run_timeout(hatchet: Hatchet):
@pytest.mark.parametrize("worker", ["on_failure"], indirect=True)
async def test_run_timeout(hatchet: Hatchet, worker):
run = hatchet.admin.run_workflow("OnFailureWorkflow", {})
try:
await run.result()
Expand Down
6 changes: 2 additions & 4 deletions examples/rate_limit/test_rate_limit.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,15 +4,13 @@
import pytest

from hatchet_sdk import Hatchet
from tests.utils import fixture_bg_worker

worker = fixture_bg_worker(["poetry", "run", "rate_limit"])


# requires scope module or higher for shared event loop
@pytest.mark.skip(reason="The timing for this test is not reliable")
@pytest.mark.asyncio(scope="session")
async def test_run(hatchet: Hatchet):
@pytest.mark.parametrize("worker", ["rate_limit"], indirect=True)
async def test_run(hatchet: Hatchet, worker):

run1 = hatchet.admin.run_workflow("RateLimitWorkflow", {})
run2 = hatchet.admin.run_workflow("RateLimitWorkflow", {})
Expand Down
9 changes: 4 additions & 5 deletions examples/timeout/test_timeout.py
Original file line number Diff line number Diff line change
@@ -1,14 +1,12 @@
import pytest

from hatchet_sdk import Hatchet
from tests.utils import fixture_bg_worker

worker = fixture_bg_worker(["poetry", "run", "timeout"])


# requires scope module or higher for shared event loop
@pytest.mark.asyncio(scope="session")
async def test_run_timeout(hatchet: Hatchet):
@pytest.mark.parametrize("worker", ["timeout"], indirect=True)
async def test_run_timeout(hatchet: Hatchet, worker):
run = hatchet.admin.run_workflow("TimeoutWorkflow", {})
try:
await run.result()
Expand All @@ -18,7 +16,8 @@ async def test_run_timeout(hatchet: Hatchet):


@pytest.mark.asyncio(scope="session")
async def test_run_refresh_timeout(hatchet: Hatchet):
@pytest.mark.parametrize("worker", ["timeout"], indirect=True)
async def test_run_refresh_timeout(hatchet: Hatchet, worker):
run = hatchet.admin.run_workflow("RefreshTimeoutWorkflow", {})
result = await run.result()
assert result["step1"]["status"] == "success"
Empty file removed tests/__init__.py
Empty file.
1 change: 0 additions & 1 deletion tests/utils/__init__.py

This file was deleted.

52 changes: 0 additions & 52 deletions tests/utils/bg_worker.py

This file was deleted.

Loading