Skip to content

Commit

Permalink
[pipes] refactor Pipes tests interface
Browse files Browse the repository at this point in the history
  • Loading branch information
danielgafni committed Dec 26, 2024
1 parent 62ff166 commit 385428e
Show file tree
Hide file tree
Showing 2 changed files with 61 additions and 77 deletions.
25 changes: 19 additions & 6 deletions libraries/pipes/tests/dagster-pipes-tests/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -20,15 +20,17 @@ See the [pipes_config.py](src/dagster_pipes_tests/pipes_config.py) class for mor

In order to run the tests, follow these steps:

1. Install `pytest` and `dagster-pipes-tests`:
1. Install `pytest` and `dagster-pipes-tests`. This can be done with [uv](https://docs.astral.sh/uv/):

```shell
uv pip install pytest
# TODO: publish the package to PyPI
uv pip install <path-to-pipes-tests>
# assuming the command is run in libraries/pipes/implementations/<language>
uv add --group dev pytest --editable ../../tests/dagster-pipes-tests
```

2. Import the test suite in your `pytest` code and configure it with the base arguments (usually containing the testing executable). The executable will be invoked with various arguments, and the test suite will assert certain side effects produced by the executable. Base arguments will be concatenated with additional arguments provided by the test suite.
> [!NOTE]
> To install `dagster-pipes-tests` in a repository other than this one, replace `--editable ../../tests/dagster-pipes-tests` with `git+https://github.com/dagster-io/community-integrations.git#subdirectory=libraries/pipes/tests/dagster-pipes-tests`
2. Import the test suite in your `pytest` code (for example, in `tests/test_pipes.py`) and configure it with the base arguments (usually containing the testing executable). The executable will be invoked with various arguments, and the test suite will assert certain side effects produced by the executable. Base arguments will be concatenated with additional arguments provided by the test suite.

For example, for Java:

Expand All @@ -45,8 +47,13 @@ class TestJavaPipes(PipesTestSuite):
]
```

3 [Optional]. When working with compiled languages, it's recommended to setup a `pytest` fixture that compiles the executable before running the tests. This way, the executable is only compiled once, and the tests can be run multiple times without recompiling.
> [!NOTE]
> Each test has its own `--test-name` argument, which can be used to identify the test being run.
> [!WARNING]
> This code must be placed in a file that is discovered by `pytest`, e.g. one whose name starts with `test_`.
When working with compiled languages, it's recommended to set up a `pytest` fixture that compiles the executable before running the tests. This way, the executable is only compiled once, and the tests can be run multiple times without recompiling.

For example, for Java, put the following code in `conftest.py`:

Expand All @@ -59,3 +66,9 @@ import subprocess
def built_jar():
subprocess.run(["./gradlew", "build"], check=True)
```

4. Run the tests with `pytest`:

```shell
uv run pytest
```
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,10 @@
METADATA = json.loads(METADATA_PATH.read_text())



def _get_current_test_name(request):
return request.node.name.split("[")[0]

def _resolve_metadata_value(
value: Any, metadata_type: PipesMetadataType
) -> MetadataValue:
Expand Down Expand Up @@ -107,64 +111,40 @@ class PipesTestSuite:
# to run all the tests
BASE_ARGS = ["change", "me"]

@parametrize("metadata", METADATA_CASES)
def test_context_reconstruction(
def test_components(
self,
request,
metadata: Dict[str, Any],
context_injector: PipesContextInjector,
message_reader: PipesMessageReader,
tmpdir_factory,
capsys,
):
"""This test doesn't test anything in Dagster. Instead, it provides parameters to the external process, which should check if they are loaded correctly."""
"""This test checks if the external process can access the context and the message writer correctly.
It sets an additional `--job-name` argument which can be used to check if the context was loaded correctly.
It sets an additional `--extras` argument which points to a json file with Pipes extras which should be loaded by the context loader. The test can use this path to check if the extras were loaded correctly.
"""
work_dir = tmpdir_factory.mktemp("work_dir")

extras_path = work_dir / "extras.json"

with open(str(extras_path), "w") as f:
json.dump(metadata, f)

@asset
def my_asset(
context: AssetExecutionContext,
pipes_subprocess_client: PipesSubprocessClient,
) -> MaterializeResult:
job_name = context.run.job_name

args = self.BASE_ARGS + [
"--env",
f"--extras={str(extras_path)}",
f"--job-name={job_name}",
]

return pipes_subprocess_client.run(
context=context,
command=args,
extras=metadata,
).get_materialize_result()

result = materialize(
[my_asset],
resources={"pipes_subprocess_client": PipesSubprocessClient()},
raise_on_error=False,
)

assert result.success

def test_components(
self,
context_injector: PipesContextInjector,
message_reader: PipesMessageReader,
tmpdir_factory,
capsys,
):
@asset
def my_asset(
context: AssetExecutionContext,
pipes_subprocess_client: PipesSubprocessClient,
) -> MaterializeResult:
args = self.BASE_ARGS + [
"--env",
"--full",
"--extras",
str(extras_path),
"--job-name",
context.run.job_name,
"--test-name",
_get_current_test_name(request),
]

if isinstance(context_injector, PipesS3ContextInjector):
Expand Down Expand Up @@ -203,13 +183,13 @@ def my_asset(
)
def test_extras(
self,
request,
context_injector: PipesContextInjector,
metadata: Dict[str, Any],
tmpdir_factory,
capsys,
):
"""This test doesn't test anything in Dagster. Instead, it provides extras to the external process, which should check if they are loaded correctly."""

work_dir = tmpdir_factory.mktemp("work_dir")

metadata_path = work_dir / "metadata.json"
Expand All @@ -222,13 +202,11 @@ def my_asset(
context: AssetExecutionContext,
pipes_subprocess_client: PipesSubprocessClient,
) -> MaterializeResult:
job_name = context.run.job_name

args = self.BASE_ARGS + [
"--full",
"--env",
f"--extras={metadata_path}",
f"--job-name={job_name}",
"--extras",
metadata_path,
"--test-name",
_get_current_test_name(request),
]

invocation_result = pipes_subprocess_client.run(
Expand Down Expand Up @@ -262,11 +240,11 @@ def my_asset(

def test_error_reporting(
self,
request,
tmpdir_factory,
capsys,
):
"""This test checks if the external process sends an exception message correctly."""

if not PIPES_CONFIG.general.error_reporting:
pytest.skip("general.error_reporting is not enabled in pipes.toml")

Expand All @@ -282,8 +260,8 @@ def my_asset(
pipes_subprocess_client: PipesSubprocessClient,
):
args = self.BASE_ARGS + [
"--full",
"--throw-error",
"--test-name",
_get_current_test_name(request),
]

invocation_result = pipes_subprocess_client.run(
Expand Down Expand Up @@ -328,6 +306,7 @@ def my_asset(

def test_message_log(
self,
request,
tmpdir_factory,
capsys,
):
Expand All @@ -346,8 +325,8 @@ def my_asset(
pipes_subprocess_client: PipesSubprocessClient,
):
args = self.BASE_ARGS + [
"--full",
"--logging",
"--test-name",
_get_current_test_name(request),
]

invocation_result = pipes_subprocess_client.run(
Expand Down Expand Up @@ -384,7 +363,7 @@ def my_asset(
if f"{level.lower().capitalize()} message" in line:
assert level in line
logged_levels.add(level)

assert logged_levels == expected_levels
assert (
"[pipes] did not receive any messages from external process"
Expand All @@ -394,6 +373,7 @@ def my_asset(
@parametrize("custom_message_payload", CUSTOM_MESSAGE_CASES)
def test_message_report_custom_message(
self,
request,
custom_message_payload: Any,
tmpdir_factory,
capsys,
Expand All @@ -415,14 +395,11 @@ def my_asset(
context: AssetExecutionContext,
pipes_subprocess_client: PipesSubprocessClient,
) -> MaterializeResult:
job_name = context.run.job_name

args = self.BASE_ARGS + [
"--full",
"--env",
f"--job-name={job_name}",
"--custom-payload-path",
"--custom-payload",
str(custom_payload_path),
"--test-name",
_get_current_test_name(request),
]

invocation_result = pipes_subprocess_client.run(
Expand Down Expand Up @@ -455,6 +432,7 @@ def my_asset(
@parametrize("asset_key", [None, ["my_asset"]])
def test_message_report_asset_materialization(
self,
request,
data_version: Optional[str],
asset_key: Optional[List[str]],
tmpdir_factory,
Expand Down Expand Up @@ -491,14 +469,11 @@ def my_asset(
context: AssetExecutionContext,
pipes_subprocess_client: PipesSubprocessClient,
) -> MaterializeResult:
job_name = context.run.job_name

args = self.BASE_ARGS + [
"--full",
"--env",
f"--job-name={job_name}",
"--report-asset-materialization",
str(asset_materialization_path),
"--test-name",
_get_current_test_name(request),
]

invocation_result = pipes_subprocess_client.run(
Expand Down Expand Up @@ -547,6 +522,7 @@ def my_asset(
@parametrize("asset_key", [None, ["my_asset"]])
def test_message_report_asset_check(
self,
request,
passed: bool,
asset_key: Optional[List[str]],
severity: PipesAssetCheckSeverity,
Expand All @@ -556,9 +532,7 @@ def test_message_report_asset_check(
"""This test checks if the external process sends asset checks correctly."""

if not PIPES_CONFIG.messages.report_asset_check:
pytest.skip(
"messages.report_asset_check is not enabled in pipes.toml"
)
pytest.skip("messages.report_asset_check is not enabled in pipes.toml")

work_dir = tmpdir_factory.mktemp("work_dir")

Expand Down Expand Up @@ -588,14 +562,11 @@ def my_asset(
context: AssetExecutionContext,
pipes_subprocess_client: PipesSubprocessClient,
):
job_name = context.run.job_name

args = self.BASE_ARGS + [
"--full",
"--env",
f"--job-name={job_name}",
"--report-asset-check",
str(report_asset_check_path),
"--test-name",
_get_current_test_name(request),
]

invocation_result = pipes_subprocess_client.run(
Expand Down

0 comments on commit 385428e

Please sign in to comment.