Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[dagster-dbt] Make default backfill_policy=None always for @dbt_asset #22280

Open
wants to merge 1 commit into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,6 @@
Nothing,
PartitionsDefinition,
RetryPolicy,
TimeWindowPartitionsDefinition,
multi_asset,
)
from dagster._core.definitions.metadata.source_code import (
Expand Down Expand Up @@ -387,13 +386,6 @@ def partitionshop_dbt_assets(context: AssetExecutionContext, dbt: DbtCliResource
**(op_tags if op_tags else {}),
}

if (
partitions_def
and isinstance(partitions_def, TimeWindowPartitionsDefinition)
and not backfill_policy
):
backfill_policy = BackfillPolicy.single_run()

return multi_asset(
outs=outs,
name=name,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@
from dagster import (
AssetKey,
AutoMaterializePolicy,
BackfillPolicy,
DagsterInvalidDefinitionError,
DailyPartitionsDefinition,
Definitions,
Expand All @@ -19,12 +18,12 @@
PartitionMapping,
PartitionsDefinition,
RetryPolicy,
StaticPartitionsDefinition,
TimeWindowPartitionMapping,
asset,
materialize,
)
from dagster._core.definitions.tags import StorageKindTagSet
from dagster._core.definitions.unresolved_asset_job_definition import define_asset_job
from dagster._core.definitions.utils import DEFAULT_IO_MANAGER_KEY
from dagster._core.execution.context.compute import AssetExecutionContext
from dagster._core.types.dagster_type import DagsterType
Expand Down Expand Up @@ -258,59 +257,6 @@ def my_dbt_assets(): ...
assert output_def.io_manager_key == DEFAULT_IO_MANAGER_KEY


@pytest.mark.parametrize(
["partitions_def", "backfill_policy", "expected_backfill_policy"],
[
(
DailyPartitionsDefinition(start_date="2023-01-01"),
BackfillPolicy.multi_run(),
BackfillPolicy.multi_run(),
),
(
DailyPartitionsDefinition(start_date="2023-01-01"),
None,
BackfillPolicy.single_run(),
),
(
StaticPartitionsDefinition(partition_keys=["A", "B"]),
None,
None,
),
(
StaticPartitionsDefinition(partition_keys=["A", "B"]),
BackfillPolicy.single_run(),
BackfillPolicy.single_run(),
),
],
ids=[
"use explicit backfill policy for time window",
"time window defaults to single run",
"non time window has no default backfill policy",
"non time window backfill policy is respected",
],
)
def test_backfill_policy(
test_jaffle_shop_manifest: Dict[str, Any],
partitions_def: PartitionsDefinition,
backfill_policy: BackfillPolicy,
expected_backfill_policy: BackfillPolicy,
) -> None:
class CustomDagsterDbtTranslator(DagsterDbtTranslator):
def get_freshness_policy(self, _: Mapping[str, Any]) -> Optional[FreshnessPolicy]:
# Disable freshness policies when using static partitions
return None

@dbt_assets(
manifest=test_jaffle_shop_manifest,
partitions_def=partitions_def,
backfill_policy=backfill_policy,
dagster_dbt_translator=CustomDagsterDbtTranslator(),
)
def my_dbt_assets(): ...

assert my_dbt_assets.backfill_policy == expected_backfill_policy


Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This whole test is no longer needed as there is no special backfill policy logic in @dbt_assets anymore.

@pytest.mark.parametrize(
"retry_policy",
[
Expand Down Expand Up @@ -1025,3 +971,19 @@ def my_dbt_assets(): ...
AssetKey(["customers"]),
AssetKey(["orders"]),
}


def test_partitioned_dbt_asset_with_regular_partitioned_asset_in_asset_job(
test_jaffle_shop_manifest: Dict[str, Any],
) -> None:
partitions_def = DailyPartitionsDefinition(start_date="2023-01-01")

@asset(partitions_def=partitions_def)
def foo() -> int: ...

@dbt_assets(partitions_def=partitions_def, manifest=test_jaffle_shop_manifest)
def my_dbt_assets(): ...

asset_job = define_asset_job("asset_job", [foo, my_dbt_assets])

assert Definitions(assets=[foo, my_dbt_assets], jobs=[asset_job]).get_job_def("asset_job")
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I confirmed this test was failing before the changes.