diff --git a/python_modules/libraries/dagster-components/dagster_components/cli/__init__.py b/python_modules/libraries/dagster-components/dagster_components/cli/__init__.py index 025e67200f49c..bc123b3ddd2e0 100644 --- a/python_modules/libraries/dagster-components/dagster_components/cli/__init__.py +++ b/python_modules/libraries/dagster-components/dagster_components/cli/__init__.py @@ -3,6 +3,8 @@ from dagster_components.cli.generate import generate_cli from dagster_components.cli.list import list_cli +from dagster_components.core.component import BUILTIN_PUBLISHED_COMPONENT_ENTRY_POINT +from dagster_components.utils import CLI_BUILTIN_COMPONENT_LIB_KEY def create_dagster_components_cli(): @@ -15,9 +17,18 @@ def create_dagster_components_cli(): commands=commands, context_settings={"max_content_width": 120, "help_option_names": ["-h", "--help"]}, ) + @click.option( + "--builtin-component-lib", + type=str, + default=BUILTIN_PUBLISHED_COMPONENT_ENTRY_POINT, + help="Specify the builitin component library to load.", + ) @click.version_option(__version__, "--version", "-v") - def group(): + @click.pass_context + def group(ctx: click.Context, builtin_component_lib: str): """CLI tools for working with Dagster.""" + ctx.ensure_object(dict) + ctx.obj[CLI_BUILTIN_COMPONENT_LIB_KEY] = builtin_component_lib return group diff --git a/python_modules/libraries/dagster-components/dagster_components/cli/generate.py b/python_modules/libraries/dagster-components/dagster_components/cli/generate.py index 438d8ccb3b578..8405b5ec04e88 100644 --- a/python_modules/libraries/dagster-components/dagster_components/cli/generate.py +++ b/python_modules/libraries/dagster-components/dagster_components/cli/generate.py @@ -11,6 +11,7 @@ is_inside_code_location_project, ) from dagster_components.generate import generate_component_instance +from dagster_components.utils import CLI_BUILTIN_COMPONENT_LIB_KEY @click.group(name="generate") @@ -23,12 +24,15 @@ def generate_cli() -> None: @click.argument("component_name", type=str) @click.option("--json-params", type=str, default=None) @click.argument("extra_args", nargs=-1, type=str) +@click.pass_context def generate_component_command( + ctx: click.Context, component_type: str, component_name: str, json_params: Optional[str], extra_args: Tuple[str, ...], ) -> None: + builtin_component_lib = ctx.obj.get(CLI_BUILTIN_COMPONENT_LIB_KEY, False) if not is_inside_code_location_project(Path.cwd()): click.echo( click.style( @@ -38,7 +42,8 @@ def generate_component_command( sys.exit(1) context = CodeLocationProjectContext.from_path( - Path.cwd(), ComponentRegistry.from_entry_point_discovery() + Path.cwd(), + ComponentRegistry.from_entry_point_discovery(builtin_component_lib=builtin_component_lib), ) if not context.has_component_type(component_type): click.echo( diff --git a/python_modules/libraries/dagster-components/dagster_components/cli/list.py b/python_modules/libraries/dagster-components/dagster_components/cli/list.py index eb34c62ce9063..28957aaa33aad 100644 --- a/python_modules/libraries/dagster-components/dagster_components/cli/list.py +++ b/python_modules/libraries/dagster-components/dagster_components/cli/list.py @@ -10,6 +10,7 @@ CodeLocationProjectContext, is_inside_code_location_project, ) +from dagster_components.utils import CLI_BUILTIN_COMPONENT_LIB_KEY @click.group(name="generate") @@ -18,8 +19,10 @@ def list_cli(): @list_cli.command(name="component-types") -def list_component_types_command() -> None: +@click.pass_context +def list_component_types_command(ctx: click.Context) -> None: """List registered Dagster components.""" + builtin_component_lib = ctx.obj.get(CLI_BUILTIN_COMPONENT_LIB_KEY, False) if not is_inside_code_location_project(Path.cwd()): click.echo( click.style( @@ -29,7 +32,8 @@ def list_component_types_command() -> None: sys.exit(1) context = CodeLocationProjectContext.from_path( - Path.cwd(), ComponentRegistry.from_entry_point_discovery() + Path.cwd(), + ComponentRegistry.from_entry_point_discovery(builtin_component_lib=builtin_component_lib), ) output: Dict[str, Any] = {} for key, component_type in context.list_component_types(): diff --git a/python_modules/libraries/dagster-components/dagster_components/core/component.py b/python_modules/libraries/dagster-components/dagster_components/core/component.py index e7e53247d3dae..1c78031d600e0 100644 --- a/python_modules/libraries/dagster-components/dagster_components/core/component.py +++ b/python_modules/libraries/dagster-components/dagster_components/core/component.py @@ -3,6 +3,7 @@ import importlib.metadata import inspect import sys +import textwrap from abc import ABC, abstractmethod from pathlib import Path from types import ModuleType @@ -25,6 +26,8 @@ from dagster._utils import snakecase from typing_extensions import Self +from dagster_components.utils import ensure_dagster_components_tests_import + if TYPE_CHECKING: from dagster._core.definitions.definitions_class import Definitions @@ -40,7 +43,7 @@ class ComponentGenerateRequest: class Component(ABC): name: ClassVar[Optional[str]] = None - component_params_schema: ClassVar = None + params_schema: ClassVar = None generate_params_schema: ClassVar = None @classmethod @@ -58,15 +61,15 @@ def from_decl_node( @classmethod def get_metadata(cls) -> "ComponentInternalMetadata": docstring = cls.__doc__ + clean_docstring = _clean_docstring(docstring) if docstring else None + return { - "summary": docstring.split("\n\n")[0] if docstring else None, - "description": docstring, + "summary": clean_docstring.split("\n\n")[0] if clean_docstring else None, + "description": clean_docstring if clean_docstring else None, "generate_params_schema": cls.generate_params_schema.schema() if cls.generate_params_schema else None, - "component_params_schema": cls.component_params_schema.schema() - if cls.component_params_schema - else None, + "component_params_schema": cls.params_schema.schema() if cls.params_schema else None, } @classmethod @@ -74,6 +77,16 @@ def get_description(cls) -> Optional[str]: return inspect.getdoc(cls) +def _clean_docstring(docstring: str) -> str: + lines = docstring.strip().splitlines() + first_line = lines[0] + if len(lines) == 1: + return first_line + else: + rest = textwrap.dedent("\n".join(lines[1:])) + return f"{first_line}\n{rest}" + + class ComponentInternalMetadata(TypedDict): summary: Optional[str] description: Optional[str] @@ -94,13 +107,43 @@ def get_entry_points_from_python_environment(group: str) -> Sequence[importlib.m COMPONENTS_ENTRY_POINT_GROUP = "dagster.components" +BUILTIN_COMPONENTS_ENTRY_POINT_BASE = "dagster_components" +BUILTIN_PUBLISHED_COMPONENT_ENTRY_POINT = BUILTIN_COMPONENTS_ENTRY_POINT_BASE +BUILTIN_TEST_COMPONENT_ENTRY_POINT = ".".join([BUILTIN_COMPONENTS_ENTRY_POINT_BASE, "test"]) class ComponentRegistry: @classmethod - def from_entry_point_discovery(cls) -> "ComponentRegistry": + def from_entry_point_discovery( + cls, builtin_component_lib: str = BUILTIN_PUBLISHED_COMPONENT_ENTRY_POINT + ) -> "ComponentRegistry": + """Discover components registered in the Python environment via the `dagster_components` entry point group. + + `dagster-components` itself registers multiple component entry points. We call these + "builtin" component libraries. The `dagster_components` entry point resolves to published + components and is loaded by default. Other entry points resolve to various sets of test + components. This method will only ever load one builtin component library. + + Args: + builtin-component-lib (str): Specifies the builtin components library to load. Builtin + copmonents libraries are defined under entry points with names matching the pattern + `dagster_components*`. Only one builtin component library can be loaded at a time. + Defaults to `dagster_components`, the standard set of published components. + """ components: Dict[str, Type[Component]] = {} for entry_point in get_entry_points_from_python_environment(COMPONENTS_ENTRY_POINT_GROUP): + # Skip builtin entry points that are not the specified builtin component library. + if ( + entry_point.name.startswith(BUILTIN_COMPONENTS_ENTRY_POINT_BASE) + and not entry_point.name == builtin_component_lib + ): + continue + elif entry_point.name == BUILTIN_TEST_COMPONENT_ENTRY_POINT: + if builtin_component_lib: + ensure_dagster_components_tests_import() + else: + continue + root_module = entry_point.load() if not isinstance(root_module, ModuleType): raise DagsterError( diff --git a/python_modules/libraries/dagster-components/dagster_components/lib/pipes_subprocess_script_collection.py b/python_modules/libraries/dagster-components/dagster_components/lib/pipes_subprocess_script_collection.py index bacc8edb36dba..017fb8aa90dc0 100644 --- a/python_modules/libraries/dagster-components/dagster_components/lib/pipes_subprocess_script_collection.py +++ b/python_modules/libraries/dagster-components/dagster_components/lib/pipes_subprocess_script_collection.py @@ -77,21 +77,21 @@ def introspect_from_path(path: Path) -> "PipesSubprocessScriptCollection": @classmethod def from_decl_node( - cls, load_context: ComponentLoadContext, component_decl: ComponentDeclNode + cls, load_context: ComponentLoadContext, decl_node: ComponentDeclNode ) -> "PipesSubprocessScriptCollection": - assert isinstance(component_decl, YamlComponentDecl) + assert isinstance(decl_node, YamlComponentDecl) loaded_params = TypeAdapter(cls.params_schema).validate_python( - component_decl.component_file_model.params + decl_node.component_file_model.params ) path_specs = {} for script in loaded_params.scripts: - script_path = component_decl.path / script.path + script_path = decl_node.path / script.path if not script_path.exists(): raise FileNotFoundError(f"Script {script_path} does not exist") path_specs[script_path] = [spec.to_asset_spec() for spec in script.assets] - return cls(dirpath=component_decl.path, path_specs=path_specs) + return cls(dirpath=decl_node.path, path_specs=path_specs) def build_defs(self, load_context: "ComponentLoadContext") -> "Definitions": from dagster._core.definitions.definitions_class import Definitions diff --git a/python_modules/libraries/dagster-components/dagster_components/utils.py b/python_modules/libraries/dagster-components/dagster_components/utils.py new file mode 100644 index 0000000000000..5d4f19c11f694 --- /dev/null +++ b/python_modules/libraries/dagster-components/dagster_components/utils.py @@ -0,0 +1,14 @@ +import sys +from pathlib import Path + +CLI_BUILTIN_COMPONENT_LIB_KEY = "builtin_component_lib" + + +def ensure_dagster_components_tests_import() -> None: + from dagster_components import __file__ as dagster_components_init_py + + dagster_components_package_root = (Path(dagster_components_init_py) / ".." / "..").resolve() + assert ( + dagster_components_package_root / "dagster_components_tests" + ).exists(), "Could not find dagster_components_tests where expected" + sys.path.append(dagster_components_package_root.as_posix()) diff --git a/python_modules/libraries/dagster-components/dagster_components_tests/cli_tests/test_commands.py b/python_modules/libraries/dagster-components/dagster_components_tests/cli_tests/test_commands.py new file mode 100644 index 0000000000000..1f4fad6c8c99c --- /dev/null +++ b/python_modules/libraries/dagster-components/dagster_components_tests/cli_tests/test_commands.py @@ -0,0 +1,103 @@ +import json +from pathlib import Path + +from click.testing import CliRunner +from dagster_components.cli import cli +from dagster_components.utils import ensure_dagster_components_tests_import + +ensure_dagster_components_tests_import() + +from dagster_components_tests.utils import temp_code_location_bar + + +# Test that the global --use-test-component-lib flag changes the registered components +def test_global_test_flag(): + runner: CliRunner = CliRunner() + + # standard + result = runner.invoke(cli, ["list", "component-types"]) + assert result.exit_code == 0 + default_result_keys = list(json.loads(result.output).keys()) + assert len(default_result_keys) > 0 + + result = runner.invoke( + cli, ["--builtin-component-lib", "dagster_components.test", "list", "component-types"] + ) + assert result.exit_code == 0 + test_result_keys = list(json.loads(result.output).keys()) + assert len(default_result_keys) > 0 + + assert default_result_keys != test_result_keys + + +def test_list_component_types_command(): + runner = CliRunner() + + result = runner.invoke( + cli, ["--builtin-component-lib", "dagster_components.test", "list", "component-types"] + ) + assert result.exit_code == 0 + result = json.loads(result.output) + + assert list(result.keys()) == [ + "dagster_components.test.all_metadata_empty_asset", + "dagster_components.test.simple_asset", + "dagster_components.test.simple_pipes_script_asset", + ] + + assert result["dagster_components.test.simple_asset"] == { + "name": "simple_asset", + "package": "dagster_components.test", + "summary": "A simple asset that returns a constant string value.", + "description": "A simple asset that returns a constant string value.", + "generate_params_schema": None, + "component_params_schema": { + "properties": { + "asset_key": {"title": "Asset Key", "type": "string"}, + "value": {"title": "Value", "type": "string"}, + }, + "required": ["asset_key", "value"], + "title": "SimpleAssetParams", + "type": "object", + }, + } + + pipes_script_params_schema = { + "properties": { + "asset_key": {"title": "Asset Key", "type": "string"}, + "filename": {"title": "Filename", "type": "string"}, + }, + "required": ["asset_key", "filename"], + "title": "SimplePipesScriptAssetParams", + "type": "object", + } + + assert result["dagster_components.test.simple_pipes_script_asset"] == { + "name": "simple_pipes_script_asset", + "package": "dagster_components.test", + "summary": "A simple asset that runs a Python script with the Pipes subprocess client.", + "description": "A simple asset that runs a Python script with the Pipes subprocess client.\n\nBecause it is a pipes asset, no value is returned.", + "generate_params_schema": pipes_script_params_schema, + "component_params_schema": pipes_script_params_schema, + } + + +def test_generate_component_command(): + runner = CliRunner() + + with temp_code_location_bar(): + result = runner.invoke( + cli, + [ + "--builtin-component-lib", + "dagster_components.test", + "generate", + "component", + "dagster_components.test.simple_pipes_script_asset", + "qux", + "--json-params", + '{"asset_key": "my_asset", "filename": "my_asset.py"}', + ], + ) + assert result.exit_code == 0 + assert Path("bar/components/qux/my_asset.py").exists() diff --git a/python_modules/libraries/dagster-components/dagster_components_tests/lib/__init__.py b/python_modules/libraries/dagster-components/dagster_components_tests/lib/__init__.py new file mode 100644 index 0000000000000..f988bb8df7cae --- /dev/null +++ b/python_modules/libraries/dagster-components/dagster_components_tests/lib/__init__.py @@ -0,0 +1,7 @@ +from dagster_components_tests.lib.all_metadata_empty_asset import ( + AllMetadataEmptyAsset as AllMetadataEmptyAsset, +) +from dagster_components_tests.lib.simple_asset import SimpleAsset as SimpleAsset +from dagster_components_tests.lib.simple_pipes_script_asset import ( + SimplePipesScriptAsset as SimplePipesScriptAsset, +) diff --git a/python_modules/libraries/dagster-components/dagster_components_tests/lib/all_metadata_empty_asset.py b/python_modules/libraries/dagster-components/dagster_components_tests/lib/all_metadata_empty_asset.py new file mode 100644 index 0000000000000..339f5761d21b8 --- /dev/null +++ b/python_modules/libraries/dagster-components/dagster_components_tests/lib/all_metadata_empty_asset.py @@ -0,0 +1,34 @@ +from typing import TYPE_CHECKING, Any + +from dagster._core.definitions.decorators.asset_decorator import asset +from dagster._core.definitions.definitions_class import Definitions +from dagster._core.execution.context.asset_execution_context import AssetExecutionContext +from dagster_components import Component, ComponentLoadContext, component +from dagster_components.core.component import ComponentGenerateRequest +from dagster_components.core.component_decl_builder import YamlComponentDecl +from dagster_components.generate import generate_component_yaml +from typing_extensions import Self + +if TYPE_CHECKING: + from dagster_components.core.component import ComponentDeclNode + + +@component(name="all_metadata_empty_asset") +class AllMetadataEmptyAsset(Component): + @classmethod + def from_decl_node( + cls, context: "ComponentLoadContext", decl_node: "ComponentDeclNode" + ) -> Self: + assert isinstance(decl_node, YamlComponentDecl) + return cls() + + @classmethod + def generate_files(cls, request: ComponentGenerateRequest, params: Any) -> None: + generate_component_yaml(request, params) + + def build_defs(self, context: ComponentLoadContext) -> Definitions: + @asset + def hardcoded_asset(context: AssetExecutionContext): + return 1 + + return Definitions(assets=[hardcoded_asset]) diff --git a/python_modules/libraries/dagster-components/dagster_components_tests/lib/simple_asset.py b/python_modules/libraries/dagster-components/dagster_components_tests/lib/simple_asset.py new file mode 100644 index 0000000000000..67f742064ee76 --- /dev/null +++ b/python_modules/libraries/dagster-components/dagster_components_tests/lib/simple_asset.py @@ -0,0 +1,55 @@ +from typing import TYPE_CHECKING + +from dagster._core.definitions.asset_key import AssetKey +from dagster._core.definitions.decorators.asset_decorator import asset +from dagster._core.definitions.definitions_class import Definitions +from dagster._core.execution.context.asset_execution_context import AssetExecutionContext +from dagster_components import Component, ComponentLoadContext, component +from dagster_components.core.component import ComponentGenerateRequest +from dagster_components.core.component_decl_builder import YamlComponentDecl +from dagster_components.generate import generate_component_yaml +from pydantic import BaseModel, TypeAdapter +from typing_extensions import Self + +if TYPE_CHECKING: + from dagster_components.core.component import ComponentDeclNode + + +class SimpleAssetParams(BaseModel): + asset_key: str + value: str + + +@component(name="simple_asset") +class SimpleAsset(Component): + """A simple asset that returns a constant string value.""" + + params_schema = SimpleAssetParams + + @classmethod + def generate_files(cls, request: ComponentGenerateRequest, params: SimpleAssetParams) -> None: + generate_component_yaml(request, params.model_dump()) + + @classmethod + def from_decl_node( + cls, context: "ComponentLoadContext", decl_node: "ComponentDeclNode" + ) -> Self: + assert isinstance(decl_node, YamlComponentDecl) + loaded_params = TypeAdapter(cls.params_schema).validate_python( + decl_node.component_file_model.params + ) + return cls( + asset_key=AssetKey.from_user_string(loaded_params.asset_key), + value=loaded_params.value, + ) + + def __init__(self, asset_key: AssetKey, value: str): + self._asset_key = asset_key + self._value = value + + def build_defs(self, context: ComponentLoadContext) -> Definitions: + @asset(key=self._asset_key) + def dummy(context: AssetExecutionContext): + return self._value + + return Definitions(assets=[dummy]) diff --git a/python_modules/libraries/dagster-components/dagster_components_tests/lib/simple_pipes_script_asset.py b/python_modules/libraries/dagster-components/dagster_components_tests/lib/simple_pipes_script_asset.py new file mode 100644 index 0000000000000..a25b6b213d840 --- /dev/null +++ b/python_modules/libraries/dagster-components/dagster_components_tests/lib/simple_pipes_script_asset.py @@ -0,0 +1,87 @@ +import shutil +from pathlib import Path +from typing import TYPE_CHECKING + +import click +from dagster._core.definitions.asset_key import AssetKey +from dagster._core.definitions.decorators.asset_decorator import asset +from dagster._core.definitions.definitions_class import Definitions +from dagster._core.execution.context.asset_execution_context import AssetExecutionContext +from dagster._core.pipes.subprocess import PipesSubprocessClient +from dagster_components import Component, ComponentLoadContext, component +from dagster_components.core.component import ComponentGenerateRequest +from dagster_components.core.component_decl_builder import YamlComponentDecl +from dagster_components.generate import generate_component_yaml +from pydantic import BaseModel, TypeAdapter +from typing_extensions import Self + +if TYPE_CHECKING: + from dagster_components.core.component import ComponentDeclNode + + +# Same schema used for file generation and defs generation +class SimplePipesScriptAssetParams(BaseModel): + asset_key: str + filename: str + + @staticmethod + @click.command + @click.option("--asset-key", type=str) + @click.option("--filename", type=str) + def cli(asset_key: str, filename: str) -> "SimplePipesScriptAssetParams": + return SimplePipesScriptAssetParams(asset_key=asset_key, filename=filename) + + +_SCRIPT_TEMPLATE = """ +from dagster_pipes import open_dagster_pipes + +context = open_dagster_pipes() + +context.log.info("Materializing asset {asset_key} from pipes") +context.report_asset_materialization(asset_key="{asset_key}") +""" + + +@component(name="simple_pipes_script_asset") +class SimplePipesScriptAsset(Component): + """A simple asset that runs a Python script with the Pipes subprocess client. + + Because it is a pipes asset, no value is returned. + """ + + generate_params_schema = SimplePipesScriptAssetParams + params_schema = SimplePipesScriptAssetParams + + @classmethod + def generate_files( + cls, request: ComponentGenerateRequest, params: SimplePipesScriptAssetParams + ) -> None: + generate_component_yaml(request, params.model_dump()) + Path(request.component_instance_root_path, params.filename).write_text( + _SCRIPT_TEMPLATE.format(asset_key=params.asset_key) + ) + + @classmethod + def from_decl_node( + cls, context: "ComponentLoadContext", decl_node: "ComponentDeclNode" + ) -> Self: + assert isinstance(decl_node, YamlComponentDecl) + loaded_params = TypeAdapter(cls.params_schema).validate_python( + decl_node.component_file_model.params + ) + return cls( + asset_key=AssetKey.from_user_string(loaded_params.asset_key), + script_path=decl_node.path / loaded_params.filename, + ) + + def __init__(self, asset_key: AssetKey, script_path: Path): + self._asset_key = asset_key + self._script_path = script_path + + def build_defs(self, context: ComponentLoadContext) -> Definitions: + @asset(key=self._asset_key) + def _asset(context: AssetExecutionContext, pipes_client: PipesSubprocessClient): + cmd = [shutil.which("python"), self._script_path] + return pipes_client.run(command=cmd, context=context).get_results() + + return Definitions(assets=[_asset]) diff --git a/python_modules/libraries/dagster-components/dagster_components_tests/unit_tests/test_pipes_subprocess_script_collection.py b/python_modules/libraries/dagster-components/dagster_components_tests/unit_tests/test_pipes_subprocess_script_collection.py index 65e61a54b312c..73955faa3291d 100644 --- a/python_modules/libraries/dagster-components/dagster_components_tests/unit_tests/test_pipes_subprocess_script_collection.py +++ b/python_modules/libraries/dagster-components/dagster_components_tests/unit_tests/test_pipes_subprocess_script_collection.py @@ -27,7 +27,7 @@ def test_python_native() -> None: def test_python_params() -> None: component = PipesSubprocessScriptCollection.from_decl_node( load_context=script_load_context(), - component_decl=YamlComponentDecl( + decl_node=YamlComponentDecl( path=LOCATION_PATH / "components" / "scripts", component_file_model=ComponentFileModel( type="pipes_subprocess_script_collection", diff --git a/python_modules/libraries/dagster-components/dagster_components_tests/utils.py b/python_modules/libraries/dagster-components/dagster_components_tests/utils.py index 51bce9a14ea32..665aea9ea6e54 100644 --- a/python_modules/libraries/dagster-components/dagster_components_tests/utils.py +++ b/python_modules/libraries/dagster-components/dagster_components_tests/utils.py @@ -1,4 +1,11 @@ +import textwrap +from contextlib import contextmanager +from pathlib import Path +from tempfile import TemporaryDirectory +from typing import AbstractSet, Iterator + from dagster import AssetKey, DagsterInstance +from dagster._utils import pushd from dagster_components.core.component import Component, ComponentLoadContext, ComponentRegistry @@ -10,7 +17,7 @@ def script_load_context() -> ComponentLoadContext: return ComponentLoadContext(registry=registry(), resources={}) -def get_asset_keys(component: Component) -> set[AssetKey]: +def get_asset_keys(component: Component) -> AbstractSet[AssetKey]: return { key for key in component.build_defs(ComponentLoadContext.for_test()) @@ -26,3 +33,45 @@ def assert_assets(component: Component, expected_assets: int) -> None: instance=DagsterInstance.ephemeral() ) assert result.success + + +def generate_component_lib_pyproject_toml(name: str, is_code_location: bool = False) -> str: + pkg_name = name.replace("-", "_") + base = textwrap.dedent(f""" + [build-system] + requires = ["setuptools", "wheel"] + build-backend = "setuptools.build_meta" + + [project] + name = "{name}" + version = "0.1.0" + dependencies = [ + "dagster-components", + ] + + [project.entry-points] + "dagster.components" = {{ {pkg_name} = "{pkg_name}.lib"}} + """) + if is_code_location: + return base + textwrap.dedent(""" + [tool.dagster] + module_name = "{ pkg_name }.definitions" + project_name = "{ pkg_name }" + """) + else: + return base + + +@contextmanager +def temp_code_location_bar() -> Iterator[None]: + with TemporaryDirectory() as tmpdir, pushd(tmpdir): + Path("bar/bar/lib").mkdir(parents=True) + Path("bar/bar/components").mkdir(parents=True) + with open("bar/pyproject.toml", "w") as f: + f.write(generate_component_lib_pyproject_toml("bar", is_code_location=True)) + Path("bar/bar/__init__.py").touch() + Path("bar/bar/definitions.py").touch() + Path("bar/bar/lib/__init__.py").touch() + + with pushd("bar"): + yield diff --git a/python_modules/libraries/dagster-components/setup.py b/python_modules/libraries/dagster-components/setup.py index 0c009b3e36907..1c89e6f5f31f4 100644 --- a/python_modules/libraries/dagster-components/setup.py +++ b/python_modules/libraries/dagster-components/setup.py @@ -46,6 +46,7 @@ def get_version() -> str: ], "dagster.components": [ "dagster_components = dagster_components.lib", + "dagster_components.test = dagster_components_tests.lib", ], }, extras_require={ diff --git a/python_modules/libraries/dagster-dg/dagster_dg/cli/__init__.py b/python_modules/libraries/dagster-dg/dagster_dg/cli/__init__.py index 38ac45544ae77..fd96f972ad707 100644 --- a/python_modules/libraries/dagster-dg/dagster_dg/cli/__init__.py +++ b/python_modules/libraries/dagster-dg/dagster_dg/cli/__init__.py @@ -3,6 +3,7 @@ from dagster_dg.cli.generate import generate_cli from dagster_dg.cli.info import info_cli from dagster_dg.cli.list import list_cli +from dagster_dg.utils import CLI_BUILTIN_COMPONENT_LIB_KEY, DEFAULT_BUILTIN_COMPONENT_LIB from dagster_dg.version import __version__ @@ -18,8 +19,17 @@ def create_dg_cli(): context_settings={"max_content_width": 120, "help_option_names": ["-h", "--help"]}, ) @click.version_option(__version__, "--version", "-v") - def group(): + @click.option( + "--builtin-component-lib", + type=str, + default=DEFAULT_BUILTIN_COMPONENT_LIB, + help="Specify a builitin component library to use.", + ) + @click.pass_context + def group(context: click.Context, builtin_component_lib: bool): """CLI tools for working with Dagster components.""" + context.ensure_object(dict) + context.obj[CLI_BUILTIN_COMPONENT_LIB_KEY] = builtin_component_lib return group diff --git a/python_modules/libraries/dagster-dg/dagster_dg/cli/generate.py b/python_modules/libraries/dagster-dg/dagster_dg/cli/generate.py index 83d0ffd61f61d..9f94ea945f4ca 100644 --- a/python_modules/libraries/dagster-dg/dagster_dg/cli/generate.py +++ b/python_modules/libraries/dagster-dg/dagster_dg/cli/generate.py @@ -17,6 +17,7 @@ generate_component_type, generate_deployment, ) +from dagster_dg.utils import CLI_BUILTIN_COMPONENT_LIB_KEY @click.group(name="generate") @@ -139,7 +140,9 @@ def generate_component_type_command(name: str) -> None: @click.argument("component_name", type=str) @click.option("--json-params", type=str, default=None, help="JSON string of component parameters.") @click.argument("extra_args", nargs=-1, type=str) +@click.pass_context def generate_component_command( + context: click.Context, component_type: str, component_name: str, json_params: Optional[str], @@ -169,6 +172,7 @@ def generate_component_command( It is an error to pass both --json-params and EXTRA_ARGS. """ + builtin_component_lib = context.obj.get(CLI_BUILTIN_COMPONENT_LIB_KEY, False) if not is_inside_code_location_directory(Path.cwd()): click.echo( click.style( @@ -177,13 +181,13 @@ def generate_component_command( ) sys.exit(1) - context = CodeLocationDirectoryContext.from_path(Path.cwd()) - if not context.has_component_type(component_type): + dg_context = CodeLocationDirectoryContext.from_path(Path.cwd(), builtin_component_lib) + if not dg_context.has_component_type(component_type): click.echo( click.style(f"No component type `{component_type}` could be resolved.", fg="red") ) sys.exit(1) - elif context.has_component_instance(component_name): + elif dg_context.has_component_instance(component_name): click.echo( click.style(f"A component instance named `{component_name}` already exists.", fg="red") ) @@ -200,9 +204,10 @@ def generate_component_command( sys.exit(1) generate_component_instance( - Path(context.component_instances_root_path), + Path(dg_context.component_instances_root_path), component_name, component_type, json_params, extra_args, + builtin_component_lib=builtin_component_lib, ) diff --git a/python_modules/libraries/dagster-dg/dagster_dg/cli/info.py b/python_modules/libraries/dagster-dg/dagster_dg/cli/info.py index 8cc094a0a34c3..28774eba6c826 100644 --- a/python_modules/libraries/dagster-dg/dagster_dg/cli/info.py +++ b/python_modules/libraries/dagster-dg/dagster_dg/cli/info.py @@ -6,6 +6,7 @@ import click from dagster_dg.context import CodeLocationDirectoryContext, is_inside_code_location_directory +from dagster_dg.utils import CLI_BUILTIN_COMPONENT_LIB_KEY @click.group(name="info") @@ -22,13 +23,16 @@ def _serialize_json_schema(schema: Mapping[str, Any]) -> str: @click.option("--description", is_flag=True, default=False) @click.option("--generate-params-schema", is_flag=True, default=False) @click.option("--component-params-schema", is_flag=True, default=False) +@click.pass_context def info_component_type_command( + context: click.Context, component_type: str, description: bool, generate_params_schema: bool, component_params_schema: bool, ) -> None: """Get detailed information on a registered Dagster component type.""" + builtin_component_lib = context.obj.get(CLI_BUILTIN_COMPONENT_LIB_KEY, False) if not is_inside_code_location_directory(Path.cwd()): click.echo( click.style( @@ -37,8 +41,10 @@ def info_component_type_command( ) sys.exit(1) - context = CodeLocationDirectoryContext.from_path(Path.cwd()) - if not context.has_component_type(component_type): + dg_context = CodeLocationDirectoryContext.from_path( + Path.cwd(), builtin_component_lib=builtin_component_lib + ) + if not dg_context.has_component_type(component_type): click.echo( click.style(f"No component type `{component_type}` could be resolved.", fg="red") ) @@ -53,8 +59,7 @@ def info_component_type_command( ) sys.exit(1) - context = CodeLocationDirectoryContext.from_path(Path.cwd()) - component_type_metadata = context.get_component_type(component_type) + component_type_metadata = dg_context.get_component_type(component_type) if description: if component_type_metadata.description: @@ -63,14 +68,15 @@ def info_component_type_command( click.echo("No description available.") elif generate_params_schema: if component_type_metadata.generate_params_schema: - click.echo(component_type_metadata.generate_params_schema) + click.echo(_serialize_json_schema(component_type_metadata.generate_params_schema)) else: click.echo("No generate params schema defined.") elif component_params_schema: if component_type_metadata.component_params_schema: - click.echo(component_type_metadata.component_params_schema) + click.echo(_serialize_json_schema(component_type_metadata.component_params_schema)) else: click.echo("No component params schema defined.") + # print all available metadata else: click.echo(component_type) diff --git a/python_modules/libraries/dagster-dg/dagster_dg/cli/list.py b/python_modules/libraries/dagster-dg/dagster_dg/cli/list.py index e8ad870233509..24e0f3f10d093 100644 --- a/python_modules/libraries/dagster-dg/dagster_dg/cli/list.py +++ b/python_modules/libraries/dagster-dg/dagster_dg/cli/list.py @@ -9,6 +9,7 @@ is_inside_code_location_directory, is_inside_deployment_directory, ) +from dagster_dg.utils import CLI_BUILTIN_COMPONENT_LIB_KEY @click.group(name="list") @@ -31,8 +32,10 @@ def list_code_locations_command() -> None: @list_cli.command(name="component-types") -def list_component_types_command() -> None: +@click.pass_context +def list_component_types_command(context: click.Context) -> None: """List registered Dagster components in the current code location environment.""" + builtin_component_lib = context.obj.get(CLI_BUILTIN_COMPONENT_LIB_KEY, False) if not is_inside_code_location_directory(Path.cwd()): click.echo( click.style( @@ -41,8 +44,10 @@ def list_component_types_command() -> None: ) sys.exit(1) - context = CodeLocationDirectoryContext.from_path(Path.cwd()) - for key, component_type in context.iter_component_types(): + dg_context = CodeLocationDirectoryContext.from_path( + Path.cwd(), builtin_component_lib=builtin_component_lib + ) + for key, component_type in dg_context.iter_component_types(): click.echo(key) if component_type.summary: click.echo(f" {component_type.summary}") diff --git a/python_modules/libraries/dagster-dg/dagster_dg/context.py b/python_modules/libraries/dagster-dg/dagster_dg/context.py index 2c105ae0a0efc..fbd0186ad7dfd 100644 --- a/python_modules/libraries/dagster-dg/dagster_dg/context.py +++ b/python_modules/libraries/dagster-dg/dagster_dg/context.py @@ -1,14 +1,14 @@ import json import os from pathlib import Path -from typing import Final, Iterable, Mapping, Optional, Tuple +from typing import Final, Iterable, Optional, Tuple import tomli from typing_extensions import Self from dagster_dg.component import RemoteComponentRegistry, RemoteComponentType from dagster_dg.error import DgError -from dagster_dg.utils import execute_code_location_command +from dagster_dg.utils import DEFAULT_BUILTIN_COMPONENT_LIB, execute_code_location_command def is_inside_deployment_directory(path: Path) -> bool: @@ -89,13 +89,23 @@ def get_code_location_names(self) -> Iterable[str]: class CodeLocationDirectoryContext: - _components_registry: Mapping[str, RemoteComponentType] = {} + """Class encapsulating contextual information about a components code location directory. + + Args: + root_path (Path): The absolute path to the root of the code location directory. + name (str): The name of the code location python package. + component_registry (ComponentRegistry): The component registry for the code location. + deployment_context (Optional[DeploymentDirectoryContext]): The deployment context containing + the code location directory. Defaults to None. + """ @classmethod - def from_path(cls, path: Path) -> Self: + def from_path( + cls, path: Path, builtin_component_lib: str = DEFAULT_BUILTIN_COMPONENT_LIB + ) -> Self: root_path = _resolve_code_location_root_directory(path) component_registry_data = execute_code_location_command( - root_path, ["list", "component-types"] + root_path, ["list", "component-types"], builtin_component_lib=builtin_component_lib ) component_registry = RemoteComponentRegistry.from_dict(json.loads(component_registry_data)) diff --git a/python_modules/libraries/dagster-dg/dagster_dg/generate.py b/python_modules/libraries/dagster-dg/dagster_dg/generate.py index e712f1c2b7984..c00901332c158 100644 --- a/python_modules/libraries/dagster-dg/dagster_dg/generate.py +++ b/python_modules/libraries/dagster-dg/dagster_dg/generate.py @@ -8,6 +8,7 @@ from dagster_dg.context import CodeLocationDirectoryContext from dagster_dg.utils import ( + DEFAULT_BUILTIN_COMPONENT_LIB, camelcase, execute_code_location_command, generate_subtree, @@ -150,6 +151,7 @@ def generate_component_instance( component_type: str, json_params: Optional[str], extra_args: Tuple[str, ...], + builtin_component_lib: str = DEFAULT_BUILTIN_COMPONENT_LIB, ) -> None: component_instance_root_path = root_path / name click.echo(f"Creating a Dagster component instance folder at {component_instance_root_path}.") @@ -162,4 +164,8 @@ def generate_component_instance( *(["--json-params", json_params] if json_params else []), *(["--", *extra_args] if extra_args else []), ) - execute_code_location_command(Path(component_instance_root_path), code_location_command) + execute_code_location_command( + Path(component_instance_root_path), + code_location_command, + builtin_component_lib=builtin_component_lib, + ) diff --git a/python_modules/libraries/dagster-dg/dagster_dg/utils.py b/python_modules/libraries/dagster-dg/dagster_dg/utils.py index a3999f10447a9..b0d2c917ff7d3 100644 --- a/python_modules/libraries/dagster-dg/dagster_dg/utils.py +++ b/python_modules/libraries/dagster-dg/dagster_dg/utils.py @@ -12,12 +12,22 @@ from dagster_dg.version import __version__ as dagster_version +CLI_BUILTIN_COMPONENT_LIB_KEY = "builtin_component_lib" +DEFAULT_BUILTIN_COMPONENT_LIB: Final = "dagster_components" + + _CODE_LOCATION_COMMAND_PREFIX: Final = ["uv", "run", "dagster-components"] -def execute_code_location_command(path: Path, cmd: Sequence[str]) -> str: +def execute_code_location_command( + path: Path, cmd: Sequence[str], builtin_component_lib: Optional[str] = None +) -> str: + full_cmd = [ + *_CODE_LOCATION_COMMAND_PREFIX, + *(["--builtin-component-lib", builtin_component_lib] if builtin_component_lib else []), + *cmd, + ] with pushd(path): - full_cmd = [*_CODE_LOCATION_COMMAND_PREFIX, *cmd] result = subprocess.run( full_cmd, stdout=subprocess.PIPE, env=get_uv_command_env(), check=True ) diff --git a/python_modules/libraries/dagster-dg/dagster_dg_tests/cli_tests/test_generate_commands.py b/python_modules/libraries/dagster-dg/dagster_dg_tests/cli_tests/test_generate_commands.py index 09838048dddce..4158671adf31e 100644 --- a/python_modules/libraries/dagster-dg/dagster_dg_tests/cli_tests/test_generate_commands.py +++ b/python_modules/libraries/dagster-dg/dagster_dg_tests/cli_tests/test_generate_commands.py @@ -1,135 +1,30 @@ -import inspect import json import os import subprocess -import sys -import textwrap -from contextlib import contextmanager from pathlib import Path -from typing import Iterator import pytest import tomli from click.testing import CliRunner -from dagster_dg.cli.generate import ( - generate_code_location_command, - generate_component_command, - generate_component_type_command, - generate_deployment_command, -) +from dagster_dg.cli import cli as dg_cli from dagster_dg.context import CodeLocationDirectoryContext -from dagster_dg.utils import discover_git_root, pushd - - -# This is a holder for code that is intended to be written to a file -def _example_component_type_baz(): - import click - from dagster import AssetExecutionContext, Definitions, PipesSubprocessClient, asset - from dagster_components import ( - Component, - ComponentGenerateRequest, - ComponentLoadContext, - component, - ) - from dagster_components.generate import generate_component_yaml - from pydantic import BaseModel - - _SAMPLE_PIPES_SCRIPT = """ - from dagster_pipes import open_dagster_pipes - - context = open_dagster_pipes() - context.report_asset_materialization({"alpha": "beta"}) - """ - - class BazGenerateParams(BaseModel): - filename: str = "sample.py" - - @staticmethod - @click.command - @click.option("--filename", type=str, default="sample.py") - def cli(filename: str) -> "BazGenerateParams": - return BazGenerateParams(filename=filename) - - @component(name="baz") - class Baz(Component): - generate_params_schema = BazGenerateParams - - @classmethod - def generate_files(cls, request: ComponentGenerateRequest, params: BazGenerateParams): - generate_component_yaml(request, {}) - with open(params.filename, "w") as f: - f.write(_SAMPLE_PIPES_SCRIPT) - - def build_defs(self, context: ComponentLoadContext) -> Definitions: - @asset - def foo(context: AssetExecutionContext, client: PipesSubprocessClient): - client.run(context=context, command=["python", "sample.py"]) - - return Definitions(assets=[foo], resources={"client": PipesSubprocessClient()}) - - -@contextmanager -def isolated_example_deployment_foo(runner: CliRunner) -> Iterator[None]: - with runner.isolated_filesystem(): - runner.invoke(generate_deployment_command, ["foo"]) - with pushd("foo"): - yield - - -@contextmanager -def isolated_example_code_location_bar( - runner: CliRunner, in_deployment: bool = True -) -> Iterator[None]: - dagster_git_repo_dir = str(discover_git_root(Path(__file__))) - if in_deployment: - with isolated_example_deployment_foo(runner), clean_module_cache("bar"): - runner.invoke( - generate_code_location_command, - ["--use-editable-dagster", dagster_git_repo_dir, "bar"], - ) - with pushd("code_locations/bar"): - yield - else: - with runner.isolated_filesystem(), clean_module_cache("bar"): - runner.invoke( - generate_code_location_command, - ["--use-editable-dagster", dagster_git_repo_dir, "bar"], - ) - with pushd("bar"): - yield - - -@contextmanager -def isolated_example_code_location_bar_with_component_type_baz( - runner: CliRunner, in_deployment: bool = True -) -> Iterator[None]: - with isolated_example_code_location_bar(runner, in_deployment): - with open("bar/lib/__init__.py", "a") as f: - f.write("from bar.lib.baz import Baz\n") - with open("bar/lib/baz.py", "w") as f: - component_type_source = textwrap.dedent( - inspect.getsource(_example_component_type_baz).split("\n", 1)[1] - ) - f.write(component_type_source) - yield - - -@contextmanager -def clean_module_cache(module_name: str): - prefix = f"{module_name}." - keys_to_del = { - key for key in sys.modules.keys() if key == module_name or key.startswith(prefix) - } - for key in keys_to_del: - del sys.modules[key] - yield +from dagster_dg.utils import discover_git_root, ensure_dagster_dg_tests_import + +ensure_dagster_dg_tests_import() + +from dagster_dg_tests.utils import ( + ProxyRunner, + assert_runner_result, + isolated_example_code_location_bar, + isolated_example_deployment_foo, +) def test_generate_deployment_command_success() -> None: - runner = CliRunner() + runner = ProxyRunner.test() with runner.isolated_filesystem(): - result = runner.invoke(generate_deployment_command, ["foo"]) - assert result.exit_code == 0 + result = runner.invoke("generate", "deployment", "foo") + assert_runner_result(result) assert Path("foo").exists() assert Path("foo/.github").exists() assert Path("foo/.github/workflows").exists() @@ -139,19 +34,19 @@ def test_generate_deployment_command_success() -> None: def test_generate_deployment_command_already_exists_fails() -> None: - runner = CliRunner() + runner = ProxyRunner.test() with runner.isolated_filesystem(): os.mkdir("foo") - result = runner.invoke(generate_deployment_command, ["foo"]) - assert result.exit_code != 0 + result = runner.invoke("generate", "deployment", "foo") + assert_runner_result(result, exit_0=False) assert "already exists" in result.output def test_generate_code_location_inside_deployment_success() -> None: - runner = CliRunner() + runner = ProxyRunner.test() with isolated_example_deployment_foo(runner): - result = runner.invoke(generate_code_location_command, ["bar"]) - assert result.exit_code == 0 + result = runner.invoke("generate", "code-location", "bar") + assert_runner_result(result) assert Path("code_locations/bar").exists() assert Path("code_locations/bar/bar").exists() assert Path("code_locations/bar/bar/lib").exists() @@ -171,10 +66,10 @@ def test_generate_code_location_inside_deployment_success() -> None: def test_generate_code_location_outside_deployment_success() -> None: - runner = CliRunner() + runner = ProxyRunner.test() with runner.isolated_filesystem(): - result = runner.invoke(generate_code_location_command, ["bar"]) - assert result.exit_code == 0 + result = runner.invoke("generate", "code-location", "bar") + assert_runner_result(result) assert Path("bar").exists() assert Path("bar/bar").exists() assert Path("bar/bar/lib").exists() @@ -189,7 +84,7 @@ def test_generate_code_location_outside_deployment_success() -> None: @pytest.mark.parametrize("mode", ["env_var", "arg"]) def test_generate_code_location_editable_dagster_success(mode: str, monkeypatch) -> None: - runner = CliRunner() + runner = ProxyRunner.test() dagster_git_repo_dir = discover_git_root(Path(__file__)) if mode == "env_var": monkeypatch.setenv("DAGSTER_GIT_REPO_DIR", str(dagster_git_repo_dir)) @@ -197,8 +92,8 @@ def test_generate_code_location_editable_dagster_success(mode: str, monkeypatch) else: editable_args = ["--use-editable-dagster", str(dagster_git_repo_dir)] with isolated_example_deployment_foo(runner): - result = runner.invoke(generate_code_location_command, [*editable_args, "bar"]) - assert result.exit_code == 0 + result = runner.invoke("generate", "code-location", *editable_args, "bar") + assert_runner_result(result) assert Path("code_locations/bar").exists() assert Path("code_locations/bar/pyproject.toml").exists() with open("code_locations/bar/pyproject.toml") as f: @@ -222,137 +117,172 @@ def test_generate_code_location_editable_dagster_success(mode: str, monkeypatch) def test_generate_code_location_editable_dagster_no_env_var_no_value_fails(monkeypatch) -> None: - runner = CliRunner() + runner = ProxyRunner.test() monkeypatch.setenv("DAGSTER_GIT_REPO_DIR", "") with isolated_example_deployment_foo(runner): - result = runner.invoke( - generate_code_location_command, ["--use-editable-dagster", "--", "bar"] - ) - assert result.exit_code != 0 + result = runner.invoke("generate", "code-location", "--use-editable-dagster", "--", "bar") + assert_runner_result(result, exit_0=False) assert "requires the `DAGSTER_GIT_REPO_DIR`" in result.output def test_generate_code_location_already_exists_fails() -> None: - runner = CliRunner() + runner = ProxyRunner.test() with isolated_example_deployment_foo(runner): - result = runner.invoke(generate_code_location_command, ["bar"]) - assert result.exit_code == 0 - result = runner.invoke(generate_code_location_command, ["bar"]) - assert result.exit_code != 0 + result = runner.invoke("generate", "code-location", "bar") + assert_runner_result(result) + result = runner.invoke("generate", "code-location", "bar") + assert_runner_result(result, exit_0=False) assert "already exists" in result.output @pytest.mark.parametrize("in_deployment", [True, False]) def test_generate_component_type_success(in_deployment: bool) -> None: - runner = CliRunner() + runner = ProxyRunner.test() with isolated_example_code_location_bar(runner, in_deployment): - result = runner.invoke(generate_component_type_command, ["baz"]) - assert result.exit_code == 0 + result = runner.invoke("generate", "component-type", "baz") + assert_runner_result(result) assert Path("bar/lib/baz.py").exists() context = CodeLocationDirectoryContext.from_path(Path.cwd()) assert context.has_component_type("bar.baz") def test_generate_component_type_outside_code_location_fails() -> None: - runner = CliRunner() + runner = ProxyRunner.test() with isolated_example_deployment_foo(runner): - result = runner.invoke(generate_component_type_command, ["baz"]) - assert result.exit_code != 0 + result = runner.invoke("generate", "component-type", "baz") + assert_runner_result(result, exit_0=False) assert "must be run inside a Dagster code location directory" in result.output @pytest.mark.parametrize("in_deployment", [True, False]) def test_generate_component_type_already_exists_fails(in_deployment: bool) -> None: - runner = CliRunner() + runner = ProxyRunner.test() with isolated_example_code_location_bar(runner, in_deployment): - result = runner.invoke(generate_component_type_command, ["baz"]) - assert result.exit_code == 0 - result = runner.invoke(generate_component_type_command, ["baz"]) - assert result.exit_code != 0 + result = runner.invoke("generate", "component-type", "baz") + assert_runner_result(result) + result = runner.invoke("generate", "component-type", "baz") + assert_runner_result(result, exit_0=False) assert "already exists" in result.output @pytest.mark.parametrize("in_deployment", [True, False]) def test_generate_component_no_params_success(in_deployment: bool) -> None: - runner = CliRunner() - with isolated_example_code_location_bar_with_component_type_baz(runner, in_deployment): - result = runner.invoke(generate_component_command, ["bar.baz", "qux"]) - assert result.exit_code == 0 + runner = ProxyRunner.test() + with isolated_example_code_location_bar(runner, in_deployment): + result = runner.invoke( + "generate", + "component", + "dagster_components.test.all_metadata_empty_asset", + "qux", + ) + assert_runner_result(result) assert Path("bar/components/qux").exists() - assert Path("bar/components/qux/sample.py").exists() # default filename component_yaml_path = Path("bar/components/qux/component.yaml") assert component_yaml_path.exists() - assert "type: bar.baz" in component_yaml_path.read_text() + assert ( + "type: dagster_components.test.all_metadata_empty_asset" + in component_yaml_path.read_text() + ) @pytest.mark.parametrize("in_deployment", [True, False]) def test_generate_component_json_params_success(in_deployment: bool) -> None: - runner = CliRunner() - with isolated_example_code_location_bar_with_component_type_baz(runner, in_deployment): + runner = ProxyRunner.test() + with isolated_example_code_location_bar(runner, in_deployment): result = runner.invoke( - generate_component_command, - ["bar.baz", "qux", "--json-params", '{"filename": "hello.py"}'], + "generate", + "component", + "dagster_components.test.simple_pipes_script_asset", + "qux", + "--json-params", + '{"asset_key": "foo", "filename": "hello.py"}', ) - assert result.exit_code == 0 + assert_runner_result(result) assert Path("bar/components/qux").exists() assert Path("bar/components/qux/hello.py").exists() component_yaml_path = Path("bar/components/qux/component.yaml") assert component_yaml_path.exists() - assert "type: bar.baz" in component_yaml_path.read_text() + assert ( + "type: dagster_components.test.simple_pipes_script_asset" + in component_yaml_path.read_text() + ) @pytest.mark.parametrize("in_deployment", [True, False]) def test_generate_component_extra_args_success(in_deployment: bool) -> None: - runner = CliRunner() - with isolated_example_code_location_bar_with_component_type_baz(runner, in_deployment): + runner = ProxyRunner.test() + with isolated_example_code_location_bar(runner, in_deployment): result = runner.invoke( - generate_component_command, ["bar.baz", "qux", "--", "--filename=hello.py"] + "generate", + "component", + "dagster_components.test.simple_pipes_script_asset", + "qux", + "--", + "--asset-key=foo", + "--filename=hello.py", ) - assert result.exit_code == 0 + assert_runner_result(result) assert Path("bar/components/qux").exists() assert Path("bar/components/qux/hello.py").exists() component_yaml_path = Path("bar/components/qux/component.yaml") assert component_yaml_path.exists() - assert "type: bar.baz" in component_yaml_path.read_text() + assert ( + "type: dagster_components.test.simple_pipes_script_asset" + in component_yaml_path.read_text() + ) def test_generate_component_json_params_and_extra_args_fails() -> None: - runner = CliRunner() - with isolated_example_code_location_bar_with_component_type_baz(runner): + runner = ProxyRunner.test() + with isolated_example_code_location_bar(runner): result = runner.invoke( - generate_component_command, - [ - "bar.baz", - "qux", - "--json-params", - '{"filename": "hello.py"}', - "--", - "--filename=hello.py", - ], + "generate", + "component", + "dagster_components.test.simple_pipes_script_asset", + "qux", + "--json-params", + '{"filename": "hello.py"}', + "--", + "--filename=hello.py", ) - assert result.exit_code != 0 + assert_runner_result(result, exit_0=False) assert "Detected both --json-params and EXTRA_ARGS" in result.output def test_generate_component_outside_code_location_fails() -> None: - runner = CliRunner() + runner = ProxyRunner.test() with isolated_example_deployment_foo(runner): - result = runner.invoke(generate_component_command, ["bar.baz", "qux"]) - assert result.exit_code != 0 + result = runner.invoke("generate", "component", "bar.baz", "qux") + assert_runner_result(result, exit_0=False) assert "must be run inside a Dagster code location directory" in result.output @pytest.mark.parametrize("in_deployment", [True, False]) def test_generate_component_already_exists_fails(in_deployment: bool) -> None: - runner = CliRunner() - with isolated_example_code_location_bar_with_component_type_baz(runner, in_deployment): - result = runner.invoke(generate_component_command, ["bar.baz", "qux"]) - assert result.exit_code == 0 - result = runner.invoke(generate_component_command, ["bar.baz", "qux"]) - assert result.exit_code != 0 + runner = ProxyRunner.test() + with isolated_example_code_location_bar(runner, in_deployment): + result = runner.invoke( + "generate", + "component", + "dagster_components.test.all_metadata_empty_asset", + "qux", + ) + assert_runner_result(result) + result = runner.invoke( + "generate", + "component", + "dagster_components.test.all_metadata_empty_asset", + "qux", + ) + assert_runner_result(result, exit_0=False) assert "already exists" in result.output +# ######################## +# ##### REAL COMPONENTS +# ######################## + + def test_generate_sling_replication_instance() -> None: runner = CliRunner() with isolated_example_code_location_bar(runner): @@ -362,9 +292,9 @@ def test_generate_sling_replication_instance() -> None: ["uv", "add", "dagster-components[sling]", "dagster-embedded-elt"], check=True ) result = runner.invoke( - generate_component_command, ["dagster_components.sling_replication", "file_ingest"] + dg_cli, ["generate", "component", "dagster_components.sling_replication", "file_ingest"] ) - assert result.exit_code == 0 + assert_runner_result(result) assert Path("bar/components/file_ingest").exists() component_yaml_path = Path("bar/components/file_ingest/component.yaml") @@ -393,9 +323,10 @@ def test_generate_dbt_project_instance(params) -> None: # direct dependencies will be resolved by uv.tool.sources. subprocess.run(["uv", "add", "dagster-components[dbt]", "dagster-dbt"], check=True) result = runner.invoke( - generate_component_command, ["dagster_components.dbt_project", "my_project", *params] + dg_cli, + ["generate", "component", "dagster_components.dbt_project", "my_project", *params], ) - assert result.exit_code == 0 + assert_runner_result(result) assert Path("bar/components/my_project").exists() component_yaml_path = Path("bar/components/my_project/component.yaml") diff --git a/python_modules/libraries/dagster-dg/dagster_dg_tests/cli_tests/test_info_commands.py b/python_modules/libraries/dagster-dg/dagster_dg_tests/cli_tests/test_info_commands.py index 82a68cf6105d4..eb4305fae02ca 100644 --- a/python_modules/libraries/dagster-dg/dagster_dg_tests/cli_tests/test_info_commands.py +++ b/python_modules/libraries/dagster-dg/dagster_dg_tests/cli_tests/test_info_commands.py @@ -1,91 +1,204 @@ import textwrap -from click.testing import CliRunner -from dagster_dg.cli.info import info_component_type_command from dagster_dg.utils import ensure_dagster_dg_tests_import ensure_dagster_dg_tests_import() -from dagster_dg_tests.cli_tests.test_generate_commands import isolated_example_code_location_bar +from dagster_dg_tests.utils import ( + ProxyRunner, + assert_runner_result, + isolated_example_code_location_bar, +) def test_info_component_type_all_metadata_success(): - runner = CliRunner() + runner = ProxyRunner.test() with isolated_example_code_location_bar(runner): result = runner.invoke( - info_component_type_command, ["dagster_components.pipes_subprocess_script_collection"] + "info", + "component-type", + "dagster_components.test.simple_pipes_script_asset", + ) + assert_runner_result(result) + assert ( + result.output.strip() + == textwrap.dedent(""" + dagster_components.test.simple_pipes_script_asset + + Description: + + A simple asset that runs a Python script with the Pipes subprocess client. + + Because it is a pipes asset, no value is returned. + + Generate params schema: + + { + "properties": { + "asset_key": { + "title": "Asset Key", + "type": "string" + }, + "filename": { + "title": "Filename", + "type": "string" + } + }, + "required": [ + "asset_key", + "filename" + ], + "title": "SimplePipesScriptAssetParams", + "type": "object" + } + + Component params schema: + + { + "properties": { + "asset_key": { + "title": "Asset Key", + "type": "string" + }, + "filename": { + "title": "Filename", + "type": "string" + } + }, + "required": [ + "asset_key", + "filename" + ], + "title": "SimplePipesScriptAssetParams", + "type": "object" + } + """).strip() ) - assert result.exit_code == 0 - assert result.output.startswith( - textwrap.dedent(""" - dagster_components.pipes_subprocess_script_collection - Description: - Assets that wrap Python scripts +def test_info_component_type_all_metadata_empty_success(): + runner = ProxyRunner.test() + with isolated_example_code_location_bar(runner): + result = runner.invoke( + "info", + "component-type", + "dagster_components.test.all_metadata_empty_asset", + ) + assert_runner_result(result) + assert ( + result.output.strip() + == textwrap.dedent(""" + dagster_components.test.all_metadata_empty_asset """).strip() ) def test_info_component_type_flag_fields_success(): - runner = CliRunner() + runner = ProxyRunner.test() with isolated_example_code_location_bar(runner): result = runner.invoke( - info_component_type_command, - ["dagster_components.pipes_subprocess_script_collection", "--description"], + "info", + "component-type", + "dagster_components.test.simple_pipes_script_asset", + "--description", ) - assert result.exit_code == 0 - assert result.output.startswith( - textwrap.dedent(""" - Assets that wrap Python scripts + assert_runner_result(result) + assert ( + result.output.strip() + == textwrap.dedent(""" + A simple asset that runs a Python script with the Pipes subprocess client. + + Because it is a pipes asset, no value is returned. """).strip() ) result = runner.invoke( - info_component_type_command, - ["dagster_components.pipes_subprocess_script_collection", "--generate-params-schema"], + "info", + "component-type", + "dagster_components.test.simple_pipes_script_asset", + "--generate-params-schema", ) - assert result.exit_code == 0 - assert result.output.startswith( - textwrap.dedent(""" - No generate params schema defined. + assert_runner_result(result) + assert ( + result.output.strip() + == textwrap.dedent(""" + { + "properties": { + "asset_key": { + "title": "Asset Key", + "type": "string" + }, + "filename": { + "title": "Filename", + "type": "string" + } + }, + "required": [ + "asset_key", + "filename" + ], + "title": "SimplePipesScriptAssetParams", + "type": "object" + } """).strip() ) result = runner.invoke( - info_component_type_command, - ["dagster_components.pipes_subprocess_script_collection", "--component-params-schema"], + "info", + "component-type", + "dagster_components.test.simple_pipes_script_asset", + "--component-params-schema", ) - assert result.exit_code == 0 - assert result.output.startswith( - textwrap.dedent(""" - No component params schema defined. + assert_runner_result(result) + assert ( + result.output.strip() + == textwrap.dedent(""" + { + "properties": { + "asset_key": { + "title": "Asset Key", + "type": "string" + }, + "filename": { + "title": "Filename", + "type": "string" + } + }, + "required": [ + "asset_key", + "filename" + ], + "title": "SimplePipesScriptAssetParams", + "type": "object" + } """).strip() ) def test_info_component_type_outside_code_location_fails() -> None: - runner = CliRunner() + runner = ProxyRunner.test() with runner.isolated_filesystem(): result = runner.invoke( - info_component_type_command, ["dagster_components.pipes_subprocess_script_collection"] + "info", + "component-type", + "dagster_components.test.simple_pipes_script_asset", + "--component-params-schema", ) - assert result.exit_code != 0 + assert_runner_result(result, exit_0=False) assert "must be run inside a Dagster code location directory" in result.output def test_info_component_type_multiple_flags_fails() -> None: - runner = CliRunner() + runner = ProxyRunner.test() with isolated_example_code_location_bar(runner): result = runner.invoke( - info_component_type_command, - [ - "dagster_components.pipes_subprocess_script_collection", - "--description", - "--generate-params-schema", - ], + "info", + "component-type", + "dagster_components.test.simple_pipes_script_asset", + "--description", + "--generate-params-schema", ) - assert result.exit_code != 0 + assert_runner_result(result, exit_0=False) assert ( "Only one of --description, --generate-params-schema, and --component-params-schema can be specified." in result.output diff --git a/python_modules/libraries/dagster-dg/dagster_dg_tests/cli_tests/test_list_commands.py b/python_modules/libraries/dagster-dg/dagster_dg_tests/cli_tests/test_list_commands.py index d50dc8c905764..5d9adb3a17d15 100644 --- a/python_modules/libraries/dagster-dg/dagster_dg_tests/cli_tests/test_list_commands.py +++ b/python_modules/libraries/dagster-dg/dagster_dg_tests/cli_tests/test_list_commands.py @@ -1,73 +1,89 @@ -import re +import textwrap -from click.testing import CliRunner -from dagster_dg.cli.generate import generate_code_location_command, generate_component_command -from dagster_dg.cli.list import ( - list_code_locations_command, - list_component_types_command, - list_components_command, -) from dagster_dg.utils import ensure_dagster_dg_tests_import ensure_dagster_dg_tests_import() -from dagster_dg_tests.cli_tests.test_generate_commands import ( +from dagster_dg_tests.utils import ( + ProxyRunner, + assert_runner_result, isolated_example_code_location_bar, - isolated_example_code_location_bar_with_component_type_baz, isolated_example_deployment_foo, ) def test_list_code_locations_success(): - runner = CliRunner() + runner = ProxyRunner.test() with isolated_example_deployment_foo(runner): - runner.invoke(generate_code_location_command, ["foo"]) - runner.invoke(generate_code_location_command, ["bar"]) - result = runner.invoke(list_code_locations_command) - assert result.exit_code == 0 - assert result.output == "bar\nfoo\n" + runner.invoke("generate", "code-location", "foo") + runner.invoke("generate", "code-location", "bar") + result = runner.invoke("list", "code-locations") + assert_runner_result(result) + assert ( + result.output.strip() + == textwrap.dedent(""" + bar + foo + """).strip() + ) def test_list_code_locations_outside_deployment_fails() -> None: - runner = CliRunner() + runner = ProxyRunner.test() with runner.isolated_filesystem(): - result = runner.invoke(list_code_locations_command) - assert result.exit_code != 0 + result = runner.invoke("list", "code-locations") + assert_runner_result(result, exit_0=False) assert "must be run inside a Dagster deployment directory" in result.output def test_list_component_types_success(): - runner = CliRunner() + runner = ProxyRunner.test() with isolated_example_code_location_bar(runner): - result = runner.invoke(list_component_types_command) - assert result.exit_code == 0 - lines = result.output.strip().split("\n") - assert len(lines) == 2 - assert lines[0] == "dagster_components.pipes_subprocess_script_collection" - assert re.match(r" Assets that wrap.*", lines[1]) + result = runner.invoke("list", "component-types") + assert_runner_result(result) + assert ( + result.output.strip() + == textwrap.dedent(""" + dagster_components.test.all_metadata_empty_asset + dagster_components.test.simple_asset + A simple asset that returns a constant string value. + dagster_components.test.simple_pipes_script_asset + A simple asset that runs a Python script with the Pipes subprocess client. + """).strip() + ) def test_list_component_types_outside_code_location_fails() -> None: - runner = CliRunner() + runner = ProxyRunner.test() with runner.isolated_filesystem(): - result = runner.invoke(list_component_types_command) - assert result.exit_code != 0 + result = runner.invoke("list", "component-types") + assert_runner_result(result, exit_0=False) assert "must be run inside a Dagster code location directory" in result.output def test_list_components_succeeds(): - runner = CliRunner() - # with isolated_example_code_location_bar(runner): - with isolated_example_code_location_bar_with_component_type_baz(runner): - result = runner.invoke(list_components_command) - runner.invoke(generate_component_command, ["bar.baz", "qux"]) - result = runner.invoke(list_components_command) - assert result.output == "qux\n" + runner = ProxyRunner.test() + with isolated_example_code_location_bar(runner): + result = runner.invoke( + "generate", + "component", + "dagster_components.test.all_metadata_empty_asset", + "qux", + ) + assert_runner_result(result) + result = runner.invoke("list", "components") + assert_runner_result(result) + assert ( + result.output.strip() + == textwrap.dedent(""" + qux + """).strip() + ) def test_list_components_command_outside_code_location_fails() -> None: - runner = CliRunner() + runner = ProxyRunner.test() with runner.isolated_filesystem(): - result = runner.invoke(list_components_command) - assert result.exit_code != 0 + result = runner.invoke("list", "components") + assert_runner_result(result, exit_0=False) assert "must be run inside a Dagster code location directory" in result.output diff --git a/python_modules/libraries/dagster-dg/dagster_dg_tests/utils.py b/python_modules/libraries/dagster-dg/dagster_dg_tests/utils.py new file mode 100644 index 0000000000000..0d32afe12494c --- /dev/null +++ b/python_modules/libraries/dagster-dg/dagster_dg_tests/utils.py @@ -0,0 +1,90 @@ +import traceback +from contextlib import contextmanager +from dataclasses import dataclass +from pathlib import Path +from types import TracebackType +from typing import Iterator, Optional, Sequence, Tuple, Type, Union + +from click.testing import CliRunner, Result +from dagster_dg.cli import cli as dg_cli +from dagster_dg.utils import discover_git_root, pushd + + +@contextmanager +def isolated_example_deployment_foo(runner: Union[CliRunner, "ProxyRunner"]) -> Iterator[None]: + runner = ProxyRunner(runner) if isinstance(runner, CliRunner) else runner + with runner.isolated_filesystem(): + runner.invoke("generate", "deployment", "foo") + with pushd("foo"): + yield + + +@contextmanager +def isolated_example_code_location_bar( + runner: Union[CliRunner, "ProxyRunner"], in_deployment: bool = True +) -> Iterator[None]: + runner = ProxyRunner(runner) if isinstance(runner, CliRunner) else runner + dagster_git_repo_dir = str(discover_git_root(Path(__file__))) + if in_deployment: + with isolated_example_deployment_foo(runner): + runner.invoke( + "generate", + "code-location", + "--use-editable-dagster", + dagster_git_repo_dir, + "bar", + ) + with pushd("code_locations/bar"): + yield + else: + with runner.isolated_filesystem(): + runner.invoke( + "generate", + "code-location", + "--use-editable-dagster", + dagster_git_repo_dir, + "bar", + ) + with pushd("bar"): + yield + + +@dataclass +class ProxyRunner: + original: CliRunner + prepend_args: Optional[Sequence[str]] = None + + @classmethod + def test(cls): + return cls(CliRunner(), ["--builtin-component-lib", "dagster_components.test"]) + + def invoke(self, *args: str): + all_args = [*(self.prepend_args or []), *args] + return self.original.invoke(dg_cli, all_args) + + @contextmanager + def isolated_filesystem(self) -> Iterator[None]: + with self.original.isolated_filesystem(): + yield + + +def assert_runner_result(result: Result, exit_0: bool = True) -> None: + try: + assert result.exit_code == 0 if exit_0 else result.exit_code != 0 + except AssertionError: + if result.output: + print(result.output) # noqa: T201 + if result.exc_info: + print_exception_info(result.exc_info) + raise + + +def print_exception_info( + exc_info: Tuple[Type[BaseException], BaseException, TracebackType], +) -> None: + """Prints a nicely formatted traceback for the current exception.""" + exc_type, exc_value, exc_traceback = exc_info + print("Exception Traceback (most recent call last):") # noqa: T201 + formatted_traceback = "".join(traceback.format_tb(exc_traceback)) + print(formatted_traceback) # noqa: T201 + print(f"{exc_type.__name__}: {exc_value}") # noqa: T201