Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Unit test stub autogenerator #824

Open
wants to merge 3 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
49 changes: 49 additions & 0 deletions hamilton/cli/__main__.py
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,17 @@ class CliState:
),
]

PYTHON_FILE_MODULE_ANNOTATIONS = Annotated[
Path,
typer.Argument(
help="Path to python file or module",
exists=True,
dir_okay=False,
readable=True,
resolve_path=True,
),
]

NAME_ANNOTATIONS = Annotated[
Optional[str],
typer.Option("--name", "-n", help="Name of the dataflow. Default: Derived from MODULES."),
Expand Down Expand Up @@ -84,6 +95,18 @@ class CliState:
),
]

OUTPUT_ANNOTATIONS = Annotated[
Path,
typer.Option(
"--output",
"-o",
help="Output path of unit test file.",
dir_okay=False,
writable=True,
resolve_path=True,
),
]


# TODO add `experiments` for `hamilton.plugins.h_experiments`
# TODO add `dataflows` submenu to manage locally installed dataflows
Expand Down Expand Up @@ -284,5 +307,31 @@ def view(
)


@cli.command()
def create_tests(
ctx: typer.Context,
python_file_module: PYTHON_FILE_MODULE_ANNOTATIONS,
output_file_path: OUTPUT_ANNOTATIONS = None,
):
"""Create unit tests for given python file or module"""
if output_file_path is None:
output_file_path = Path(f"test_{Path(python_file_module).stem}.py")

if output_file_path.is_dir():
raise ValueError("output_file_path can not be a directory")

_try_command(
cmd=commands.create_tests,
input_file_path=python_file_module,
output_file_path=output_file_path,
)
_response_handler(
ctx=ctx,
response=Response(
command="create_tests", success=True, message={"path": str(output_file_path)}
),
)


if __name__ == "__main__":
cli()
6 changes: 6 additions & 0 deletions hamilton/cli/commands.py
Original file line number Diff line number Diff line change
Expand Up @@ -108,3 +108,9 @@ def version(dr: driver.Driver) -> dict:
def view(dr: driver.Driver, output_file_path: Path = Path("dag.png")) -> None:
"""Display all functions of the instantiated Driver"""
dr.display_all_functions(output_file_path)


def create_tests(input_file_path: Path, output_file_path: Optional[Path] = None) -> None:
"""Create unit tests for the given input file path"""
res = logic.create_tests(input_file_path, output_file_path)
return res
62 changes: 62 additions & 0 deletions hamilton/cli/logic.py
Original file line number Diff line number Diff line change
Expand Up @@ -321,3 +321,65 @@ def _read_py_context(file_path: Path) -> dict:
context[k] = getattr(module, k, None)

return context


def create_tests(input: Path, output: Path) -> None:
"""Create unit tests for the input python file or module"""
import importlib
import inspect

def _get_functions_and_annotations(module):
functions = inspect.getmembers(module, inspect.isfunction)
annotations, names = list(), list()
for name, fn in functions:
names.append(name)
annotations.append(fn.__annotations__)
return annotations, names

def _write_import_statement(output, annotations):
unique_types = set()
for annotation in annotations:
unique_types |= set([v for v in annotation.values()])
primitives = (bool, str, int, float, type(None))
for t in unique_types:
if t not in primitives:
with open(output, "a") as f:
f.write(f"from {t.__module__} import {t.__name__}\n")

module = importlib.import_module(input.stem)
annotations, names = _get_functions_and_annotations(module)

strip_output_names = []
if output.exists():
output_module = importlib.import_module(output.stem)
output_annotations, output_names = _get_functions_and_annotations(output_module)
strip_output_names = [name.lstrip("test_") for name in output_names]
else:
with open(output, "a") as f:
f.write(f"import {input.stem}\n")
_write_import_statement(output, annotations)

new_functions = [name for name in names if name not in strip_output_names]

for name, annotation in zip(names, annotations):
if name in new_functions:
test_stub = _get_test_stub(input.stem, name, annotation)
with open(output, "a") as f:
f.write(test_stub)


def _get_test_stub(input_name: str, name: str, annotation: dict) -> str:
""""""
import textwrap

test_stub = "\n\n"
test_stub += f"""def test_{name}():"""
for k, v in annotation.items():
if k == "return":
k = "expected"
test_stub += textwrap.indent(
f"""{k}: {v.__name__} = {v.__name__}() #TODO fill in""", "\n "
)
test_stub += textwrap.indent(f"""assert expected == {input_name}.{name}()""", "\n ")

return test_stub