From 1c4a630aeaf4bd36ca98a15efda04ecc86345ced Mon Sep 17 00:00:00 2001 From: OwenKephart Date: Fri, 20 Dec 2024 11:32:44 -0500 Subject: [PATCH] [dagster-dbt] Add get_partitions_def() method to DagsterDbtTranslator (#26625) ## Summary & Motivation As title. This lets individual dbt models have distinct partitions definitions ## How I Tested These Changes ## Changelog NOCHANGELOG --- .../dagster/_core/definitions/asset_out.py | 2 +- .../dagster-dbt/dagster_dbt/asset_utils.py | 4 +-- .../dagster_dbt/dagster_dbt_translator.py | 35 +++++++++++++++++++ .../core/test_asset_decorator.py | 29 +++++++++++++++ 4 files changed, 67 insertions(+), 3 deletions(-) diff --git a/python_modules/dagster/dagster/_core/definitions/asset_out.py b/python_modules/dagster/dagster/_core/definitions/asset_out.py index 8d81cf665bcba..f563344e073eb 100644 --- a/python_modules/dagster/dagster/_core/definitions/asset_out.py +++ b/python_modules/dagster/dagster/_core/definitions/asset_out.py @@ -228,7 +228,7 @@ def to_spec( key=key, tags={**additional_tags, **self.tags} if self.tags else additional_tags, deps=[*self._spec.deps, *deps], - partitions_def=partitions_def, + partitions_def=partitions_def if partitions_def is not None else ..., ) @public diff --git a/python_modules/libraries/dagster-dbt/dagster_dbt/asset_utils.py b/python_modules/libraries/dagster-dbt/dagster_dbt/asset_utils.py index 8fa78c1a77812..fa8905edb4e18 100644 --- a/python_modules/libraries/dagster-dbt/dagster_dbt/asset_utils.py +++ b/python_modules/libraries/dagster-dbt/dagster_dbt/asset_utils.py @@ -829,14 +829,14 @@ def build_dbt_multi_asset_args( automation_condition=dagster_dbt_translator.get_automation_condition( dbt_resource_props ), + partitions_def=dagster_dbt_translator.get_partitions_def(dbt_resource_props), ) - if io_manager_key: - spec = spec.with_io_manager_key(io_manager_key) outs[output_name] = AssetOut.from_spec( spec=spec, dagster_type=Nothing, is_required=False, + io_manager_key=io_manager_key, ) test_unique_ids = [ diff --git a/python_modules/libraries/dagster-dbt/dagster_dbt/dagster_dbt_translator.py b/python_modules/libraries/dagster-dbt/dagster_dbt/dagster_dbt_translator.py index a879588e274d5..adead4ac14639 100644 --- a/python_modules/libraries/dagster-dbt/dagster_dbt/dagster_dbt_translator.py +++ b/python_modules/libraries/dagster-dbt/dagster_dbt/dagster_dbt_translator.py @@ -10,6 +10,7 @@ _check as check, ) from dagster._annotations import experimental, public +from dagster._core.definitions.partition import PartitionsDefinition from dagster._utils.tags import is_valid_tag_key from dagster_dbt.asset_utils import ( @@ -520,6 +521,40 @@ def get_automation_condition(self, dbt_resource_props: Mapping[str, Any]) -> Opt auto_materialize_policy.to_automation_condition() if auto_materialize_policy else None ) + def get_partitions_def( + self, dbt_resource_props: Mapping[str, Any] + ) -> Optional[PartitionsDefinition]: + """[INTERNAL] A function that takes a dictionary representing properties of a dbt resource, and + returns the Dagster :py:class:`dagster.PartitionsDefinition` for that resource. + + This method can be overridden to provide a custom PartitionsDefinition for a dbt resource. + + Args: + dbt_resource_props (Mapping[str, Any]): A dictionary representing the dbt resource. + + Returns: + Optional[PartitionsDefinition]: A Dagster partitions definition. + + Examples: + Set a custom AutomationCondition for dbt resources with a specific tag: + + .. code-block:: python + + from typing import Any, Mapping + + from dagster import DailyPartitionsDefinition + from dagster_dbt import DagsterDbtTranslator + + + class CustomDagsterDbtTranslator(DagsterDbtTranslator): + def get_partitions_def(self, dbt_resource_props: Mapping[str, Any]) -> Optional[PartitionsDefinition]: + if "my_custom_tag" in dbt_resource_props.get("tags", []): + return DailyPartitionsDefinition(start_date="2022-01-01") + else: + return None + """ + return None + @dataclass class DbtManifestWrapper: diff --git a/python_modules/libraries/dagster-dbt/dagster_dbt_tests/core/test_asset_decorator.py b/python_modules/libraries/dagster-dbt/dagster_dbt_tests/core/test_asset_decorator.py index 1d5e7998a46a8..87a8028e622ac 100644 --- a/python_modules/libraries/dagster-dbt/dagster_dbt_tests/core/test_asset_decorator.py +++ b/python_modules/libraries/dagster-dbt/dagster_dbt_tests/core/test_asset_decorator.py @@ -789,6 +789,35 @@ def my_dbt_assets(): ... ) +def test_with_varying_partitions_defs(test_jaffle_shop_manifest: Dict[str, Any]) -> None: + daily_partitions = DailyPartitionsDefinition(start_date="2023-01-01") + override_keys = {AssetKey("customers"), AssetKey("orders")} + + class CustomDagsterDbtTranslator(DagsterDbtTranslator): + def get_partitions_def( + self, dbt_resource_props: Mapping[str, Any] + ) -> Optional[PartitionsDefinition]: + asset_key = super().get_asset_key(dbt_resource_props) + if asset_key in override_keys: + return daily_partitions + else: + return None + + @dbt_assets( + manifest=test_jaffle_shop_manifest, dagster_dbt_translator=CustomDagsterDbtTranslator() + ) + def my_dbt_assets(): ... + + assert set(my_dbt_assets.keys) > override_keys + + for spec in my_dbt_assets.specs: + partitions_def = spec.partitions_def + if spec.key in override_keys: + assert partitions_def == daily_partitions, spec.key + else: + assert partitions_def is None, spec.key + + def test_dbt_meta_auto_materialize_policy(test_meta_config_manifest: Dict[str, Any]) -> None: expected_auto_materialize_policy = AutoMaterializePolicy.eager() expected_specs_by_key = {