Skip to content

Commit

Permalink
[components] Refactor definition buider code to operate on nodes rath…
Browse files Browse the repository at this point in the history
…er than folders (#26297)

## Summary & Motivation

This refactoring makes these functions more generic, so you can create `Definitions` and load components at arbitrary spots in the component hiearchy

## How I Tested These Changes

BK
  • Loading branch information
schrockn authored Dec 5, 2024
1 parent e24374c commit ead436d
Show file tree
Hide file tree
Showing 3 changed files with 46 additions and 24 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ class ComponentFolder(ComponentDeclNode):
sub_decls: Sequence[Union[YamlComponentDecl, "ComponentFolder"]]


def find_component_decl(path: Path) -> Optional[ComponentDeclNode]:
def path_to_decl_node(path: Path) -> Optional[ComponentDeclNode]:
# right now, we only support two types of components, both of which are folders
# if the folder contains a defs.yml file, it's a component instance
# otherwise, it's a folder containing sub-components
Expand All @@ -43,7 +43,7 @@ def find_component_decl(path: Path) -> Optional[ComponentDeclNode]:

subs = []
for subpath in path.iterdir():
component = find_component_decl(subpath)
component = path_to_decl_node(subpath)
if component:
subs.append(component)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,53 +3,60 @@

from dagster._utils.warnings import suppress_dagster_warnings

from dagster_components.core.component import Component, ComponentLoadContext, ComponentRegistry
from dagster_components.core.component import (
Component,
ComponentDeclNode,
ComponentLoadContext,
ComponentRegistry,
)
from dagster_components.core.component_decl_builder import (
ComponentFolder,
YamlComponentDecl,
find_component_decl,
path_to_decl_node,
)
from dagster_components.core.deployment import CodeLocationProjectContext

if TYPE_CHECKING:
from dagster._core.definitions.definitions_class import Definitions


def build_component_hierarchy(
context: ComponentLoadContext, component_folder: ComponentFolder
def build_components_from_decl_node(
context: ComponentLoadContext, decl_node: ComponentDeclNode
) -> Sequence[Component]:
to_return = []
for decl_node in component_folder.sub_decls:
if isinstance(decl_node, YamlComponentDecl):
parsed_defs = decl_node.defs_file_model
component_type = context.registry.get(parsed_defs.component_type)
to_return.append(component_type.from_decl_node(context, decl_node))
elif isinstance(decl_node, ComponentFolder):
to_return.extend(build_component_hierarchy(context, decl_node))
else:
raise NotImplementedError(f"Unknown component type {decl_node}")
return to_return
if isinstance(decl_node, YamlComponentDecl):
parsed_defs = decl_node.defs_file_model
component_type = context.registry.get(parsed_defs.component_type)
return [component_type.from_decl_node(context, decl_node)]
elif isinstance(decl_node, ComponentFolder):
components = []
for sub_decl in decl_node.sub_decls:
components.extend(build_components_from_decl_node(context, sub_decl))
return components

raise NotImplementedError(f"Unknown component type {decl_node}")


def build_components_from_component_folder(
context: ComponentLoadContext,
path: Path,
) -> Sequence[Component]:
component_folder = find_component_decl(path)
component_folder = path_to_decl_node(path)
assert isinstance(component_folder, ComponentFolder)
return build_component_hierarchy(context, component_folder)
return build_components_from_decl_node(context, component_folder)


def build_defs_from_component_folder(
def build_defs_from_component_path(
path: Path,
registry: ComponentRegistry,
resources: Mapping[str, object],
) -> "Definitions":
"""Build a definitions object from a folder within the components hierarchy."""
component_folder = find_component_decl(path=path)
assert isinstance(component_folder, ComponentFolder)
context = ComponentLoadContext(resources=resources, registry=registry)
components = build_components_from_component_folder(context=context, path=path)

decl_node = path_to_decl_node(path=path)
if not decl_node:
raise Exception(f"No component found at path {path}")
components = build_components_from_decl_node(context, decl_node)
return defs_from_components(resources=resources, context=context, components=components)


Expand Down Expand Up @@ -82,7 +89,7 @@ def build_defs_from_toplevel_components_folder(
all_defs: List[Definitions] = []
for component in context.component_instances:
component_path = Path(context.get_component_instance_path(component))
defs = build_defs_from_component_folder(
defs = build_defs_from_component_path(
path=component_path,
registry=context.component_registry,
resources=resources or {},
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
from dagster_components.core.component_defs_builder import (
YamlComponentDecl,
build_components_from_component_folder,
build_defs_from_component_path,
defs_from_components,
)
from dagster_components.impls.pipes_subprocess_script_collection import (
Expand Down Expand Up @@ -81,3 +82,17 @@ def test_load_from_path() -> None:
AssetKey("up2"),
AssetKey("override_key"),
}


def test_load_from_location_path() -> None:
defs = build_defs_from_component_path(
LOCATION_PATH / "components" / "scripts", script_load_context().registry, {}
)
assert defs.get_asset_graph().get_all_asset_keys() == {
AssetKey("a"),
AssetKey("b"),
AssetKey("c"),
AssetKey("up1"),
AssetKey("up2"),
AssetKey("override_key"),
}

0 comments on commit ead436d

Please sign in to comment.