diff --git a/.pre-commit-config.yaml b/.pre-commit-config.yaml index 612a747..b7452b8 100644 --- a/.pre-commit-config.yaml +++ b/.pre-commit-config.yaml @@ -58,6 +58,7 @@ repos: args: [] additional_dependencies: - dill + - jinja2 - plumbum - pytest - pydantic diff --git a/docs/concepts.md b/docs/concepts.md index ec2d388..e77ce85 100644 --- a/docs/concepts.md +++ b/docs/concepts.md @@ -2,10 +2,52 @@ `fasthep-flow` does not implement any processing itself, but rather delegates between user workflow description (the -[YAML configuration file](./configuration/index.md)), the workflow tasks (e.g. -**Python Callables**), the **workflow DAG** and the **Orchestration** layer. -Unless excplicitely stated, every workflow has to start with a **Data task**, -has one or more **Processing tasks**, and end with an **Output task**. +[YAML configuration file](./configuration/index.md)), the workflow +representation (e.g. **Python Callables**), the **workflow management** and the +**Execution Environment**. + + + +A simplified view of this approach is shown below: + +```{mermaid} +flowchart LR + YAML + subgraph Config interpretation + FAST-HEP["fasthep-flow"] + Your["custom"] + end + subgraph Internal Workflow Representation + Repr[fasthep_flow.Workflow] + end + subgraph External Workflow management + Hamilton + Pipefunc + Snakemake + end + subgraph Execution Environment + local + Dask + Other + end + YAML --> FAST-HEP + YAML --> Your + FAST-HEP --> Repr + Your --> Repr + Repr --> Hamilton + Repr --> Pipefunc + Repr --> Snakemake + Hamilton --> local + Hamilton --> Dask + Hamilton --> Other + Pipefunc --> local + Pipefunc --> Dask + Pipefunc --> Other + Snakemake --> local + Snakemake --> Other + +``` ## Tasks diff --git a/docs/examples/index.md b/docs/examples/index.md index ec51d73..e83bc80 100644 --- a/docs/examples/index.md +++ b/docs/examples/index.md @@ -5,6 +5,7 @@ section contains some real-life examples of how to use `fasthep-flow`. 
```{toctree} hello_world.md +task_dependencies.md atlas_example.md cms_pub_example.md dune_example.md diff --git a/docs/examples/task_dependencies.md b/docs/examples/task_dependencies.md new file mode 100644 index 0000000..337ec39 --- /dev/null +++ b/docs/examples/task_dependencies.md @@ -0,0 +1,56 @@ +# Task Dependencies + +By default all tasks are run in sequence and each task takes the output of the +previous task as input. However, you can specify dependencies between tasks to +run them in parallel. + +```yaml +tasks: + - name: "A" + type: "fasthep_flow.operators.BashOperator" + kwargs: + bash_command: echo "A" + - name: "B" + type: "fasthep_flow.operators.BashOperator" + kwargs: + bash_command: echo "B" + - name: "C" + type: "fasthep_flow.operators.BashOperator" + kwargs: + bash_command: echo "C" + needs: + - "A" + - name: "D" + type: "fasthep_flow.operators.BashOperator" + kwargs: + bash_command: echo "D" + needs: + - "B" + - name: "Y" + type: "fasthep_flow.operators.BashOperator" + kwargs: + bash_command: echo "Y" + needs: + - "C" + - "D" +``` + +which will create the following flow: + +```{mermaid} +flowchart TD + A["A()"] + B["B()"] + C["C(A, str)"] + D["D(B, str)"] + Y["Y(C, D, str)"] + A --> C --> Y + B --> D --> Y +``` + +## Next steps + +This was a very simple example, but it shows the basic concepts of +`fasthep-flow`. For more realistic examples, see the experiment specific +examples in [Examples](./index.md). For more advanced examples, see +[Advanced Examples](../advanced_examples/index.md). 
diff --git a/pyproject.toml b/pyproject.toml index e379194..674fb0f 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -36,15 +36,14 @@ dependencies = [ "pydantic", "pyyaml >=5.4", "omegaconf >=2.1", - "sf-hamilton[visualization,dask-complete]", + "sf-hamilton >=1.86", "typer >=0.4", "typing_extensions >=3.10", ] [project.optional-dependencies] -test = [ - "pytest >=6", - "pytest-cov >=3", +dask = [ + "dask[distributed,dataframe]", ] dev = [ "pytest >=6", @@ -60,6 +59,15 @@ docs = [ "sphinx-inline-tabs", "furo>=2023.08.17", ] +test = [ + "pytest >=6", + "pytest-cov >=3", +] +visualisation = [ + "graphviz", + "matplotlib", + "networkx", +] [project.urls] Homepage = "https://github.com/FAST-HEP/fasthep-flow" diff --git a/src/fasthep_flow/cli.py b/src/fasthep_flow/cli.py index 9a15201..3d8716c 100644 --- a/src/fasthep_flow/cli.py +++ b/src/fasthep_flow/cli.py @@ -1,4 +1,5 @@ """Command line interface for fasthep-flow.""" + from __future__ import annotations from pathlib import Path @@ -8,7 +9,7 @@ from .config import FlowConfig, load_config from .orchestration import workflow_to_hamilton_dag -from .workflow import Workflow +from .workflow import create_workflow app = typer.Typer() @@ -59,12 +60,15 @@ def execute( typer.echo(f"Executing {config}...") cfg = init_config(config, overrides) - workflow = Workflow(config=cfg) + workflow = create_workflow(cfg) + # workflow = Workflow(config=cfg) save_path = workflow.save(Path(save_path)) dag = workflow_to_hamilton_dag(workflow, save_path) dag.visualize_execution( final_vars=workflow.task_names, output_file_path=Path(workflow.save_path) / "dag.png", + orient="TB", + show_legend=False, ) # TODO: if specified, run a specific task/node with execute_node results = dag.execute(workflow.task_names, inputs={}) diff --git a/src/fasthep_flow/dsl/__init__.py b/src/fasthep_flow/dsl/__init__.py new file mode 100644 index 0000000..477f66c --- /dev/null +++ b/src/fasthep_flow/dsl/__init__.py @@ -0,0 +1,24 @@ +"""THIS IS A DRAFT +Defines 
the FAST-HEP flow DSL specification. + +The DSL defines how a config is interpreted and how the tasks are resolved. +All implementations have to return a valid fasthep_flow.Workflow object. + +A particular DSL can be selected with the `version` key in the config: + +```yaml +version: v0 +``` + +or more explicitly: + +```yaml +version: fasthep_flow.dsl.v0 +``` +""" + +from __future__ import annotations + +from .default import v0 + +__all__ = ["v0"] diff --git a/src/fasthep_flow/dsl/default/__init__.py b/src/fasthep_flow/dsl/default/__init__.py new file mode 100644 index 0000000..e56a8b0 --- /dev/null +++ b/src/fasthep_flow/dsl/default/__init__.py @@ -0,0 +1,5 @@ +from __future__ import annotations + +from . import _v0 as v0 + +__all__ = ["v0"] diff --git a/src/fasthep_flow/dsl/default/_v0.py b/src/fasthep_flow/dsl/default/_v0.py new file mode 100644 index 0000000..e69de29 diff --git a/src/fasthep_flow/workflow.py b/src/fasthep_flow/workflow.py index 6bef29e..04fe7fd 100644 --- a/src/fasthep_flow/workflow.py +++ b/src/fasthep_flow/workflow.py @@ -18,6 +18,7 @@ from typing import Any import dill +import jinja2 from .config import FlowConfig @@ -33,6 +34,7 @@ class Task: type: str kwargs: dict[str, Any] payload: Any + needs: list[str] = field(default_factory=list) def __call__(self) -> Any: return self.payload() @@ -48,8 +50,16 @@ def __name__(self) -> str: TASK_SOURCE_TEMPLATE = """ -def {task_name}() -> dict[str, Any]: - return {task_definition}() +{% if needs -%} +def {{task_name}}( +{% for need in needs -%} + {{need}}: dict[str, Any], +{%- endfor %} +) -> dict[str, Any]: +{%- else -%} +def {{task_name}}() -> dict[str, Any]: +{%- endif %} + return {{task_definition}}() """ @@ -67,8 +77,8 @@ def get_task_source(obj: Any, task_name: str) -> str: task_source = dill.source.getsource(obj) task_definition = str(task_source.replace(task_base_source, "").strip()) - return TASK_SOURCE_TEMPLATE.format( - task_name=task_name, task_definition=task_definition + return 
jinja2.Template(TASK_SOURCE_TEMPLATE).render( + task_name=task_name, task_definition=task_definition, needs=obj.needs ) @@ -99,27 +109,13 @@ class Workflow: """Wrapper for any compute graph implementation we want to support.""" - config: FlowConfig + # config: FlowConfig + metadata: dict[str, Any] = field(default_factory=dict[str, Any]) tasks: list[Task] = field(default_factory=list) task_names: list[str] = field(default_factory=list) name: str = "fasthep-flow" save_path: str = "~/.fasthep/flow/" - def __post_init__(self) -> None: - self.name = self.config.metadata.get("name", self.name) - for task in self.config.tasks: - # TODO: set ouput_path for each task - self.tasks.append( - Task( - name=task.name, - type=task.type, - kwargs=task.kwargs if task.kwargs is not None else {}, - payload=task.resolve() if hasattr(task, "resolve") else None, - # TODO: pass information about the task's dependencies and execution environment - ) - ) - self.task_names = [task.safe_name for task in self.tasks] - def __call__(self) -> Any: """Function to execute all tasks in the workflow.""" for task in self.tasks: @@ -153,7 +149,7 @@ def save(self, base_path: Path = Path("~/.fasthep/flow")) -> str: """ base_path = Path(base_path).expanduser().resolve() - config_file = Path(self.config.config_file) + config_file = Path(self.metadata["config_file"]) config_hash = get_config_hash(config_file) path = create_save_path(base_path, self.name, config_hash) # TODO: check if things exist and skip if they do @@ -200,3 +196,29 @@ def load_tasks_module(workflow: Workflow) -> ModuleType: task_spec.loader.exec_module(task_functions) sys.modules["tasks"] = task_functions return task_functions + + +def create_workflow(config: FlowConfig) -> Workflow: + """Create a workflow from a configuration.""" + name = config.metadata.get("name", Workflow.name) + tasks = [] + for task in config.tasks: + # TODO: set output_path for each 
task + tasks.append( + Task( + name=task.name, + type=task.type, + kwargs=task.kwargs if task.kwargs is not None else {}, + payload=task.resolve() if hasattr(task, "resolve") else None, + needs=task.needs if task.needs else [], + # TODO: pass information about the task's dependencies and execution environment + ) + ) + task_names = [task.safe_name for task in tasks] + + return Workflow( + metadata=config.metadata, + tasks=tasks, + task_names=task_names, + name=name, + ) diff --git a/tests/conftest.py b/tests/conftest.py index 7e27fa0..8f92993 100644 --- a/tests/conftest.py +++ b/tests/conftest.py @@ -11,7 +11,7 @@ @pytest.fixture() def simple_config_yaml() -> Path: - return current_dir / "simple_config.yaml" + return current_dir / "data" / "simple_config.yaml" @pytest.fixture() @@ -23,3 +23,19 @@ def simple_config(simple_config_yaml) -> str: @pytest.fixture() def simple_dict_config(simple_config) -> Any: return yaml.safe_load(simple_config) + + +@pytest.fixture() +def parallel_config_yaml() -> Path: + return current_dir / "data" / "parallel.yaml" + + +@pytest.fixture() +def parallel_config(parallel_config_yaml) -> str: + with Path.open(parallel_config_yaml, "r") as file: + return file.read() + + +@pytest.fixture() +def parallel_dict_config(parallel_config) -> Any: + return yaml.safe_load(parallel_config) diff --git a/tests/data/parallel.yaml b/tests/data/parallel.yaml new file mode 100644 index 0000000..60b23aa --- /dev/null +++ b/tests/data/parallel.yaml @@ -0,0 +1,33 @@ +tasks: + - name: "A" + type: "fasthep_flow.operators.BashOperator" + kwargs: + bash_command: echo + arguments: ["A"] + - name: "B" + type: "fasthep_flow.operators.BashOperator" + kwargs: + bash_command: echo + arguments: ["B"] + - name: "C" + type: "fasthep_flow.operators.BashOperator" + kwargs: + bash_command: echo + arguments: ["C"] + needs: + - "A" + - name: "D" + type: "fasthep_flow.operators.BashOperator" + kwargs: + bash_command: echo + arguments: ["D"] + needs: + - "B" + - name: "Y" + 
type: "fasthep_flow.operators.BashOperator" + kwargs: + bash_command: echo + arguments: ["Y"] + needs: + - "C" + - "D" diff --git a/tests/simple_config.yaml b/tests/data/simple_config.yaml similarity index 100% rename from tests/simple_config.yaml rename to tests/data/simple_config.yaml diff --git a/tests/test_workflow.py b/tests/test_workflow.py index f524b8d..51ed7a1 100644 --- a/tests/test_workflow.py +++ b/tests/test_workflow.py @@ -2,16 +2,21 @@ import pytest -from fasthep_flow import FlowConfig, Workflow +from fasthep_flow import FlowConfig +from fasthep_flow.workflow import create_workflow, get_task_source @pytest.fixture() -def config(simple_dict_config): - return FlowConfig.from_dictconfig(simple_dict_config) +def workflow(simple_dict_config): + return create_workflow(FlowConfig.from_dictconfig(simple_dict_config)) -def test_create_workflow(config): - workflow = Workflow(config=config) +@pytest.fixture() +def parallel_workflow(parallel_dict_config): + return create_workflow(FlowConfig.from_dictconfig(parallel_dict_config)) + + +def test_create_workflow(workflow): assert workflow.tasks assert len(workflow.tasks) == 1 task = workflow.tasks[0] @@ -22,8 +27,7 @@ def test_create_workflow(config): assert params["arguments"] == ["Hello World!"] -def test_run_workflow(config): - workflow = Workflow(config=config) +def test_run_workflow(workflow): results = workflow.run() assert results assert len(results) == 1 @@ -31,7 +35,22 @@ def test_run_workflow(config): assert result["stdout"] == "Hello World!\n" -def test_task_names(config): - workflow = Workflow(config=config) +def test_task_names(workflow): assert workflow.task_names assert workflow.task_names == ["printEcho"] + + +def test_get_task_source(workflow): + source = get_task_source(workflow.tasks[0], "printEcho") + assert "def printEcho" in source + + +def test_parallel(parallel_workflow): + assert parallel_workflow.tasks + assert len(parallel_workflow.tasks) == 5 + assert parallel_workflow.tasks[0].name == 
"A" + assert parallel_workflow.tasks[2].name == "C" + assert parallel_workflow.tasks[4].name == "Y" + assert parallel_workflow.tasks[0].needs == [] + assert parallel_workflow.tasks[2].needs == ["A"] + assert parallel_workflow.tasks[4].needs == ["C", "D"]