From ead436d573cdd506d0a83969b635af601a76f393 Mon Sep 17 00:00:00 2001 From: Nick Schrock Date: Thu, 5 Dec 2024 17:32:09 -0500 Subject: [PATCH] [components] Refactor definition buider code to operate on nodes rather than folders (#26297) ## Summary & Motivation This refactoring makes these functions more generic, so you can create `Definitions` and load components at arbitrary spots in the component hiearchy ## How I Tested These Changes BK --- .../core/component_decl_builder.py | 4 +- .../core/component_defs_builder.py | 51 +++++++++++-------- ...test_pipes_subprocess_script_collection.py | 15 ++++++ 3 files changed, 46 insertions(+), 24 deletions(-) diff --git a/examples/experimental/dagster-components/dagster_components/core/component_decl_builder.py b/examples/experimental/dagster-components/dagster_components/core/component_decl_builder.py index ef1de0fade467..82e94c6e2d340 100644 --- a/examples/experimental/dagster-components/dagster_components/core/component_decl_builder.py +++ b/examples/experimental/dagster-components/dagster_components/core/component_decl_builder.py @@ -25,7 +25,7 @@ class ComponentFolder(ComponentDeclNode): sub_decls: Sequence[Union[YamlComponentDecl, "ComponentFolder"]] -def find_component_decl(path: Path) -> Optional[ComponentDeclNode]: +def path_to_decl_node(path: Path) -> Optional[ComponentDeclNode]: # right now, we only support two types of components, both of which are folders # if the folder contains a defs.yml file, it's a component instance # otherwise, it's a folder containing sub-components @@ -43,7 +43,7 @@ def find_component_decl(path: Path) -> Optional[ComponentDeclNode]: subs = [] for subpath in path.iterdir(): - component = find_component_decl(subpath) + component = path_to_decl_node(subpath) if component: subs.append(component) diff --git a/examples/experimental/dagster-components/dagster_components/core/component_defs_builder.py b/examples/experimental/dagster-components/dagster_components/core/component_defs_builder.py index d772a477f021a..653bb65597f3e 100644 --- a/examples/experimental/dagster-components/dagster_components/core/component_defs_builder.py +++ b/examples/experimental/dagster-components/dagster_components/core/component_defs_builder.py @@ -3,11 +3,16 @@ from dagster._utils.warnings import suppress_dagster_warnings -from dagster_components.core.component import Component, ComponentLoadContext, ComponentRegistry +from dagster_components.core.component import ( + Component, + ComponentDeclNode, + ComponentLoadContext, + ComponentRegistry, +) from dagster_components.core.component_decl_builder import ( ComponentFolder, YamlComponentDecl, - find_component_decl, + path_to_decl_node, ) from dagster_components.core.deployment import CodeLocationProjectContext @@ -15,41 +20,43 @@ from dagster._core.definitions.definitions_class import Definitions -def build_component_hierarchy( - context: ComponentLoadContext, component_folder: ComponentFolder +def build_components_from_decl_node( + context: ComponentLoadContext, decl_node: ComponentDeclNode ) -> Sequence[Component]: - to_return = [] - for decl_node in component_folder.sub_decls: - if isinstance(decl_node, YamlComponentDecl): - parsed_defs = decl_node.defs_file_model - component_type = context.registry.get(parsed_defs.component_type) - to_return.append(component_type.from_decl_node(context, decl_node)) - elif isinstance(decl_node, ComponentFolder): - to_return.extend(build_component_hierarchy(context, decl_node)) - else: - raise NotImplementedError(f"Unknown component type {decl_node}") - return to_return + if isinstance(decl_node, YamlComponentDecl): + parsed_defs = decl_node.defs_file_model + component_type = context.registry.get(parsed_defs.component_type) + return [component_type.from_decl_node(context, decl_node)] + elif isinstance(decl_node, ComponentFolder): + components = [] + for sub_decl in decl_node.sub_decls: + components.extend(build_components_from_decl_node(context, sub_decl)) + return components + + raise NotImplementedError(f"Unknown component type {decl_node}") def build_components_from_component_folder( context: ComponentLoadContext, path: Path, ) -> Sequence[Component]: - component_folder = find_component_decl(path) + component_folder = path_to_decl_node(path) assert isinstance(component_folder, ComponentFolder) - return build_component_hierarchy(context, component_folder) + return build_components_from_decl_node(context, component_folder) -def build_defs_from_component_folder( +def build_defs_from_component_path( path: Path, registry: ComponentRegistry, resources: Mapping[str, object], ) -> "Definitions": """Build a definitions object from a folder within the components hierarchy.""" - component_folder = find_component_decl(path=path) - assert isinstance(component_folder, ComponentFolder) context = ComponentLoadContext(resources=resources, registry=registry) - components = build_components_from_component_folder(context=context, path=path) + + decl_node = path_to_decl_node(path=path) + if not decl_node: + raise Exception(f"No component found at path {path}") + components = build_components_from_decl_node(context, decl_node) return defs_from_components(resources=resources, context=context, components=components) @@ -82,7 +89,7 @@ def build_defs_from_toplevel_components_folder( all_defs: List[Definitions] = [] for component in context.component_instances: component_path = Path(context.get_component_instance_path(component)) - defs = build_defs_from_component_folder( + defs = build_defs_from_component_path( path=component_path, registry=context.component_registry, resources=resources or {}, diff --git a/examples/experimental/dagster-components/dagster_components_tests/test_pipes_subprocess_script_collection.py b/examples/experimental/dagster-components/dagster_components_tests/test_pipes_subprocess_script_collection.py index 6e579a6102fcc..77bbc050f75e1 100644 --- a/examples/experimental/dagster-components/dagster_components_tests/test_pipes_subprocess_script_collection.py +++ b/examples/experimental/dagster-components/dagster_components_tests/test_pipes_subprocess_script_collection.py @@ -5,6 +5,7 @@ from dagster_components.core.component_defs_builder import ( YamlComponentDecl, build_components_from_component_folder, + build_defs_from_component_path, defs_from_components, ) from dagster_components.impls.pipes_subprocess_script_collection import ( @@ -81,3 +82,17 @@ def test_load_from_path() -> None: AssetKey("up2"), AssetKey("override_key"), } + + +def test_load_from_location_path() -> None: + defs = build_defs_from_component_path( + LOCATION_PATH / "components" / "scripts", script_load_context().registry, {} + ) + assert defs.get_asset_graph().get_all_asset_keys() == { + AssetKey("a"), + AssetKey("b"), + AssetKey("c"), + AssetKey("up1"), + AssetKey("up2"), + AssetKey("override_key"), + }