From 433e129e2d7712d8bc3b393ef5750af253ad1b5d Mon Sep 17 00:00:00 2001 From: Aksel Stokseth <36400733+aksestok@users.noreply.github.com> Date: Wed, 4 Sep 2024 00:58:21 +0200 Subject: [PATCH] [dagster-embedded-elt][dlt] Support `AutomationCondition` (#24173) ## Summary & Motivation Add support `AutomationCondition` in `DagsterDltTranslator`. Inspired by the implementation in `dagster-dbt`: https://github.com/dagster-io/dagster/blob/9b9b96842bad988e9173ab2afca8748ad3bc43aa/python_modules/libraries/dagster-dbt/dagster_dbt/dagster_dbt_translator.py#L432-L488 https://github.com/dagster-io/dagster/blob/9b9b96842bad988e9173ab2afca8748ad3bc43aa/python_modules/libraries/dagster-dbt/dagster_dbt/asset_utils.py#L829-L831 ## How I Tested These Changes [Added tests](https://github.com/aksestok/dagster/blob/3e8cb2eb2ebc39cbcfba48e043a5d19748345558/python_modules/libraries/dagster-embedded-elt/dagster_embedded_elt_tests/dlt_tests/test_asset_decorator.py#L155-L190). ## Changelog [New] > [dagster-dlt] Added support for `AutomationCondition` using `DagsterDltTranslator.get_automation_condition()` (thanks, [aksestok](https://github.com/aksestok)!) --- .../dlt/asset_decorator.py | 2 +- .../dagster_embedded_elt/dlt/translator.py | 20 +++++++++- .../dlt_tests/test_asset_decorator.py | 39 +++++++++++++++++++ 3 files changed, 59 insertions(+), 2 deletions(-) diff --git a/python_modules/libraries/dagster-embedded-elt/dagster_embedded_elt/dlt/asset_decorator.py b/python_modules/libraries/dagster-embedded-elt/dagster_embedded_elt/dlt/asset_decorator.py index f7768f5f8b239..db99255f61759 100644 --- a/python_modules/libraries/dagster-embedded-elt/dagster_embedded_elt/dlt/asset_decorator.py +++ b/python_modules/libraries/dagster-embedded-elt/dagster_embedded_elt/dlt/asset_decorator.py @@ -41,7 +41,7 @@ def build_dlt_asset_specs( return [ AssetSpec( key=dagster_dlt_translator.get_asset_key(dlt_source_resource), - auto_materialize_policy=dagster_dlt_translator.get_auto_materialize_policy( + automation_condition=dagster_dlt_translator.get_automation_condition( dlt_source_resource ), deps=dagster_dlt_translator.get_deps_asset_keys(dlt_source_resource), diff --git a/python_modules/libraries/dagster-embedded-elt/dagster_embedded_elt/dlt/translator.py b/python_modules/libraries/dagster-embedded-elt/dagster_embedded_elt/dlt/translator.py index 105dd86512786..f4b7cb117e104 100644 --- a/python_modules/libraries/dagster-embedded-elt/dagster_embedded_elt/dlt/translator.py +++ b/python_modules/libraries/dagster-embedded-elt/dagster_embedded_elt/dlt/translator.py @@ -1,7 +1,7 @@ from dataclasses import dataclass from typing import Any, Iterable, Mapping, Optional, Sequence -from dagster import AssetKey, AutoMaterializePolicy +from dagster import AssetKey, AutoMaterializePolicy, AutomationCondition from dagster._annotations import public from dlt.extract.resource import DltResource @@ -38,6 +38,24 @@ def get_auto_materialize_policy(self, resource: DltResource) -> Optional[AutoMat """ return None + @public + def get_automation_condition(self, resource: DltResource) -> Optional[AutomationCondition]: + """Defines resource specific automation condition. + + This method can be overridden to provide custom automation condition for a dlt resource. + + Args: + resource (DltResource): dlt resource + + Returns: + Optional[AutomationCondition]: The automation condition for a resource + + """ + auto_materialize_policy = self.get_auto_materialize_policy(resource) + return ( + auto_materialize_policy.to_automation_condition() if auto_materialize_policy else None + ) + @public def get_deps_asset_keys(self, resource: DltResource) -> Iterable[AssetKey]: """Defines upstream asset dependencies given a dlt resource. diff --git a/python_modules/libraries/dagster-embedded-elt/dagster_embedded_elt_tests/dlt_tests/test_asset_decorator.py b/python_modules/libraries/dagster-embedded-elt/dagster_embedded_elt_tests/dlt_tests/test_asset_decorator.py index 728c5ef00c8dc..303ad0a15d6d2 100644 --- a/python_modules/libraries/dagster-embedded-elt/dagster_embedded_elt_tests/dlt_tests/test_asset_decorator.py +++ b/python_modules/libraries/dagster-embedded-elt/dagster_embedded_elt_tests/dlt_tests/test_asset_decorator.py @@ -8,6 +8,7 @@ AssetKey, AutoMaterializePolicy, AutoMaterializeRule, + AutomationCondition, Definitions, MonthlyPartitionsDefinition, ) @@ -151,6 +152,44 @@ def assets(): assert "0 1 * * *" in str(item) +def test_get_automation_condition(dlt_pipeline: Pipeline): + class CustomDagsterDltTranslator(DagsterDltTranslator): + def get_automation_condition(self, resource: DltResource) -> Optional[AutomationCondition]: + return AutomationCondition.eager() | AutomationCondition.on_cron("0 1 * * *") + + @dlt_assets( + dlt_source=pipeline(), + dlt_pipeline=dlt_pipeline, + dagster_dlt_translator=CustomDagsterDltTranslator(), + ) + def assets(): + pass + + for item in assets.automation_conditions_by_key.values(): + assert "0 1 * * *" in str(item) + + +def test_get_automation_condition_converts_auto_materialize_policy(dlt_pipeline: Pipeline): + class CustomDagsterDltTranslator(DagsterDltTranslator): + def get_auto_materialize_policy( + self, resource: DltResource + ) -> Optional[AutoMaterializePolicy]: + return AutoMaterializePolicy.eager().with_rules( + AutoMaterializeRule.materialize_on_cron("0 1 * * *") + ) + + @dlt_assets( + dlt_source=pipeline(), + dlt_pipeline=dlt_pipeline, + dagster_dlt_translator=CustomDagsterDltTranslator(), + ) + def assets(): + pass + + for item in assets.automation_conditions_by_key.values(): + assert "0 1 * * *" in str(item) + + def test_example_pipeline_has_required_metadata_keys(dlt_pipeline: Pipeline): required_metadata_keys = { "destination_type",