diff --git a/python_modules/dagster/dagster/_components/impls/pipes_subprocess_script_collection.py b/python_modules/dagster/dagster/_components/impls/pipes_subprocess_script_collection.py index ff59f3c48aba5..817d39b1ae139 100644 --- a/python_modules/dagster/dagster/_components/impls/pipes_subprocess_script_collection.py +++ b/python_modules/dagster/dagster/_components/impls/pipes_subprocess_script_collection.py @@ -9,6 +9,7 @@ from dagster._components.core.component_decl_builder import YamlComponentDecl from dagster._core.definitions.asset_key import AssetKey from dagster._core.definitions.asset_spec import AssetSpec +from dagster._core.definitions.assets import AssetsDefinition from dagster._core.definitions.decorators.asset_decorator import multi_asset from dagster._core.execution.context.asset_execution_context import AssetExecutionContext from dagster._core.pipes.subprocess import PipesSubprocessClient @@ -41,19 +42,27 @@ def to_asset_spec(self) -> AssetSpec: class PipesSubprocessScriptParams(BaseModel): + path: str assets: Sequence[AssetSpecModel] +class PipesSubprocessScriptCollectionParams(BaseModel): + scripts: Sequence[PipesSubprocessScriptParams] + + class PipesSubprocessScriptCollection(Component): - params_schema = Mapping[str, PipesSubprocessScriptParams] + params_schema = PipesSubprocessScriptCollectionParams - def __init__( - self, dirpath: Path, path_specs: Optional[Mapping[str, Sequence[AssetSpec]]] = None - ): + def __init__(self, dirpath: Path, path_specs: Mapping[Path, Sequence[AssetSpec]]): self.dirpath = dirpath # mapping from the script name (e.g. 
/path/to/script_abc.py -> script_abc) # to the specs it produces - self.path_specs = path_specs or {} + self.path_specs = path_specs + + @staticmethod + def introspect_from_path(path: Path) -> "PipesSubprocessScriptCollection": + path_specs = {path: [AssetSpec(path.stem)] for path in list(path.rglob("*.py"))} + return PipesSubprocessScriptCollection(dirpath=path, path_specs=path_specs) @classmethod def from_decl_node( @@ -63,28 +72,27 @@ def from_decl_node( loaded_params = TypeAdapter(cls.params_schema).validate_python( component_decl.defs_file_model.component_params ) - return cls( - dirpath=component_decl.path, - path_specs={ - k: [vv.to_asset_spec() for vv in v.assets] for k, v in loaded_params.items() - } - if loaded_params - else None, - ) + + path_specs = {} + for script in loaded_params.scripts: + script_path = component_decl.path / script.path + if not script_path.exists(): + raise FileNotFoundError(f"Script {script_path} does not exist") + path_specs[script_path] = [spec.to_asset_spec() for spec in script.assets] + + return cls(dirpath=component_decl.path, path_specs=path_specs) def build_defs(self, load_context: "ComponentLoadContext") -> "Definitions": from dagster._core.definitions.definitions_class import Definitions return Definitions( - assets=[self._create_asset_def(path) for path in list(self.dirpath.rglob("*.py"))], + assets=[self._create_asset_def(path, specs) for path, specs in self.path_specs.items()], resources={"pipes_client": PipesSubprocessClient()}, ) - def _create_asset_def(self, path: Path): - @multi_asset( - specs=self.path_specs.get(path.stem) or [AssetSpec(key=path.stem)], - name=f"script_{path.stem}", - ) + def _create_asset_def(self, path: Path, specs: Sequence[AssetSpec]) -> AssetsDefinition: + # TODO: allow name parameterization + @multi_asset(specs=specs, name=f"script_{path.stem}") def _asset(context: AssetExecutionContext, pipes_client: PipesSubprocessClient): cmd = [shutil.which("python"), path] return 
pipes_client.run(command=cmd, context=context).get_results() diff --git a/python_modules/dagster/dagster_tests/components_tests/code_locations/python_script_location/components/scripts/defs.yml b/python_modules/dagster/dagster_tests/components_tests/code_locations/python_script_location/components/scripts/defs.yml index 61a83c617744b..3fda5a1194652 100644 --- a/python_modules/dagster/dagster_tests/components_tests/code_locations/python_script_location/components/scripts/defs.yml +++ b/python_modules/dagster/dagster_tests/components_tests/code_locations/python_script_location/components/scripts/defs.yml @@ -1,11 +1,15 @@ component_type: pipes_subprocess_script_collection component_params: - script_one: - assets: - - key: a - - key: b - deps: [up1, up2] - script_three: - assets: - - key: override_key + scripts: + - path: script_one.py + assets: + - key: a + - key: b + deps: [up1, up2] + - path: script_two.py + assets: + - key: c + - path: subdir/script_three.py + assets: + - key: override_key diff --git a/python_modules/dagster/dagster_tests/components_tests/test_pipes_subprocess_script_collection.py b/python_modules/dagster/dagster_tests/components_tests/test_pipes_subprocess_script_collection.py index 741028956ac80..05f7a5bac7c52 100644 --- a/python_modules/dagster/dagster_tests/components_tests/test_pipes_subprocess_script_collection.py +++ b/python_modules/dagster/dagster_tests/components_tests/test_pipes_subprocess_script_collection.py @@ -1,5 +1,6 @@ from pathlib import Path +from dagster import AssetKey from dagster._components.core.component import Component, ComponentLoadContext, ComponentRegistry from dagster._components.core.component_decl_builder import DefsFileModel from dagster._components.core.component_defs_builder import ( @@ -24,6 +25,15 @@ def script_load_context() -> ComponentLoadContext: return ComponentLoadContext(registry=registry(), resources={}) +def _asset_keys(component: Component) -> set[AssetKey]: + return { + key + for key in 
component.build_defs(ComponentLoadContext.for_test()) + .get_asset_graph() + .get_all_asset_keys() + } + + def _assert_assets(component: Component, expected_assets: int) -> None: defs = component.build_defs(ComponentLoadContext.for_test()) assert len(defs.get_asset_graph().get_all_asset_keys()) == expected_assets @@ -32,7 +42,9 @@ def _assert_assets(component: Component, expected_assets: int) -> None: def test_python_native() -> None: - component = PipesSubprocessScriptCollection(LOCATION_PATH / "components" / "scripts") + component = PipesSubprocessScriptCollection.introspect_from_path( + LOCATION_PATH / "components" / "scripts" + ) _assert_assets(component, 3) @@ -44,18 +56,24 @@ def test_python_params() -> None: defs_file_model=DefsFileModel( component_type="pipes_subprocess_script_collection", component_params={ - "script_one": { - "assets": [ - {"key": "a"}, - {"key": "b", "deps": ["up1", "up2"]}, - ] - }, - "script_three": {"assets": [{"key": "key_override"}]}, + "scripts": [ + { + "path": "script_one.py", + "assets": [{"key": "a"}, {"key": "b", "deps": ["up1", "up2"]}], + }, + {"path": "subdir/script_three.py", "assets": [{"key": "key_override"}]}, + ] }, ), ), ) - _assert_assets(component, 6) + assert _asset_keys(component) == { + AssetKey("a"), + AssetKey("b"), + AssetKey("up1"), + AssetKey("up2"), + AssetKey("key_override"), + } def test_load_from_path() -> None: @@ -63,19 +81,28 @@ def test_load_from_path() -> None: script_load_context(), LOCATION_PATH / "components" ) assert len(components) == 1 + assert _asset_keys(components[0]) == { + AssetKey("a"), + AssetKey("b"), + AssetKey("c"), + AssetKey("up1"), + AssetKey("up2"), + AssetKey("override_key"), + } _assert_assets(components[0], 6) - -def test_build_from_path() -> None: - components = build_components_from_component_folder( - script_load_context(), LOCATION_PATH / "components" - ) - defs = defs_from_components( context=script_load_context(), components=components, resources={}, ) - assert 
len(defs.get_asset_graph().get_all_asset_keys()) == 6 + assert defs.get_asset_graph().get_all_asset_keys() == { + AssetKey("a"), + AssetKey("b"), + AssetKey("c"), + AssetKey("up1"), + AssetKey("up2"), + AssetKey("override_key"), + }