Skip to content

Commit

Permalink
[components] Add custom scope for custom components (#26594)
Browse files Browse the repository at this point in the history
## Summary & Motivation

Small change, big usability upgrade. This lets users define custom scope for users of their component to take advantage of.

This scope can be anything from convenience string constants to complex functions that return raw python types.

In the unit test, I have an example showing how this could let someone set up a custom automation condition constructor, but the possibilities here are kinda endless. For example, you could easily imagine creating more complex translator methods without having to fully give up the yaml world using this capability.

## How I Tested These Changes

## Changelog

NOCHANGELOG
  • Loading branch information
OwenKephart authored Dec 19, 2024
1 parent 234e10c commit 09333d9
Show file tree
Hide file tree
Showing 8 changed files with 95 additions and 7 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,10 @@ class Component(ABC):
params_schema: ClassVar = None
generate_params_schema: ClassVar = None

@classmethod
def get_rendering_scope(cls) -> Mapping[str, Any]:
return {}

@classmethod
def generate_files(cls, request: ComponentGenerateRequest, params: Any) -> None: ...

Expand Down Expand Up @@ -233,6 +237,12 @@ def path(self) -> Path:

return self.decl_node.path

def with_rendering_scope(self, rendering_scope: Mapping[str, Any]) -> "ComponentLoadContext":
return dataclasses.replace(
self,
templated_value_resolver=self.templated_value_resolver.with_context(**rendering_scope),
)

def for_decl_node(self, decl_node: ComponentDeclNode) -> "ComponentLoadContext":
return dataclasses.replace(self, decl_node=decl_node)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ def load_module_from_path(module_name, path) -> ModuleType:
def load_components_from_context(context: ComponentLoadContext) -> Sequence[Component]:
if isinstance(context.decl_node, YamlComponentDecl):
component_type = component_type_from_yaml_decl(context.registry, context.decl_node)
context = context.with_rendering_scope(component_type.get_rendering_scope())
return [component_type.load(context)]
elif isinstance(context.decl_node, ComponentFolder):
components = []
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@

import dagster._check as check
from dagster._record import record
from jinja2 import Template
from jinja2.nativetypes import NativeTemplate
from pydantic import BaseModel, Field
from pydantic.fields import FieldInfo

Expand Down Expand Up @@ -51,8 +51,8 @@ def default() -> "TemplatedValueResolver":
def with_context(self, **additional_context) -> "TemplatedValueResolver":
return TemplatedValueResolver(context={**self.context, **additional_context})

def resolve(self, val: str) -> str:
return Template(val).render(**self.context)
def resolve(self, val: Any) -> Any:
return NativeTemplate(val).render(**self.context) if isinstance(val, str) else val


def _should_render(
Expand All @@ -70,7 +70,7 @@ def _should_render(

# Optional[ComplexType] (e.g.) will contain multiple schemas in the "anyOf" field
if "anyOf" in subschema:
return any(_should_render(valpath, json_schema, inner) for inner in subschema["anyOf"])
return all(_should_render(valpath, json_schema, inner) for inner in subschema["anyOf"])

el = valpath[0]
if isinstance(el, str):
Expand All @@ -84,9 +84,9 @@ def _should_render(
else:
check.failed(f"Unexpected valpath element: {el}")

# the path wasn't valid
# the path wasn't valid, or unspecified
if not inner:
return False
return subschema.get("additionalProperties", True)

_, *rest = valpath
return _should_render(rest, json_schema, inner)
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
from typing import Any, Mapping

from dagster import AssetSpec, AutomationCondition, Definitions
from dagster_components import Component, ComponentLoadContext, component
from pydantic import BaseModel


def my_custom_fn(a: str, b: str) -> str:
return a + "|" + b


def my_custom_automation_condition(cron_schedule: str) -> AutomationCondition:
return AutomationCondition.cron_tick_passed(cron_schedule) & ~AutomationCondition.in_progress()


class CustomScopeParams(BaseModel):
attributes: Mapping[str, Any]


@component(name="custom_scope_component")
class HasCustomScope(Component):
params_schema = CustomScopeParams

@classmethod
def get_rendering_scope(cls) -> Mapping[str, Any]:
return {
"custom_str": "xyz",
"custom_dict": {"a": "b"},
"custom_fn": my_custom_fn,
"custom_automation_condition": my_custom_automation_condition,
}

def __init__(self, attributes: Mapping[str, Any]):
self.attributes = attributes

@classmethod
def load(cls, context: ComponentLoadContext):
loaded_params = context.load_params(cls.params_schema)
return cls(attributes=loaded_params.attributes)

def build_defs(self, context: ComponentLoadContext):
return Definitions(assets=[AssetSpec(key="key", **self.attributes)])
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
type: .custom_scope_component

params:
attributes:
group_name: "{{ custom_str }}"
tags: "{{ custom_dict }}"
metadata:
prefixed: "prefixed_{{ custom_fn('a', custom_str) }}"
automation_condition: "{{ custom_automation_condition('@daily') }}"
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,6 @@ class Outer(BaseModel):
(["inner_optional", 0, "deferred"], False),
(["inner_deferred_optional", 0], False),
(["inner_deferred_optional", 0, "a"], False),
(["NONEXIST", 0, "deferred"], False),
],
)
def test_should_render(path, expected: bool) -> None:
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
from pathlib import Path

from dagster import AssetSpec, AutomationCondition
from dagster_components.core.component_defs_builder import build_defs_from_component_path

from dagster_components_tests.utils import registry


def test_custom_scope() -> None:
defs = build_defs_from_component_path(
path=Path(__file__).parent / "custom_scope_component",
registry=registry(),
resources={},
)

assets = list(defs.assets or [])
assert len(assets) == 1
spec = assets[0]
assert isinstance(spec, AssetSpec)

assert spec.group_name == "xyz"
assert spec.tags == {"a": "b"}
assert spec.metadata == {"prefixed": "prefixed_a|xyz"}
assert (
spec.automation_condition
== AutomationCondition.cron_tick_passed("@daily") & ~AutomationCondition.in_progress()
)

0 comments on commit 09333d9

Please sign in to comment.