Skip to content

Commit

Permalink
[components] Split out dg-cli from dagster-components (#26364)
Browse files Browse the repository at this point in the history
## Summary & Motivation

Splits out `dg-cli` from `dagster-components`. `dg-cli` does not depend
on any other dagster package.

All possible generation logic is moved to `dg-cli`. However, `dg-cli`
sometimes needs to run commands in a code location environment:

- Querying the environment to see the defined components
- Running the `generate_files` routine to generate a new component

This functionality is exposed from `dagster-components` via a new
`dagster-components` CLI, which is basically a backend for `dg`. It's not
intended to be called by users. The plumbing between the two packages
could be improved, but this is a working first implementation.

Other comments:

- `dagster-components` no longer has any concept of a "deployment" -- the
`DeploymentProjectContext` has been moved to `dg`. However, both
`dg-cli` and `dagster-components` have slightly differing versions of
`CodeLocationProjectContext`. The `dg-cli` version uses new
`RemoteComponentRegistry` and `RemoteComponentType` classes to represent
the contents of the component registry in the code location.
- `dg generate component` will forward the `-- extra args` or
`--json-params` untouched onto `dagster-components generate component`
- `dagster-components` doesn't have any _direct_ CLI testing right now--
the testing is done via the higher level calls from `dg`

## How I Tested These Changes

Unit tests.
  • Loading branch information
smackesey authored Dec 10, 2024
1 parent 1ccc785 commit 38fd122
Show file tree
Hide file tree
Showing 35 changed files with 1,044 additions and 246 deletions.
3 changes: 1 addition & 2 deletions python_modules/libraries/dagster-components/README.md
Original file line number Diff line number Diff line change
@@ -1,4 +1,3 @@
# dagster-components

Experimental API for defining Dagster definitions factories ("components").
Includes the `dg` CLI tool.
Experimental Python API for defining Dagster definitions factories ("components").
Original file line number Diff line number Diff line change
Expand Up @@ -2,11 +2,13 @@
from dagster.version import __version__

from dagster_components.cli.generate import generate_cli
from dagster_components.cli.list import list_cli


def create_dagster_components_cli():
commands = {
"generate": generate_cli,
"list": list_cli,
}

@click.group(
Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,3 @@
import os
import sys
from pathlib import Path
from typing import Optional, Tuple
Expand All @@ -9,93 +8,16 @@
from dagster_components import ComponentRegistry, __component_registry__
from dagster_components.core.deployment import (
CodeLocationProjectContext,
DeploymentProjectContext,
is_inside_code_location_project,
is_inside_deployment_project,
)
from dagster_components.generate import (
generate_code_location,
generate_component_instance,
generate_component_type,
generate_deployment,
)
from dagster_components.generate import generate_component_instance


# Root click group for the "generate" subcommands; individual commands attach
# themselves to it through the `@generate_cli.command(...)` decorator.
@click.group(name="generate")
def generate_cli() -> None:
    """Commands for generating Dagster components and related entities."""


@generate_cli.command(name="deployment")
@click.argument("path", type=str)
def generate_deployment_command(path: str) -> None:
    """Generate a Dagster deployment instance."""
    # Refuse to scaffold over anything that already exists at the target.
    target_abspath = os.path.abspath(path)
    if not os.path.exists(target_abspath):
        generate_deployment(path)
        return

    headline = click.style(
        f"A file or directory at {target_abspath} already exists. ", fg="red"
    )
    click.echo(
        headline + "\nPlease delete the contents of this path or choose another location."
    )
    sys.exit(1)


@generate_cli.command(name="code-location")
@click.argument("name", type=str)
@click.option("--use-editable-dagster", is_flag=True, default=False)
def generate_code_location_command(name: str, use_editable_dagster: bool) -> None:
    """Generate a Dagster code location inside a deployment."""
    # The docstring above previously said "inside a component"; the command
    # actually requires (and writes into) a deployment project.
    if not is_inside_deployment_project(Path.cwd()):
        click.echo(
            click.style("This command must be run inside a Dagster deployment project.", fg="red")
        )
        sys.exit(1)

    context = DeploymentProjectContext.from_path(Path.cwd())
    if context.has_code_location(name):
        click.echo(click.style(f"A code location named {name} already exists.", fg="red"))
        sys.exit(1)

    # With --use-editable-dagster the generated project points at a local
    # dagster checkout, whose location must come from DAGSTER_GIT_REPO_DIR.
    editable_dagster_root = None
    if use_editable_dagster:
        editable_dagster_root = os.environ.get("DAGSTER_GIT_REPO_DIR")
        if editable_dagster_root is None:
            click.echo(
                click.style(
                    "The `--use-editable-dagster` flag requires the `DAGSTER_GIT_REPO_DIR` environment variable to be set.",
                    fg="red",
                )
            )
            sys.exit(1)

    code_location_path = os.path.join(context.code_location_root_path, name)

    generate_code_location(code_location_path, editable_dagster_root)


@generate_cli.command(name="component-type")
@click.argument("name", type=str)
def generate_component_type_command(name: str) -> None:
    """Generate a Dagster component type."""
    # The docstring above previously said "component instance", which
    # describes the separate `component` command rather than this one.
    if not is_inside_code_location_project(Path.cwd()):
        click.echo(
            click.style(
                "This command must be run inside a Dagster code location project.", fg="red"
            )
        )
        sys.exit(1)

    context = CodeLocationProjectContext.from_path(
        Path.cwd(), ComponentRegistry(__component_registry__)
    )
    # Never overwrite an existing component type of the same name.
    if context.has_component_type(name):
        click.echo(click.style(f"A component type named `{name}` already exists.", fg="red"))
        sys.exit(1)

    generate_component_type(context.component_types_root_path, name)


@generate_cli.command(name="component")
@click.argument("component_type", type=str)
@click.argument("component_name", type=str)
Expand All @@ -107,7 +29,7 @@ def generate_component_command(
json_params: Optional[str],
extra_args: Tuple[str, ...],
) -> None:
if not is_inside_code_location_project(Path(".")):
if not is_inside_code_location_project(Path.cwd()):
click.echo(
click.style(
"This command must be run inside a Dagster code location project.", fg="red"
Expand All @@ -123,11 +45,6 @@ def generate_component_command(
click.style(f"No component type `{component_type}` could be resolved.", fg="red")
)
sys.exit(1)
elif context.has_component_instance(component_name):
click.echo(
click.style(f"A component instance named `{component_name}` already exists.", fg="red")
)
sys.exit(1)

component_type_cls = context.get_component_type(component_type)
generate_params_schema = component_type_cls.generate_params_schema
Expand All @@ -142,6 +59,7 @@ def generate_component_command(
generate_params = inner_ctx.invoke(generate_params_schema.cli, **inner_ctx.params)
else:
generate_params = None

generate_component_instance(
context.component_instances_root_path, component_name, component_type_cls, generate_params
)
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
import json
import sys
from pathlib import Path
from typing import Any, Dict

import click

from dagster_components import __component_registry__
from dagster_components.core.component import ComponentRegistry
from dagster_components.core.deployment import (
CodeLocationProjectContext,
is_inside_code_location_project,
)


# Root click group for the "list" subcommands. It was previously registered
# under the name "generate", copied from the sibling generate group.
@click.group(name="list")
def list_cli() -> None:
    """Commands for listing Dagster components and related entities."""


@list_cli.command(name="component-types")
def list_component_types_command() -> None:
    """List registered Dagster components."""
    if not is_inside_code_location_project(Path.cwd()):
        error_message = click.style(
            "This command must be run inside a Dagster code location project.", fg="red"
        )
        click.echo(error_message)
        sys.exit(1)

    context = CodeLocationProjectContext.from_path(
        Path.cwd(), ComponentRegistry(__component_registry__)
    )
    # Emit a JSON object keyed by component type; each entry currently
    # carries only the type's own name.
    output: Dict[str, Any] = {
        type_key: {"name": type_key} for type_key in context.list_component_types()
    }
    click.echo(json.dumps(output))
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
from pathlib import Path
from typing import Final, Iterable, Type

import tomli
from dagster._core.errors import DagsterError
from typing_extensions import Self

Expand All @@ -13,22 +14,9 @@
register_components_in_module,
)


def is_inside_deployment_project(path: Path) -> bool:
    """Return True if a deployment root can be resolved starting from ``path``.

    Resolution is delegated to ``_resolve_deployment_root_path``; a
    ``DagsterError`` from it is translated into ``False``.
    """
    try:
        _resolve_deployment_root_path(path)
    except DagsterError:
        return False
    else:
        return True


def _resolve_deployment_root_path(path: Path) -> str:
    """Walk upward from ``path`` until a deployment root is found.

    Args:
        path: Directory to start searching from; it is made absolute first.

    Returns:
        The absolute path of the nearest enclosing deployment root.

    Raises:
        DagsterError: If no ancestor satisfies ``_is_deployment_root``.
    """
    current_path = os.path.abspath(path)
    while not _is_deployment_root(current_path):
        parent_path = os.path.dirname(current_path)
        # dirname() of a filesystem root returns the root itself. Comparing
        # against the parent (rather than only the literal "/") also ends the
        # walk on Windows drive roots, where the old check looped forever.
        # The explicit "/" test keeps the original POSIX behavior unchanged.
        if parent_path == current_path or parent_path == "/":
            raise DagsterError("Cannot find deployment root")
        current_path = parent_path
    return current_path
# Code location
_CODE_LOCATION_CUSTOM_COMPONENTS_DIR: Final = "lib"
_CODE_LOCATION_COMPONENT_INSTANCES_DIR: Final = "components"


def is_inside_code_location_project(path: Path) -> bool:
Expand All @@ -39,49 +27,21 @@ def is_inside_code_location_project(path: Path) -> bool:
return False


def _resolve_code_location_root_path(path: Path) -> str:
current_path = os.path.abspath(path)
def _resolve_code_location_root_path(path: Path) -> Path:
current_path = path.absolute()
while not _is_code_location_root(current_path):
current_path = os.path.dirname(current_path)
if current_path == "/":
current_path = current_path.parent
if str(current_path) == "/":
raise DagsterError("Cannot find code location root")
return current_path


def _is_deployment_root(path: str) -> bool:
    """Return True if ``path`` contains an entry named ``code_locations``."""
    marker_path = os.path.join(path, "code_locations")
    return os.path.exists(marker_path)


def _is_code_location_root(path: str) -> bool:
    """Return True if the parent directory of ``path`` is named ``code_locations``."""
    parent_dir = os.path.dirname(path)
    return os.path.basename(parent_dir) == "code_locations"


# Deployment
_DEPLOYMENT_CODE_LOCATIONS_DIR: Final = "code_locations"

# Code location
_CODE_LOCATION_CUSTOM_COMPONENTS_DIR: Final = "lib"
_CODE_LOCATION_COMPONENT_INSTANCES_DIR: Final = "components"


class DeploymentProjectContext:
    """Paths and lookups for a deployment project rooted at a directory."""

    @classmethod
    def from_path(cls, path: Path) -> Self:
        """Build a context for the deployment that encloses ``path``.

        Raises whatever ``_resolve_deployment_root_path`` raises when no
        deployment root can be found.
        """
        return cls(root_path=_resolve_deployment_root_path(path))

    def __init__(self, root_path: str):
        self._root_path = root_path

    @property
    def deployment_root(self) -> str:
        """Root directory of the deployment."""
        return self._root_path

    @property
    def code_location_root_path(self) -> str:
        """Directory under the deployment root that holds code locations."""
        return os.path.join(self._root_path, _DEPLOYMENT_CODE_LOCATIONS_DIR)

    def has_code_location(self, name: str) -> bool:
        """Return True if an entry called ``name`` exists among the code locations."""
        # Reuse the property so the directory name lives in one place instead
        # of being repeated here as a string literal.
        return os.path.exists(os.path.join(self.code_location_root_path, name))
def _is_code_location_root(path: Path) -> bool:
    """Return True if ``path`` holds a ``pyproject.toml`` with a ``[tool.dagster]`` entry.

    Args:
        path: Directory to inspect.

    Returns:
        True when ``pyproject.toml`` is a regular file in ``path`` and its
        ``tool.dagster`` value is truthy; False otherwise.
    """
    pyproject_path = path / "pyproject.toml"
    # is_file() (not exists()) so a directory with that name is simply "not a
    # code location" rather than an error when opened.
    if not pyproject_path.is_file():
        return False
    # Parse from a binary handle: tomli then decodes as UTF-8 itself, instead
    # of relying on the platform's default text encoding.
    with pyproject_path.open("rb") as f:
        pyproject = tomli.load(f)
    return bool(pyproject.get("tool", {}).get("dagster"))


class CodeLocationProjectContext:
Expand All @@ -100,28 +60,21 @@ def from_path(cls, path: Path, component_registry: "ComponentRegistry") -> Self:
register_components_in_module(component_registry, module)

return cls(
deployment_context=DeploymentProjectContext.from_path(path),
root_path=root_path,
root_path=str(root_path),
name=os.path.basename(root_path),
component_registry=component_registry,
)

def __init__(
self,
deployment_context: DeploymentProjectContext,
root_path: str,
name: str,
component_registry: "ComponentRegistry",
):
self._deployment_context = deployment_context
self._root_path = root_path
self._name = name
self._component_registry = component_registry

@property
def deployment_context(self) -> DeploymentProjectContext:
return self._deployment_context

@property
def component_types_root_path(self) -> str:
return os.path.join(self._root_path, self._name, _CODE_LOCATION_CUSTOM_COMPONENTS_DIR)
Expand All @@ -142,6 +95,9 @@ def get_component_type(self, name: str) -> Type[Component]:
raise DagsterError(f"No component type named {name}")
return self._component_registry.get(name)

def list_component_types(self) -> Iterable[str]:
    """Return the keys of the component registry in sorted order."""
    type_keys = list(self._component_registry.keys())
    type_keys.sort()
    return type_keys

def get_component_instance_path(self, name: str) -> str:
if name not in self.component_instances:
raise DagsterError(f"No component instance named {name}")
Expand Down
Original file line number Diff line number Diff line change
@@ -1,12 +1,11 @@
import os
import textwrap
from pathlib import Path
from typing import Any, Optional, Type
from typing import Any, Type

import click
import yaml
from dagster._generate.generate import generate_project
from dagster._utils import camelcase, pushd
from dagster._utils import pushd

from dagster_components.core.component import Component, get_component_name

Expand All @@ -19,55 +18,6 @@ def write_line_break(self) -> None:
super().write_line_break()


def generate_deployment(path: str) -> None:
    """Announce and scaffold a deployment at ``path`` from the bundled template."""
    click.echo(f"Creating a Dagster deployment at {path}.")

    # Templates are stored next to this module, under "templates".
    template_dir = os.path.join(
        os.path.dirname(__file__), "templates", "DEPLOYMENT_NAME_PLACEHOLDER"
    )
    generate_project(
        path=path,
        name_placeholder="DEPLOYMENT_NAME_PLACEHOLDER",
        templates_path=template_dir,
    )


def generate_code_location(path: str, editable_dagster_root: Optional[str] = None) -> None:
    """Announce and scaffold a code location at ``path`` from the bundled template.

    When ``editable_dagster_root`` is given, a ``[tool.uv.sources]`` section
    pointing at editable packages under that root is passed to the template
    as ``uv_sources``; otherwise ``uv_sources`` is an empty string.
    """
    click.echo(f"Creating a Dagster code location at {path}.")

    uv_sources = ""
    if editable_dagster_root:
        uv_sources = textwrap.dedent(f"""
            [tool.uv.sources]
            dagster = {{ path = "{editable_dagster_root}/python_modules/dagster", editable = true }}
            dagster-components = {{ path = "{editable_dagster_root}/python_modules/libraries/dagster-components", editable = true }}
            dagster-pipes = {{ path = "{editable_dagster_root}/python_modules/dagster-pipes", editable = true }}
            dagster-webserver = {{ path = "{editable_dagster_root}/python_modules/dagster-webserver", editable = true }}
        """)

    # Templates are stored next to this module, under "templates".
    template_dir = os.path.join(
        os.path.dirname(__file__), "templates", "CODE_LOCATION_NAME_PLACEHOLDER"
    )
    generate_project(
        path=path,
        name_placeholder="CODE_LOCATION_NAME_PLACEHOLDER",
        templates_path=template_dir,
        uv_sources=uv_sources,
    )


def generate_component_type(root_path: str, name: str) -> None:
    """Announce and scaffold a component type called ``name`` under ``root_path``.

    The template receives ``name`` as both the project name and the component
    type, plus its camel-cased form as the class name.
    """
    click.echo(f"Creating a Dagster component type at {root_path}/{name}.py.")

    # Templates are stored next to this module, under "templates".
    template_dir = os.path.join(os.path.dirname(__file__), "templates", "COMPONENT_TYPE")
    class_name = camelcase(name)
    generate_project(
        path=root_path,
        name_placeholder="COMPONENT_TYPE_NAME_PLACEHOLDER",
        templates_path=template_dir,
        project_name=name,
        component_type_class_name=class_name,
        component_type=name,
    )


def generate_component_instance(
root_path: str, name: str, component_type: Type[Component], generate_params: Any
) -> None:
Expand Down
Loading

0 comments on commit 38fd122

Please sign in to comment.