Skip to content

Commit

Permalink
[components] allow automation_condition to be set with AssetSpecModel (
Browse files Browse the repository at this point in the history
…#26451)

## Summary & Motivation

As title.

## How I Tested These Changes

## Changelog

NOCHANGELOG
  • Loading branch information
OwenKephart authored Dec 13, 2024
1 parent d3bd3f4 commit a64aa2c
Show file tree
Hide file tree
Showing 8 changed files with 114 additions and 9 deletions.
Original file line number Diff line number Diff line change
@@ -1,8 +1,59 @@
from typing import Dict, Optional
from typing import Any, Dict, Literal, Mapping, Optional, Union

from dagster._core.definitions.asset_spec import AssetSpec, map_asset_specs
from dagster._core.definitions.assets import AssetsDefinition
from dagster._core.definitions.declarative_automation.automation_condition import (
AutomationCondition,
)
from dagster._core.definitions.definitions_class import Definitions
from dagster._record import replace
from dagster._utils.warnings import suppress_dagster_warnings
from pydantic import BaseModel


class OpSpecBaseModel(BaseModel):
name: Optional[str] = None
tags: Optional[Dict[str, str]] = None


class AutomationConditionModel(BaseModel):
type: str
params: Mapping[str, Any] = {}

def to_automation_condition(self) -> AutomationCondition:
return getattr(AutomationCondition, self.type)(**self.params)


class AssetSpecProcessorModel(BaseModel):
target: str = "*"
strategy: Union[Literal["replace"], Literal["merge"]] = "replace"
description: Optional[str] = None
metadata: Optional[Mapping[str, Any]] = None
group_name: Optional[str] = None
tags: Optional[Mapping[str, str]] = None
automation_condition: Optional[AutomationConditionModel] = None

def _props(self) -> Mapping[str, Any]:
return {
**self.model_dump(exclude={"target", "strategy"}),
"automation_condition": self.automation_condition.to_automation_condition()
if self.automation_condition
else None,
}

@suppress_dagster_warnings
def _apply_to_spec(self, spec: AssetSpec) -> AssetSpec:
if self.strategy == "replace":
return spec.replace_attributes(**self._props())
else:
return spec.merge_attributes(**self._props())

def transform(self, defs: Definitions) -> Definitions:
"""Applies the specified transformation to the asset specs in the given definitions."""
mappable = [d for d in defs.assets or [] if isinstance(d, (AssetsDefinition, AssetSpec))]
mapped_assets = map_asset_specs(self._apply_to_spec, mappable)
assets = [
*mapped_assets,
*[d for d in defs.assets or [] if not isinstance(d, (AssetsDefinition, AssetSpec))],
]
return replace(defs, assets=assets)
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
import os
from pathlib import Path
from typing import Any, Iterator, Mapping, Optional
from typing import Any, Iterator, Mapping, Optional, Sequence

import click
import dagster._check as check
Expand All @@ -17,7 +17,7 @@
from dagster_components import Component, ComponentLoadContext
from dagster_components.core.component import component
from dagster_components.core.component_decl_builder import ComponentDeclNode, YamlComponentDecl
from dagster_components.core.dsl_schema import OpSpecBaseModel
from dagster_components.core.dsl_schema import AssetSpecProcessorModel, OpSpecBaseModel


class DbtNodeTranslatorParams(BaseModel):
Expand All @@ -29,6 +29,7 @@ class DbtProjectParams(BaseModel):
dbt: DbtCliResource
op: Optional[OpSpecBaseModel] = None
translator: Optional[DbtNodeTranslatorParams] = None
asset_attributes: Optional[Sequence[AssetSpecProcessorModel]] = None


class DbtGenerateParams(BaseModel):
Expand Down Expand Up @@ -76,10 +77,12 @@ def __init__(
dbt_resource: DbtCliResource,
op_spec: Optional[OpSpecBaseModel],
dbt_translator: Optional[DagsterDbtTranslator],
asset_transforms: Sequence[AssetSpecProcessorModel],
):
self.dbt_resource = dbt_resource
self.op_spec = op_spec
self.dbt_translator = dbt_translator
self.asset_transforms = asset_transforms

@classmethod
def from_decl_node(cls, context: ComponentLoadContext, decl_node: ComponentDeclNode) -> Self:
Expand All @@ -96,6 +99,7 @@ def from_decl_node(cls, context: ComponentLoadContext, decl_node: ComponentDeclN
dbt_translator=DbtProjectComponentTranslator(
translator_params=loaded_params.translator
),
asset_transforms=loaded_params.asset_attributes or [],
)

def build_defs(self, context: ComponentLoadContext) -> Definitions:
Expand All @@ -112,7 +116,10 @@ def build_defs(self, context: ComponentLoadContext) -> Definitions:
def _fn(context: AssetExecutionContext):
yield from self.execute(context=context, dbt=self.dbt_resource)

return Definitions(assets=[_fn])
defs = Definitions(assets=[_fn])
for transform in self.asset_transforms:
defs = transform.transform(defs)
return defs

@classmethod
def generate_files(cls, params: DbtGenerateParams) -> Mapping[str, Any]:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
component,
)
from dagster_components.core.component_decl_builder import YamlComponentDecl
from dagster_components.core.dsl_schema import AutomationConditionModel

if TYPE_CHECKING:
from dagster._core.definitions.definitions_class import Definitions
Expand All @@ -33,13 +34,17 @@ class AssetSpecModel(BaseModel):
code_version: Optional[str] = None
owners: Sequence[str] = []
tags: Mapping[str, str] = {}
automation_condition: Optional[AutomationConditionModel] = None

@suppress_dagster_warnings
def to_asset_spec(self) -> AssetSpec:
return AssetSpec(
**{
**self.__dict__,
"key": AssetKey.from_user_string(self.key),
"automation_condition": self.automation_condition.to_automation_condition()
if self.automation_condition
else None,
},
)

Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
import os
from pathlib import Path
from typing import Any, Iterator, Optional, Union
from typing import Any, Iterator, Optional, Sequence, Union

import yaml
from dagster._core.definitions.definitions_class import Definitions
Expand All @@ -14,22 +14,30 @@
from dagster_components import Component, ComponentLoadContext
from dagster_components.core.component import component
from dagster_components.core.component_decl_builder import ComponentDeclNode, YamlComponentDecl
from dagster_components.core.dsl_schema import OpSpecBaseModel
from dagster_components.core.dsl_schema import AssetSpecProcessorModel, OpSpecBaseModel


class SlingReplicationParams(BaseModel):
sling: Optional[SlingResource] = None
op: Optional[OpSpecBaseModel] = None
asset_attributes: Optional[Sequence[AssetSpecProcessorModel]] = None


@component(name="sling_replication")
class SlingReplicationComponent(Component):
params_schema = SlingReplicationParams

def __init__(self, dirpath: Path, resource: SlingResource, op_spec: Optional[OpSpecBaseModel]):
def __init__(
self,
dirpath: Path,
resource: SlingResource,
op_spec: Optional[OpSpecBaseModel],
asset_transforms: Sequence[AssetSpecProcessorModel],
):
self.dirpath = dirpath
self.resource = resource
self.op_spec = op_spec
self.asset_transforms = asset_transforms

@classmethod
def from_decl_node(cls, context: ComponentLoadContext, decl_node: ComponentDeclNode) -> Self:
Expand All @@ -41,6 +49,7 @@ def from_decl_node(cls, context: ComponentLoadContext, decl_node: ComponentDeclN
dirpath=decl_node.path,
resource=loaded_params.sling or SlingResource(),
op_spec=loaded_params.op,
asset_transforms=loaded_params.asset_attributes or [],
)

def build_defs(self, context: ComponentLoadContext) -> Definitions:
Expand All @@ -52,7 +61,10 @@ def build_defs(self, context: ComponentLoadContext) -> Definitions:
def _fn(context: AssetExecutionContext, sling: SlingResource):
yield from self.execute(context=context, sling=sling)

return Definitions(assets=[_fn], resources={"sling": self.resource})
defs = Definitions(assets=[_fn], resources={"sling": self.resource})
for transform in self.asset_transforms:
defs = transform.transform(defs)
return defs

@classmethod
def generate_files(cls, params: Any) -> None:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,13 @@ params:
- path: script_one.py
assets:
- key: a
automation_condition:
type: eager
- key: b
automation_condition:
type: on_cron
params:
cron_schedule: "@daily"
deps: [up1, up2]
- path: script_two.py
assets:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -79,3 +79,7 @@ def test_load_from_path(dbt_path: Path) -> None:
)

assert defs.get_asset_graph().get_all_asset_keys() == JAFFLE_SHOP_KEYS

for asset_node in defs.get_asset_graph().asset_nodes:
assert asset_node.tags["foo"] == "bar"
assert asset_node.metadata["something"] == 1
Original file line number Diff line number Diff line change
Expand Up @@ -3,3 +3,13 @@ type: dagster_components.dbt_project
params:
dbt:
project_dir: jaffle_shop

asset_attributes:
- tags:
foo: bar
metadata:
something: 1
automation_condition:
type: on_cron
params:
cron_schedule: "@daily"
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,17 @@ def test_python_params() -> None:
"scripts": [
{
"path": "script_one.py",
"assets": [{"key": "a"}, {"key": "b", "deps": ["up1", "up2"]}],
"assets": [
{"key": "a", "automation_condition": {"type": "eager"}},
{
"key": "b",
"automation_condition": {
"type": "on_cron",
"params": {"cron_schedule": "@daily"},
},
"deps": ["up1", "up2"],
},
],
},
{"path": "subdir/script_three.py", "assets": [{"key": "key_override"}]},
]
Expand Down

0 comments on commit a64aa2c

Please sign in to comment.