Skip to content

Commit

Permalink
[components] Explicit schema for PipesSubprocessScriptCollection (dag…
Browse files Browse the repository at this point in the history
…ster-io#26216)

## Summary & Motivation

This changes `PipesSubprocessScriptCollection` to rely less on automatic registration of scripts and to rely instead on more explicit configuration. The automatic behavior was really unexpected, and there was no way to opt out of it (e.g. if there is a shared Python file for common code).

I think we can re-add some autodiscovery capabilities later, but built on this foundation.

## How I Tested These Changes

BK
  • Loading branch information
schrockn authored Dec 1, 2024
1 parent d01147a commit 859465b
Show file tree
Hide file tree
Showing 3 changed files with 82 additions and 43 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
from dagster._components.core.component_decl_builder import YamlComponentDecl
from dagster._core.definitions.asset_key import AssetKey
from dagster._core.definitions.asset_spec import AssetSpec
from dagster._core.definitions.assets import AssetsDefinition
from dagster._core.definitions.decorators.asset_decorator import multi_asset
from dagster._core.execution.context.asset_execution_context import AssetExecutionContext
from dagster._core.pipes.subprocess import PipesSubprocessClient
Expand Down Expand Up @@ -41,19 +42,27 @@ def to_asset_spec(self) -> AssetSpec:


# Declarative params for one script entry in the component's `scripts` list.
class PipesSubprocessScriptParams(BaseModel):
    # Script location; `from_decl_node` joins it onto the component declaration's path.
    path: str
    # Asset spec models for this script; each is converted via `to_asset_spec()`.
    assets: Sequence[AssetSpecModel]


# Top-level params schema for the component: an explicit list of script entries.
class PipesSubprocessScriptCollectionParams(BaseModel):
    # One entry per script to register; `from_decl_node` iterates over these.
    scripts: Sequence[PipesSubprocessScriptParams]


class PipesSubprocessScriptCollection(Component):
params_schema = Mapping[str, PipesSubprocessScriptParams]
params_schema = PipesSubprocessScriptCollectionParams

def __init__(
self, dirpath: Path, path_specs: Optional[Mapping[str, Sequence[AssetSpec]]] = None
):
def __init__(self, dirpath: Path, path_specs: Mapping[Path, Sequence[AssetSpec]]):
self.dirpath = dirpath
# mapping from the script name (e.g. /path/to/script_abc.py -> script_abc)
# to the specs it produces
self.path_specs = path_specs or {}
self.path_specs = path_specs

@staticmethod
def introspect_from_path(path: Path) -> "PipesSubprocessScriptCollection":
path_specs = {path: [AssetSpec(path.stem)] for path in list(path.rglob("*.py"))}
return PipesSubprocessScriptCollection(dirpath=path, path_specs=path_specs)

@classmethod
def from_decl_node(
Expand All @@ -63,28 +72,27 @@ def from_decl_node(
loaded_params = TypeAdapter(cls.params_schema).validate_python(
component_decl.defs_file_model.component_params
)
return cls(
dirpath=component_decl.path,
path_specs={
k: [vv.to_asset_spec() for vv in v.assets] for k, v in loaded_params.items()
}
if loaded_params
else None,
)

path_specs = {}
for script in loaded_params.scripts:
script_path = component_decl.path / script.path
if not script_path.exists():
raise FileNotFoundError(f"Script {script_path} does not exist")
path_specs[script_path] = [spec.to_asset_spec() for spec in script.assets]

return cls(dirpath=component_decl.path, path_specs=path_specs)

def build_defs(self, load_context: "ComponentLoadContext") -> "Definitions":
from dagster._core.definitions.definitions_class import Definitions

return Definitions(
assets=[self._create_asset_def(path) for path in list(self.dirpath.rglob("*.py"))],
assets=[self._create_asset_def(path, specs) for path, specs in self.path_specs.items()],
resources={"pipes_client": PipesSubprocessClient()},
)

def _create_asset_def(self, path: Path):
@multi_asset(
specs=self.path_specs.get(path.stem) or [AssetSpec(key=path.stem)],
name=f"script_{path.stem}",
)
def _create_asset_def(self, path: Path, specs: Sequence[AssetSpec]) -> AssetsDefinition:
# TODO: allow name parameterization
@multi_asset(specs=specs, name=f"script_{path.stem}")
def _asset(context: AssetExecutionContext, pipes_client: PipesSubprocessClient):
cmd = [shutil.which("python"), path]
return pipes_client.run(command=cmd, context=context).get_results()
Expand Down
Original file line number Diff line number Diff line change
@@ -1,11 +1,15 @@
component_type: pipes_subprocess_script_collection

component_params:
script_one:
assets:
- key: a
- key: b
deps: [up1, up2]
script_three:
assets:
- key: override_key
scripts:
- path: script_one.py
assets:
- key: a
- key: b
deps: [up1, up2]
- path: script_two.py
assets:
- key: c
- path: subdir/script_three.py
assets:
- key: override_key
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
from pathlib import Path

from dagster import AssetKey
from dagster._components.core.component import Component, ComponentLoadContext, ComponentRegistry
from dagster._components.core.component_decl_builder import DefsFileModel
from dagster._components.core.component_defs_builder import (
Expand All @@ -24,6 +25,15 @@ def script_load_context() -> ComponentLoadContext:
return ComponentLoadContext(registry=registry(), resources={})


def _asset_keys(component: Component) -> set[AssetKey]:
    """Return the set of all asset keys in the definitions built from ``component``."""
    # Build the definitions with a test load context, then read the keys
    # off the resulting asset graph.
    defs = component.build_defs(ComponentLoadContext.for_test())
    return set(defs.get_asset_graph().get_all_asset_keys())


def _assert_assets(component: Component, expected_assets: int) -> None:
defs = component.build_defs(ComponentLoadContext.for_test())
assert len(defs.get_asset_graph().get_all_asset_keys()) == expected_assets
Expand All @@ -32,7 +42,9 @@ def _assert_assets(component: Component, expected_assets: int) -> None:


def test_python_native() -> None:
component = PipesSubprocessScriptCollection(LOCATION_PATH / "components" / "scripts")
component = PipesSubprocessScriptCollection.introspect_from_path(
LOCATION_PATH / "components" / "scripts"
)
_assert_assets(component, 3)


Expand All @@ -44,38 +56,53 @@ def test_python_params() -> None:
defs_file_model=DefsFileModel(
component_type="pipes_subprocess_script_collection",
component_params={
"script_one": {
"assets": [
{"key": "a"},
{"key": "b", "deps": ["up1", "up2"]},
]
},
"script_three": {"assets": [{"key": "key_override"}]},
"scripts": [
{
"path": "script_one.py",
"assets": [{"key": "a"}, {"key": "b", "deps": ["up1", "up2"]}],
},
{"path": "subdir/script_three.py", "assets": [{"key": "key_override"}]},
]
},
),
),
)
_assert_assets(component, 6)
assert _asset_keys(component) == {
AssetKey("a"),
AssetKey("b"),
AssetKey("up1"),
AssetKey("up2"),
AssetKey("key_override"),
}


def test_load_from_path() -> None:
components = build_components_from_component_folder(
script_load_context(), LOCATION_PATH / "components"
)
assert len(components) == 1
assert _asset_keys(components[0]) == {
AssetKey("a"),
AssetKey("b"),
AssetKey("c"),
AssetKey("up1"),
AssetKey("up2"),
AssetKey("override_key"),
}

_assert_assets(components[0], 6)


def test_build_from_path() -> None:
components = build_components_from_component_folder(
script_load_context(), LOCATION_PATH / "components"
)

defs = defs_from_components(
context=script_load_context(),
components=components,
resources={},
)

assert len(defs.get_asset_graph().get_all_asset_keys()) == 6
assert defs.get_asset_graph().get_all_asset_keys() == {
AssetKey("a"),
AssetKey("b"),
AssetKey("c"),
AssetKey("up1"),
AssetKey("up2"),
AssetKey("override_key"),
}

0 comments on commit 859465b

Please sign in to comment.