Merge pull request #46 from FAST-HEP/kreczko-issue-37
Task Dependencies
kreczko authored Feb 12, 2025
2 parents 1ed88ad + 8fcc95d commit d332d46
Showing 14 changed files with 272 additions and 41 deletions.
1 change: 1 addition & 0 deletions .pre-commit-config.yaml
@@ -58,6 +58,7 @@ repos:
        args: []
        additional_dependencies:
          - dill
          - jinja2
          - plumbum
          - pytest
          - pydantic
50 changes: 46 additions & 4 deletions docs/concepts.md
@@ -2,10 +2,52 @@

`fasthep-flow` does not implement any processing itself, but rather delegates
between the user's workflow description (the
[YAML configuration file](./configuration/index.md)), the workflow tasks (e.g.
**Python Callables**), the **workflow DAG** and the **Orchestration** layer.
Unless explicitly stated, every workflow has to start with a **Data task**,
have one or more **Processing tasks**, and end with an **Output task**.
[YAML configuration file](./configuration/index.md)), the workflow
representation (e.g. **Python Callables**), the **workflow management** and the
**Execution Environment**.

<!-- Unless explicitly stated, every workflow has to start with a **Data task**,
have one or more **Processing tasks**, and end with an **Output task**. -->

A simplified view of this approach is shown below:

```{mermaid}
flowchart LR
YAML
subgraph Config interpretation
FAST-HEP["fasthep-flow"]
Your["custom"]
end
subgraph Internal Workflow Representation
Repr[fasthep_flow.Workflow]
end
subgraph External Workflow management
Hamilton
Pipefunc
Snakemake
end
subgraph Execution Environment
local
Dask
Other
end
YAML --> FAST-HEP
YAML --> Your
FAST-HEP --> Repr
Your --> Repr
Repr --> Hamilton
Repr --> Pipefunc
Repr --> Snakemake
Hamilton --> local
Hamilton --> Dask
Hamilton --> Other
Pipefunc --> local
Pipefunc --> Dask
Pipefunc --> Other
Snakemake --> local
Snakemake --> Other
```
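
For orientation, a minimal workflow description that enters this pipeline at the `YAML` node could look like the sketch below (the task name and command are illustrative; the operator type is the one used in the examples):

```yaml
tasks:
  - name: "hello"  # illustrative task name
    type: "fasthep_flow.operators.BashOperator"
    kwargs:
      bash_command: echo "Hello World!"
```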

## Tasks

1 change: 1 addition & 0 deletions docs/examples/index.md
@@ -5,6 +5,7 @@ section contains some real-life examples of how to use `fasthep-flow`.

```{toctree}
hello_world.md
task_dependencies.md
atlas_example.md
cms_pub_example.md
dune_example.md
56 changes: 56 additions & 0 deletions docs/examples/task_dependencies.md
@@ -0,0 +1,56 @@
# Task Dependencies

By default, all tasks run in sequence and each task takes the output of the
previous task as input. However, you can specify explicit dependencies between
tasks so that independent tasks can run in parallel.

```yaml
tasks:
  - name: "A"
    type: "fasthep_flow.operators.BashOperator"
    kwargs:
      bash_command: echo "A"
  - name: "B"
    type: "fasthep_flow.operators.BashOperator"
    kwargs:
      bash_command: echo "B"
  - name: "C"
    type: "fasthep_flow.operators.BashOperator"
    kwargs:
      bash_command: echo "C"
    dependencies:
      - "A"
  - name: "D"
    type: "fasthep_flow.operators.BashOperator"
    kwargs:
      bash_command: echo "D"
    dependencies:
      - "B"
  - name: "Y"
    type: "fasthep_flow.operators.BashOperator"
    kwargs:
      bash_command: echo "Y"
    dependencies:
      - "C"
      - "D"
```
This configuration creates the following flow:
```{mermaid}
flowchart TD
A["A()"]
B["B()"]
C["C(A, str)"]
D["D(B, str)"]
Y["Y(C, D, str)"]
A --> C --> Y
B --> D --> Y
```
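
To run such a configuration programmatically, the steps mirror what the command-line interface in this commit does (see `src/fasthep_flow/cli.py`). The sketch below assumes the configuration above is saved as `parallel.yaml` and that `load_config` accepts a file path:

```python
# Sketch based on the CLI code in this commit; load_config's exact signature is an assumption.
from pathlib import Path

from fasthep_flow.config import load_config
from fasthep_flow.orchestration import workflow_to_hamilton_dag
from fasthep_flow.workflow import create_workflow

cfg = load_config("parallel.yaml")  # the configuration shown above
workflow = create_workflow(cfg)
save_path = workflow.save(Path("~/.fasthep/flow/"))
dag = workflow_to_hamilton_dag(workflow, save_path)
results = dag.execute(workflow.task_names, inputs={})  # runs all five tasks, respecting the dependencies above
```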

## Next steps

This was a very simple example, but it shows the basic concepts of
`fasthep-flow`. For more realistic examples, see the experiment-specific
examples in [Examples](./index.md). For more advanced examples, see
[Advanced Examples](../advanced_examples/index.md).
16 changes: 12 additions & 4 deletions pyproject.toml
@@ -36,15 +36,14 @@ dependencies = [
"pydantic",
"pyyaml >=5.4",
"omegaconf >=2.1",
"sf-hamilton[visualization,dask-complete]",
"sf-hamilton >=1.86",
"typer >=0.4",
"typing_extensions >=3.10",
]

[project.optional-dependencies]
test = [
"pytest >=6",
"pytest-cov >=3",
dask = [
"dask[distributed,dataframe]",
]
dev = [
"pytest >=6",
@@ -60,6 +59,15 @@ docs = [
"sphinx-inline-tabs",
"furo>=2023.08.17",
]
test = [
"pytest >=6",
"pytest-cov >=3",
]
visualisation = [
"graphviz",
"matplotlib",
"networkx",
]

[project.urls]
Homepage = "https://github.com/FAST-HEP/fasthep-flow"
8 changes: 6 additions & 2 deletions src/fasthep_flow/cli.py
@@ -1,4 +1,5 @@
"""Command line interface for fasthep-flow."""

from __future__ import annotations

from pathlib import Path
@@ -8,7 +9,7 @@

from .config import FlowConfig, load_config
from .orchestration import workflow_to_hamilton_dag
from .workflow import Workflow
from .workflow import create_workflow

app = typer.Typer()

@@ -59,12 +60,15 @@ def execute(
    typer.echo(f"Executing {config}...")

    cfg = init_config(config, overrides)
    workflow = Workflow(config=cfg)
    workflow = create_workflow(cfg)
    # workflow = Workflow(config=cfg)
    save_path = workflow.save(Path(save_path))
    dag = workflow_to_hamilton_dag(workflow, save_path)
    dag.visualize_execution(
        final_vars=workflow.task_names,
        output_file_path=Path(workflow.save_path) / "dag.png",
        orient="TB",
        show_legend=False,
    )
    # TODO: if specified, run a specific task/node with execute_node
    results = dag.execute(workflow.task_names, inputs={})
24 changes: 24 additions & 0 deletions src/fasthep_flow/dsl/__init__.py
@@ -0,0 +1,24 @@
"""THIS IS A DRAFT
Defines the FAST-HEP flow DSL specification.
The DSL defines how a config is interpreted and how the tasks are resolved.
All implementations have to return a valid fasthep_flow.Workflow object.
A particular DSL can be selected with the `version` key in the config:
```yaml
version: v0
```
or more explicitly:
```yaml
version: fasthep_flow.dsl.v0
```
"""

from __future__ import annotations

from .default import v0

__all__ = ["v0"]
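
The docstring above states the contract for a DSL implementation: interpret a config and return a valid `fasthep_flow.Workflow`. The `_v0` module itself is still empty in this commit, so the following is only a hypothetical sketch of the expected shape (the function name `build` is invented for illustration):

```python
# Hypothetical sketch only; `build` is an invented name, not part of the package.
from __future__ import annotations

from fasthep_flow.config import FlowConfig
from fasthep_flow.workflow import Workflow, create_workflow


def build(config: FlowConfig) -> Workflow:
    """Interpret a v0-style config and return a valid fasthep_flow.Workflow."""
    return create_workflow(config)
```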
5 changes: 5 additions & 0 deletions src/fasthep_flow/dsl/default/__init__.py
@@ -0,0 +1,5 @@
from __future__ import annotations

from . import _v0 as v0

__all__ = ["v0"]
Empty file.
64 changes: 43 additions & 21 deletions src/fasthep_flow/workflow.py
@@ -18,6 +18,7 @@
from typing import Any

import dill
import jinja2

from .config import FlowConfig

@@ -33,6 +34,7 @@ class Task:
    type: str
    kwargs: dict[str, Any]
    payload: Any
    needs: list[str] = field(default_factory=list)

    def __call__(self) -> Any:
        return self.payload()
@@ -48,8 +50,16 @@ def __name__(self) -> str:


TASK_SOURCE_TEMPLATE = """
def {task_name}() -> dict[str, Any]:
    return {task_definition}()
{% if needs -%}
def {{task_name}}(
{% for need in needs -%}
{{need}}: dict[str, Any],
{%- endfor %}
) -> dict[str, Any]:
{%- else -%}
def {{task_name}}() -> dict[str, Any]:
{%- endif %}
    return {{task_definition}}()
"""


@@ -67,8 +77,8 @@ def get_task_source(obj: Any, task_name: str) -> str:
    task_source = dill.source.getsource(obj)
    task_definition = str(task_source.replace(task_base_source, "").strip())

    return TASK_SOURCE_TEMPLATE.format(
        task_name=task_name, task_definition=task_definition
    return jinja2.Template(TASK_SOURCE_TEMPLATE).render(
        task_name=task_name, task_definition=task_definition, needs=obj.needs
    )


@@ -99,27 +109,13 @@ def create_save_path(base_path: Path, workflow_name: str, config_hash: str) -> P
class Workflow:
    """Wrapper for any compute graph implementation we want to support."""

    config: FlowConfig
    # config: FlowConfig
    metadata: dict[str, Any] = field(default_factory=dict[str, Any])
    tasks: list[Task] = field(default_factory=list)
    task_names: list[str] = field(default_factory=list)
    name: str = "fasthep-flow"
    save_path: str = "~/.fasthep/flow/"

    def __post_init__(self) -> None:
        self.name = self.config.metadata.get("name", self.name)
        for task in self.config.tasks:
            # TODO: set ouput_path for each task
            self.tasks.append(
                Task(
                    name=task.name,
                    type=task.type,
                    kwargs=task.kwargs if task.kwargs is not None else {},
                    payload=task.resolve() if hasattr(task, "resolve") else None,
                    # TODO: pass information about the task's dependencies and execution environment
                )
            )
        self.task_names = [task.safe_name for task in self.tasks]

    def __call__(self) -> Any:
        """Function to execute all tasks in the workflow."""
        for task in self.tasks:
@@ -153,7 +149,7 @@ def save(self, base_path: Path = Path("~/.fasthep/flow")) -> str:
        """
        base_path = Path(base_path).expanduser().resolve()

        config_file = Path(self.config.config_file)
        config_file = Path(self.metadata["config_file"])
        config_hash = get_config_hash(config_file)
        path = create_save_path(base_path, self.name, config_hash)
        # TODO: check if things exist and skip if they do
@@ -200,3 +196,29 @@ def load_tasks_module(workflow: Workflow) -> ModuleType:
    task_spec.loader.exec_module(task_functions)
    sys.modules["tasks"] = task_functions
    return task_functions


def create_workflow(config: FlowConfig) -> Workflow:
    """Create a workflow from a configuration."""
    name = config.metadata.get("name", Workflow.name)
    tasks = []
    for task in config.tasks:
        # TODO: set ouput_path for each task
        tasks.append(
            Task(
                name=task.name,
                type=task.type,
                kwargs=task.kwargs if task.kwargs is not None else {},
                payload=task.resolve() if hasattr(task, "resolve") else None,
                needs=task.needs if task.needs else [],
                # TODO: pass information about the task's dependencies and execution environment
            )
        )
    task_names = [task.safe_name for task in tasks]

    return Workflow(
        metadata=config.metadata,
        tasks=tasks,
        task_names=task_names,
        name=name,
    )
18 changes: 17 additions & 1 deletion tests/conftest.py
@@ -11,7 +11,7 @@

@pytest.fixture()
def simple_config_yaml() -> Path:
    return current_dir / "simple_config.yaml"
    return current_dir / "data" / "simple_config.yaml"


@pytest.fixture()
@@ -23,3 +23,19 @@ def simple_config(simple_config_yaml) -> str:
@pytest.fixture()
def simple_dict_config(simple_config) -> Any:
    return yaml.safe_load(simple_config)


@pytest.fixture()
def parallel_config_yaml() -> Path:
    return current_dir / "data" / "parallel.yaml"


@pytest.fixture()
def parallel_config(parallel_config_yaml) -> str:
    with Path.open(parallel_config_yaml, "r") as file:
        return file.read()


@pytest.fixture()
def parallel_dict_config(parallel_config) -> Any:
    return yaml.safe_load(parallel_config)