diff --git a/python_modules/libraries/dagster-dbt/dagster_dbt/asset_decorator.py b/python_modules/libraries/dagster-dbt/dagster_dbt/asset_decorator.py index 04db0c28c4e6f..13b98f965e067 100644 --- a/python_modules/libraries/dagster-dbt/dagster_dbt/asset_decorator.py +++ b/python_modules/libraries/dagster-dbt/dagster_dbt/asset_decorator.py @@ -24,7 +24,6 @@ Nothing, PartitionsDefinition, RetryPolicy, - TimeWindowPartitionsDefinition, multi_asset, ) from dagster._core.definitions.metadata.source_code import ( @@ -387,13 +386,6 @@ def partitionshop_dbt_assets(context: AssetExecutionContext, dbt: DbtCliResource **(op_tags if op_tags else {}), } - if ( - partitions_def - and isinstance(partitions_def, TimeWindowPartitionsDefinition) - and not backfill_policy - ): - backfill_policy = BackfillPolicy.single_run() - return multi_asset( outs=outs, name=name, diff --git a/python_modules/libraries/dagster-dbt/dagster_dbt_tests/core/test_asset_decorator.py b/python_modules/libraries/dagster-dbt/dagster_dbt_tests/core/test_asset_decorator.py index 8b61dbc248d65..0fa97ec1825d4 100644 --- a/python_modules/libraries/dagster-dbt/dagster_dbt_tests/core/test_asset_decorator.py +++ b/python_modules/libraries/dagster-dbt/dagster_dbt_tests/core/test_asset_decorator.py @@ -6,7 +6,6 @@ from dagster import ( AssetKey, AutoMaterializePolicy, - BackfillPolicy, DagsterInvalidDefinitionError, DailyPartitionsDefinition, Definitions, @@ -19,12 +18,12 @@ PartitionMapping, PartitionsDefinition, RetryPolicy, - StaticPartitionsDefinition, TimeWindowPartitionMapping, asset, materialize, ) from dagster._core.definitions.tags import StorageKindTagSet +from dagster._core.definitions.unresolved_asset_job_definition import define_asset_job from dagster._core.definitions.utils import DEFAULT_IO_MANAGER_KEY from dagster._core.execution.context.compute import AssetExecutionContext from dagster._core.types.dagster_type import DagsterType @@ -258,59 +257,6 @@ def my_dbt_assets(): ... assert output_def.io_manager_key == DEFAULT_IO_MANAGER_KEY -@pytest.mark.parametrize( - ["partitions_def", "backfill_policy", "expected_backfill_policy"], - [ - ( - DailyPartitionsDefinition(start_date="2023-01-01"), - BackfillPolicy.multi_run(), - BackfillPolicy.multi_run(), - ), - ( - DailyPartitionsDefinition(start_date="2023-01-01"), - None, - BackfillPolicy.single_run(), - ), - ( - StaticPartitionsDefinition(partition_keys=["A", "B"]), - None, - None, - ), - ( - StaticPartitionsDefinition(partition_keys=["A", "B"]), - BackfillPolicy.single_run(), - BackfillPolicy.single_run(), - ), - ], - ids=[ - "use explicit backfill policy for time window", - "time window defaults to single run", - "non time window has no default backfill policy", - "non time window backfill policy is respected", - ], -) -def test_backfill_policy( - test_jaffle_shop_manifest: Dict[str, Any], - partitions_def: PartitionsDefinition, - backfill_policy: BackfillPolicy, - expected_backfill_policy: BackfillPolicy, -) -> None: - class CustomDagsterDbtTranslator(DagsterDbtTranslator): - def get_freshness_policy(self, _: Mapping[str, Any]) -> Optional[FreshnessPolicy]: - # Disable freshness policies when using static partitions - return None - - @dbt_assets( - manifest=test_jaffle_shop_manifest, - partitions_def=partitions_def, - backfill_policy=backfill_policy, - dagster_dbt_translator=CustomDagsterDbtTranslator(), - ) - def my_dbt_assets(): ... - - assert my_dbt_assets.backfill_policy == expected_backfill_policy - - @pytest.mark.parametrize( "retry_policy", [ @@ -1025,3 +971,19 @@ def my_dbt_assets(): ... AssetKey(["customers"]), AssetKey(["orders"]), } + + +def test_partitioned_dbt_asset_with_regular_partitioned_asset_in_asset_job( + test_jaffle_shop_manifest: Dict[str, Any], +) -> None: + partitions_def = DailyPartitionsDefinition(start_date="2023-01-01") + + @asset(partitions_def=partitions_def) + def foo() -> int: ... + + @dbt_assets(partitions_def=partitions_def, manifest=test_jaffle_shop_manifest) + def my_dbt_assets(): ... + + asset_job = define_asset_job("asset_job", [foo, my_dbt_assets]) + + assert Definitions(assets=[foo, my_dbt_assets], jobs=[asset_job]).get_job_def("asset_job")