Skip to content

Commit

Permalink
[components] Allow generate_files to be paramterized (#26305)
Browse files Browse the repository at this point in the history
## Summary & Motivation

Reworks https://github.com/dagster-io/dagster/pull/26283/files to be less fancy.

Just a generic json blob that gets passed through params. Future PR will add an alternative entrypoint with fancier behavior so you don't have to pass in raw json.

## How I Tested These Changes

```
dg generate component dbt_project my_project --params "{\"project_path\": \"../../../dagster/\"}"

```

## Changelog

NOCHANGELOG
  • Loading branch information
OwenKephart authored Dec 6, 2024
1 parent 29e4731 commit bac34d9
Show file tree
Hide file tree
Showing 7 changed files with 68 additions and 10 deletions.
Original file line number Diff line number Diff line change
@@ -1,8 +1,10 @@
import os
import sys
from pathlib import Path
from typing import Optional

import click
from pydantic import TypeAdapter

from dagster_components import ComponentRegistry, __component_registry__
from dagster_components.core.deployment import (
Expand Down Expand Up @@ -82,7 +84,8 @@ def generate_component_type_command(name: str) -> None:
@generate_cli.command(name="component")
@click.argument("component-type", type=str)
@click.argument("name", type=str)
def generate_component_command(component_type: str, name: str) -> None:
@click.option("--params", type=str, default=None)
def generate_component_command(component_type: str, name: str, params: Optional[str]) -> None:
"""Generate a Dagster component instance."""
if not is_inside_code_location_project(Path(".")):
click.echo(
Expand All @@ -105,4 +108,11 @@ def generate_component_command(component_type: str, name: str) -> None:
sys.exit(1)

component_type_cls = context.get_component_type(component_type)
generate_component_instance(context.component_instances_root_path, name, component_type_cls)
generate_params = (
TypeAdapter(component_type_cls.generate_params_schema).validate_json(params)
if params
else None
)
generate_component_instance(
context.component_instances_root_path, name, component_type_cls, generate_params
)
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
import copy
from abc import ABC, abstractmethod
from types import ModuleType
from typing import TYPE_CHECKING, ClassVar, Dict, Iterable, Mapping, Optional, Type
from typing import TYPE_CHECKING, Any, ClassVar, Dict, Iterable, Mapping, Optional, Type

from dagster._core.errors import DagsterError
from dagster._utils import snakecase
Expand All @@ -16,14 +16,15 @@ class ComponentDeclNode: ...

class Component(ABC):
name: ClassVar[Optional[str]] = None
defs_params_schema: ClassVar[Type] = Type[None]
generate_params_schema: ClassVar[Type] = Type[None]

@classmethod
def registered_name(cls) -> str:
return cls.name or snakecase(cls.__name__)

@classmethod
def generate_files(cls) -> None:
raise NotImplementedError()
def generate_files(cls, params: Any) -> Optional[Mapping[str, Any]]: ...

@abstractmethod
def build_defs(self, context: "ComponentLoadContext") -> "Definitions": ...
Expand Down
Original file line number Diff line number Diff line change
@@ -1,13 +1,22 @@
import os
from typing import Type
from typing import Any, Type

import click
import yaml
from dagster._generate.generate import generate_project
from dagster._utils import camelcase, pushd

from dagster_components.core.component import Component


class DefsDumper(yaml.Dumper):
def write_line_break(self) -> None:
# add an extra line break between top-level keys
if self.indent == 0:
super().write_line_break()
super().write_line_break()


def generate_deployment(path: str) -> None:
click.echo(f"Creating a Dagster deployment at {path}.")

Expand Down Expand Up @@ -44,7 +53,9 @@ def generate_component_type(root_path: str, name: str) -> None:
)


def generate_component_instance(root_path: str, name: str, component_type: Type[Component]) -> None:
def generate_component_instance(
root_path: str, name: str, component_type: Type[Component], generate_params: Any
) -> None:
click.echo(f"Creating a Dagster component instance at {root_path}/{name}.py.")

component_instance_root_path = os.path.join(root_path, name)
Expand All @@ -58,4 +69,13 @@ def generate_component_instance(root_path: str, name: str, component_type: Type[
component_type=component_type.registered_name(),
)
with pushd(component_instance_root_path):
component_type.generate_files()
defs_data: dict = {"component_type": component_type.registered_name()}
component_params = (
component_type.generate_files(generate_params)
if generate_params
else component_type.generate_files() # type: ignore
)
if component_params:
defs_data["component_params"] = component_params
with open("defs.yml", "w") as f:
yaml.dump(defs_data, f, Dumper=DefsDumper, sort_keys=False, default_flow_style=False)
Original file line number Diff line number Diff line change
@@ -1,7 +1,13 @@
import os
from pathlib import Path
from typing import Any, Mapping, Optional

import dagster._check as check
from dagster._core.definitions.definitions_class import Definitions
from dagster._utils import pushd
from dagster_dbt import DbtCliResource, DbtProject, dbt_assets
from dagster_embedded_elt.sling.resources import AssetExecutionContext
from dbt.cli.main import dbtRunner
from pydantic import BaseModel, TypeAdapter
from typing_extensions import Self

Expand All @@ -13,8 +19,14 @@ class DbtProjectParams(BaseModel):
dbt: DbtCliResource


class DbtGenerateParams(BaseModel):
init: bool = False
project_path: Optional[str] = None


class DbtProjectComponent(Component):
params_schema = DbtProjectParams
generate_params_schema = DbtGenerateParams

def __init__(self, dbt_resource: DbtCliResource):
self.dbt_resource = dbt_resource
Expand Down Expand Up @@ -43,3 +55,18 @@ def _fn(context: AssetExecutionContext, dbt: DbtCliResource):
yield from dbt.cli(["build"], context=context).stream()

return Definitions(assets=[_fn], resources={"dbt": self.dbt_resource})

@classmethod
def generate_files(cls, params: DbtGenerateParams) -> Mapping[str, Any]:
if params.project_path:
relative_path = os.path.relpath(params.project_path, start=os.getcwd())
elif params.init:
dbtRunner().invoke(["init"])
subpaths = list(Path(os.getcwd()).iterdir())
check.invariant(len(subpaths) == 1, "Expected exactly one subpath to be created.")
# this path should be relative to this directory
relative_path = subpaths[0].name
else:
relative_path = None

return {"dbt": {"project_dir": relative_path}}

This file was deleted.

1 change: 1 addition & 0 deletions examples/experimental/dagster-components/setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -47,5 +47,6 @@ def get_version() -> str:
extras_require={
"sling": ["dagster-embedded-elt"],
"dbt": ["dagster-dbt"],
"test": ["dbt-duckdb"],
},
)
2 changes: 1 addition & 1 deletion examples/experimental/dagster-components/tox.ini
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ deps =
-e ../../../python_modules/dagster-pipes
-e ../../../python_modules/libraries/dagster-embedded-elt
-e ../../../python_modules/libraries/dagster-dbt
-e .
-e .[test]
allowlist_externals =
/bin/bash
uv
Expand Down

0 comments on commit bac34d9

Please sign in to comment.