Skip to content

Commit

Permalink
[dagster-embedded-elt][dlt] Support AutomationCondition (#24173)
Browse files Browse the repository at this point in the history
  • Loading branch information
aksestok authored Sep 3, 2024
1 parent f8c58ab commit 433e129
Show file tree
Hide file tree
Showing 3 changed files with 59 additions and 2 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ def build_dlt_asset_specs(
return [
AssetSpec(
key=dagster_dlt_translator.get_asset_key(dlt_source_resource),
auto_materialize_policy=dagster_dlt_translator.get_auto_materialize_policy(
automation_condition=dagster_dlt_translator.get_automation_condition(
dlt_source_resource
),
deps=dagster_dlt_translator.get_deps_asset_keys(dlt_source_resource),
Expand Down
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
from dataclasses import dataclass
from typing import Any, Iterable, Mapping, Optional, Sequence

from dagster import AssetKey, AutoMaterializePolicy
from dagster import AssetKey, AutoMaterializePolicy, AutomationCondition
from dagster._annotations import public
from dlt.extract.resource import DltResource

Expand Down Expand Up @@ -38,6 +38,24 @@ def get_auto_materialize_policy(self, resource: DltResource) -> Optional[AutoMat
"""
return None

@public
def get_automation_condition(self, resource: DltResource) -> Optional[AutomationCondition]:
"""Defines resource specific automation condition.
This method can be overridden to provide custom automation condition for a dlt resource.
Args:
resource (DltResource): dlt resource
Returns:
Optional[AutomationCondition]: The automation condition for a resource
"""
auto_materialize_policy = self.get_auto_materialize_policy(resource)
return (
auto_materialize_policy.to_automation_condition() if auto_materialize_policy else None
)

@public
def get_deps_asset_keys(self, resource: DltResource) -> Iterable[AssetKey]:
"""Defines upstream asset dependencies given a dlt resource.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
AssetKey,
AutoMaterializePolicy,
AutoMaterializeRule,
AutomationCondition,
Definitions,
MonthlyPartitionsDefinition,
)
Expand Down Expand Up @@ -151,6 +152,44 @@ def assets():
assert "0 1 * * *" in str(item)


def test_get_automation_condition(dlt_pipeline: Pipeline):
class CustomDagsterDltTranslator(DagsterDltTranslator):
def get_automation_condition(self, resource: DltResource) -> Optional[AutomationCondition]:
return AutomationCondition.eager() | AutomationCondition.on_cron("0 1 * * *")

@dlt_assets(
dlt_source=pipeline(),
dlt_pipeline=dlt_pipeline,
dagster_dlt_translator=CustomDagsterDltTranslator(),
)
def assets():
pass

for item in assets.automation_conditions_by_key.values():
assert "0 1 * * *" in str(item)


def test_get_automation_condition_converts_auto_materialize_policy(dlt_pipeline: Pipeline):
class CustomDagsterDltTranslator(DagsterDltTranslator):
def get_auto_materialize_policy(
self, resource: DltResource
) -> Optional[AutoMaterializePolicy]:
return AutoMaterializePolicy.eager().with_rules(
AutoMaterializeRule.materialize_on_cron("0 1 * * *")
)

@dlt_assets(
dlt_source=pipeline(),
dlt_pipeline=dlt_pipeline,
dagster_dlt_translator=CustomDagsterDltTranslator(),
)
def assets():
pass

for item in assets.automation_conditions_by_key.values():
assert "0 1 * * *" in str(item)


def test_example_pipeline_has_required_metadata_keys(dlt_pipeline: Pipeline):
required_metadata_keys = {
"destination_type",
Expand Down

0 comments on commit 433e129

Please sign in to comment.