Skip to content

Commit

Permalink
[components] Add entry point mechanism for component registration (#2…
Browse files Browse the repository at this point in the history
…6337)

## Summary & Motivation

Change the `dagster-components` registration system to use Python entry
points. All components picked up by the system must be exposed as entry
points. This includes:

- components exposed by `dagster-components` itself
- components exposed by arbitrary third party libraries
- components exposed by the code location module

An entry point is declared under the group `dagster.components` in
`pyproject.toml`/`setup.py`:

```
[project.entry-points]
"dagster.components" = { lib = "my_pkg.lib" }
```

A new `ComponentRegistry.from_entry_point_discovery()` method is the
preferred way to construct a registry. It will pull all entry points
from this group using `importlib.metadata` and crawl the specified
modules.

## How I Tested These Changes

New unit tests.
  • Loading branch information
smackesey authored Dec 10, 2024
1 parent e8b9c2f commit c3a4549
Show file tree
Hide file tree
Showing 28 changed files with 209 additions and 74 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@

import dagster as dg
from dagster_components import ComponentRegistry, build_defs_from_toplevel_components_folder
from dagster_components.impls.pipes_subprocess_script_collection import (
from dagster_components.lib.pipes_subprocess_script_collection import (
PipesSubprocessScriptCollection,
)

Expand Down
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
from dagster._core.libraries import DagsterLibraryRegistry

from dagster_components.core.component import (
Component as Component,
ComponentLoadContext as ComponentLoadContext,
Expand All @@ -7,20 +9,6 @@
from dagster_components.core.component_defs_builder import (
build_defs_from_toplevel_components_folder as build_defs_from_toplevel_components_folder,
)
from dagster_components.impls.dbt_project import DbtProjectComponent
from dagster_components.impls.pipes_subprocess_script_collection import (
PipesSubprocessScriptCollection,
)
from dagster_components.impls.sling_replication import SlingReplicationComponent

__component_registry__ = {
"pipes_subprocess_script_collection": PipesSubprocessScriptCollection,
"sling_replication": SlingReplicationComponent,
"dbt_project": DbtProjectComponent,
}

from dagster._core.libraries import DagsterLibraryRegistry

from dagster_components.version import __version__ as __version__

DagsterLibraryRegistry.register("dagster-components", __version__)
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
import click
from pydantic import TypeAdapter

from dagster_components import ComponentRegistry, __component_registry__
from dagster_components import ComponentRegistry
from dagster_components.core.deployment import (
CodeLocationProjectContext,
is_inside_code_location_project,
Expand Down Expand Up @@ -38,7 +38,7 @@ def generate_component_command(
sys.exit(1)

context = CodeLocationProjectContext.from_path(
Path.cwd(), ComponentRegistry(__component_registry__)
Path.cwd(), ComponentRegistry.from_entry_point_discovery()
)
if not context.has_component_type(component_type):
click.echo(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@

import click

from dagster_components import __component_registry__
from dagster_components.core.component import ComponentRegistry
from dagster_components.core.deployment import (
CodeLocationProjectContext,
Expand All @@ -30,7 +29,7 @@ def list_component_types_command() -> None:
sys.exit(1)

context = CodeLocationProjectContext.from_path(
Path.cwd(), ComponentRegistry(__component_registry__)
Path.cwd(), ComponentRegistry.from_entry_point_discovery()
)
output: Dict[str, Any] = {}
for component_type in context.list_component_types():
Expand Down
Original file line number Diff line number Diff line change
@@ -1,7 +1,10 @@
import copy
import importlib
import importlib.metadata
import sys
from abc import ABC, abstractmethod
from types import ModuleType
from typing import TYPE_CHECKING, Any, ClassVar, Dict, Iterable, Mapping, Optional, Type
from typing import TYPE_CHECKING, Any, ClassVar, Dict, Iterable, Mapping, Optional, Sequence, Type

from dagster import _check as check
from dagster._core.errors import DagsterError
Expand Down Expand Up @@ -33,7 +36,33 @@ def from_decl_node(
) -> Self: ...


def get_entry_points_from_python_environment(group: str) -> Sequence[importlib.metadata.EntryPoint]:
if sys.version_info >= (3, 10):
return importlib.metadata.entry_points(group=group)
else:
return importlib.metadata.entry_points().get(group, [])


COMPONENTS_ENTRY_POINT_GROUP = "dagster.components"


class ComponentRegistry:
@classmethod
def from_entry_point_discovery(cls) -> "ComponentRegistry":
components: Dict[str, Type[Component]] = {}
for entry_point in get_entry_points_from_python_environment(COMPONENTS_ENTRY_POINT_GROUP):
root_module = entry_point.load()
if not isinstance(root_module, ModuleType):
raise DagsterError(
f"Invalid entry point {entry_point.name} in group {COMPONENTS_ENTRY_POINT_GROUP}. "
f"Value expected to be a module, got {root_module}."
)
for component in get_registered_components_in_module(root_module):
key = f"{entry_point.name}.{get_component_name(component)}"
components[key] = component

return cls(components)

def __init__(self, components: Dict[str, Type[Component]]):
self._components: Dict[str, Type[Component]] = copy.copy(components)

Expand All @@ -59,16 +88,16 @@ def __repr__(self) -> str:
return f"<ComponentRegistry {list(self._components.keys())}>"


def register_components_in_module(registry: ComponentRegistry, root_module: ModuleType) -> None:
def get_registered_components_in_module(root_module: ModuleType) -> Iterable[Type[Component]]:
from dagster._core.definitions.load_assets_from_modules import (
find_modules_in_package,
find_subclasses_in_module,
)

for module in find_modules_in_package(root_module):
for component in find_subclasses_in_module(module, (Component,)):
if is_component(component):
registry.register(get_component_name(component), component)
if is_registered_component(component):
yield component


class ComponentLoadContext:
Expand Down Expand Up @@ -114,13 +143,13 @@ def wrapper(actual_cls: Type[Component]) -> Type[Component]:
return cls


def is_component(cls: Type) -> bool:
def is_registered_component(cls: Type) -> bool:
return hasattr(cls, COMPONENT_REGISTRY_KEY_ATTR)


def get_component_name(component_type: Type[Component]) -> str:
check.param_invariant(
is_component(component_type),
is_registered_component(component_type),
"component_type",
"Expected a registered component. Use @component to register a component.",
)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@
ComponentLoadContext,
ComponentRegistry,
get_component_name,
is_component,
is_registered_component,
)
from dagster_components.core.component_decl_builder import (
ComponentFolder,
Expand Down Expand Up @@ -69,7 +69,10 @@ def component_type_from_yaml_decl(

for _name, obj in inspect.getmembers(module, inspect.isclass):
assert isinstance(obj, Type)
if is_component(obj) and get_component_name(obj) == component_registry_key:
if (
is_registered_component(obj)
and get_component_name(obj) == component_registry_key
):
return obj

raise Exception(
Expand Down Expand Up @@ -127,10 +130,8 @@ def build_defs_from_toplevel_components_folder(
"""Build a Definitions object from an entire component hierarchy."""
from dagster._core.definitions.definitions_class import Definitions

from dagster_components import __component_registry__

context = CodeLocationProjectContext.from_path(
path, registry or ComponentRegistry(__component_registry__)
path, registry or ComponentRegistry.from_entry_point_discovery()
)

all_defs: List[Definitions] = []
Expand Down
Original file line number Diff line number Diff line change
@@ -1,18 +1,12 @@
import importlib.util
import os
import sys
from pathlib import Path
from typing import Final, Iterable, Type

import tomli
from dagster._core.errors import DagsterError
from typing_extensions import Self

from dagster_components.core.component import (
Component,
ComponentRegistry,
register_components_in_module,
)
from dagster_components.core.component import Component, ComponentRegistry

# Code location
_CODE_LOCATION_CUSTOM_COMPONENTS_DIR: Final = "lib"
Expand Down Expand Up @@ -48,17 +42,6 @@ class CodeLocationProjectContext:
@classmethod
def from_path(cls, path: Path, component_registry: "ComponentRegistry") -> Self:
root_path = _resolve_code_location_root_path(path)
name = os.path.basename(root_path)

# TODO: Rm when a more robust solution is implemented
# Make sure we can import from the cwd
if sys.path[0] != "":
sys.path.insert(0, "")

components_lib_module = f"{name}.{_CODE_LOCATION_CUSTOM_COMPONENTS_DIR}"
module = importlib.import_module(components_lib_module)
register_components_in_module(component_registry, module)

return cls(
root_path=str(root_path),
name=os.path.basename(root_path),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,12 @@
from dagster._utils.warnings import suppress_dagster_warnings
from pydantic import BaseModel, TypeAdapter

from dagster_components.core.component import Component, ComponentDeclNode, ComponentLoadContext
from dagster_components.core.component import (
Component,
ComponentDeclNode,
ComponentLoadContext,
component,
)
from dagster_components.core.component_decl_builder import YamlComponentDecl

if TYPE_CHECKING:
Expand Down Expand Up @@ -48,6 +53,7 @@ class PipesSubprocessScriptCollectionParams(BaseModel):
scripts: Sequence[PipesSubprocessScriptParams]


@component(name="pipes_subprocess_script_collection")
class PipesSubprocessScriptCollection(Component):
params_schema = PipesSubprocessScriptCollectionParams

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@

from dagster._core.execution.context.asset_execution_context import AssetExecutionContext
from dagster_components import component
from dagster_components.impls.sling_replication import SlingReplicationComponent
from dagster_components.lib.sling_replication import SlingReplicationComponent
from dagster_embedded_elt.sling import SlingResource


Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
type: pipes_subprocess_script_collection
type: dagster_components.pipes_subprocess_script_collection

params:
scripts:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@
build_components_from_component_folder,
defs_from_components,
)
from dagster_components.impls.dbt_project import DbtProjectComponent
from dagster_components.lib.dbt_project import DbtProjectComponent
from dagster_dbt import DbtProject

from dagster_components_tests.utils import assert_assets, get_asset_keys, script_load_context
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@
YamlComponentDecl,
build_components_from_component_folder,
)
from dagster_components.impls.sling_replication import SlingReplicationComponent, component
from dagster_components.lib.sling_replication import SlingReplicationComponent, component
from dagster_embedded_elt.sling import SlingResource

from dagster_components_tests.utils import assert_assets, get_asset_keys, script_load_context
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@
build_components_from_component_folder,
defs_from_components,
)
from dagster_components.impls.dbt_project import DbtProjectComponent
from dagster_components.lib.dbt_project import DbtProjectComponent
from dagster_dbt import DbtProject

from dagster_components_tests.utils import assert_assets, get_asset_keys, script_load_context
Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
type: dbt_project
type: dagster_components.dbt_project

params:
dbt:
Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
type: sling_replication
type: dagster_components.sling_replication

params:
sling:
Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
type: dbt_project
type: dagster_components.dbt_project

params:
dbt:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@
build_defs_from_component_path,
defs_from_components,
)
from dagster_components.impls.pipes_subprocess_script_collection import (
from dagster_components.lib.pipes_subprocess_script_collection import (
PipesSubprocessScriptCollection,
)

Expand Down
Original file line number Diff line number Diff line change
@@ -1,32 +1,32 @@
from dagster_components import Component, component
from dagster_components.core.component import get_component_name, is_component
from dagster_components.core.component import get_component_name, is_registered_component


def test_registered_component_with_default_name() -> None:
@component
class RegisteredComponent(Component): ...

assert is_component(RegisteredComponent)
assert is_registered_component(RegisteredComponent)
assert get_component_name(RegisteredComponent) == "registered_component"


def test_registered_component_with_default_name_and_parens() -> None:
@component()
class RegisteredComponent(Component): ...

assert is_component(RegisteredComponent)
assert is_registered_component(RegisteredComponent)
assert get_component_name(RegisteredComponent) == "registered_component"


def test_registered_component_with_explicit_kwarg_name() -> None:
@component(name="explicit_name")
class RegisteredComponent(Component): ...

assert is_component(RegisteredComponent)
assert is_registered_component(RegisteredComponent)
assert get_component_name(RegisteredComponent) == "explicit_name"


def test_unregistered_component() -> None:
class UnregisteredComponent(Component): ...

assert not is_component(UnregisteredComponent)
assert not is_registered_component(UnregisteredComponent)
Loading

0 comments on commit c3a4549

Please sign in to comment.