diff --git a/python_modules/dagster/dagster/_core/definitions/assets.py b/python_modules/dagster/dagster/_core/definitions/assets.py
index 2407e28221e9e..9fa1d6cb3b140 100644
--- a/python_modules/dagster/dagster/_core/definitions/assets.py
+++ b/python_modules/dagster/dagster/_core/definitions/assets.py
@@ -32,7 +32,7 @@
     AssetExecutionType,
 )
 from dagster._core.definitions.auto_materialize_policy import AutoMaterializePolicy
-from dagster._core.definitions.backfill_policy import BackfillPolicy, BackfillPolicyType
+from dagster._core.definitions.backfill_policy import DEFAULT_BACKFILL_POLICY, BackfillPolicy
 from dagster._core.definitions.freshness_policy import FreshnessPolicy
 from dagster._core.definitions.graph_definition import SubselectedGraphDefinition
 from dagster._core.definitions.metadata import ArbitraryMetadataMapping
@@ -195,18 +195,6 @@ def __init__(
             backfill_policy, "backfill_policy", BackfillPolicy
         )
 
-        if self._partitions_def is None:
-            # check if backfill policy is BackfillPolicyType.SINGLE_RUN if asset is not partitioned
-            check.param_invariant(
-                (
-                    backfill_policy.policy_type is BackfillPolicyType.SINGLE_RUN
-                    if backfill_policy
-                    else True
-                ),
-                "backfill_policy",
-                "Non partitioned asset can only have single run backfill policy",
-            )
-
         self._is_subset = check.bool_param(is_subset, "is_subset")
 
         if specs is not None:
@@ -952,7 +940,9 @@ def _get_external_asset_metadata_value(self, metadata_key: str) -> object:
 
     @property
     def backfill_policy(self) -> Optional[BackfillPolicy]:
-        return self._backfill_policy
+        return self._backfill_policy or (
+            DEFAULT_BACKFILL_POLICY if self.is_materializable else None
+        )
 
     @public
     @property
diff --git a/python_modules/dagster/dagster/_core/definitions/backfill_policy.py b/python_modules/dagster/dagster/_core/definitions/backfill_policy.py
index 851fdf03fcca4..1de0df80abb3d 100644
--- a/python_modules/dagster/dagster/_core/definitions/backfill_policy.py
+++ b/python_modules/dagster/dagster/_core/definitions/backfill_policy.py
@@ -90,20 +90,16 @@ def __str__(self):
         )
 
 
+with disable_dagster_warnings():
+    DEFAULT_BACKFILL_POLICY = BackfillPolicy.multi_run(1)
+
+
 # In situations where multiple backfill policies are specified, call this to resolve a canonical
 # policy, which is the policy with the minimum max_partitions_per_run.
 def resolve_backfill_policy(
-    backfill_policies: Iterable[Optional[BackfillPolicy]],
+    backfill_policies: Iterable[BackfillPolicy],
 ) -> BackfillPolicy:
-    policy = next(iter(sorted(backfill_policies, key=_backfill_policy_sort_key)), None)
-    with disable_dagster_warnings():
-        return policy or BackfillPolicy.multi_run(1)
-
-
-def _backfill_policy_sort_key(bp: Optional[BackfillPolicy]) -> float:
-    if bp is None:  # equivalent to max_partitions_per_run=1
-        return 1
-    elif bp.max_partitions_per_run is None:
-        return float("inf")
-    else:
-        return bp.max_partitions_per_run
+    return next(
+        iter(sorted(backfill_policies, key=lambda bp: bp.max_partitions_per_run or float("inf"))),
+        DEFAULT_BACKFILL_POLICY,
+    )
diff --git a/python_modules/dagster/dagster/_core/definitions/unresolved_asset_job_definition.py b/python_modules/dagster/dagster/_core/definitions/unresolved_asset_job_definition.py
index 0c86b47a8145a..7e8d84b3c8070 100644
--- a/python_modules/dagster/dagster/_core/definitions/unresolved_asset_job_definition.py
+++ b/python_modules/dagster/dagster/_core/definitions/unresolved_asset_job_definition.py
@@ -186,7 +186,12 @@ def resolve(
         # Require that all assets in the job have the same backfill policy
         executable_nodes = {job_asset_graph.get(k) for k in job_asset_graph.executable_asset_keys}
         nodes_by_backfill_policy = dict(
-            groupby((n for n in executable_nodes if n.is_partitioned), lambda n: n.backfill_policy)
+            groupby(
+                (n for n in executable_nodes if n.is_partitioned),
+                lambda n: check.not_none(
+                    n.backfill_policy, f"Unexpected null backfill policy for {n.key}"
+                ),
+            )
         )
         backfill_policy = resolve_backfill_policy(nodes_by_backfill_policy.keys())
 
diff --git a/python_modules/dagster/dagster/_core/execution/asset_backfill.py b/python_modules/dagster/dagster/_core/execution/asset_backfill.py
index a690f5b61418e..c6ccac815f7d1 100644
--- a/python_modules/dagster/dagster/_core/execution/asset_backfill.py
+++ b/python_modules/dagster/dagster/_core/execution/asset_backfill.py
@@ -24,10 +24,7 @@
 import pendulum
 
 import dagster._check as check
-from dagster._core.definitions.asset_daemon_context import (
-    build_run_requests,
-    build_run_requests_with_backfill_policies,
-)
+from dagster._core.definitions.asset_daemon_context import build_run_requests_with_backfill_policies
 from dagster._core.definitions.asset_graph_subset import AssetGraphSubset
 from dagster._core.definitions.asset_selection import KeysAssetSelection
 from dagster._core.definitions.base_asset_graph import BaseAssetGraph
@@ -1310,38 +1307,12 @@ def execute_asset_backfill_iteration_inner(
             f"The following assets were considered for materialization but not requested: {not_requested_str}"
         )
 
-    # check if all assets have backfill policies if any of them do, otherwise, raise error
-    asset_backfill_policies = [
-        asset_graph.get(asset_key).backfill_policy
-        for asset_key in {
-            asset_partition.asset_key for asset_partition in asset_partitions_to_request
-        }
-    ]
-    all_assets_have_backfill_policies = all(
-        backfill_policy is not None for backfill_policy in asset_backfill_policies
+    run_requests = build_run_requests_with_backfill_policies(
+        asset_partitions=asset_partitions_to_request,
+        asset_graph=asset_graph,
+        run_tags={**run_tags, BACKFILL_ID_TAG: backfill_id},
+        dynamic_partitions_store=instance_queryer,
     )
-    if all_assets_have_backfill_policies:
-        run_requests = build_run_requests_with_backfill_policies(
-            asset_partitions=asset_partitions_to_request,
-            asset_graph=asset_graph,
-            run_tags={**run_tags, BACKFILL_ID_TAG: backfill_id},
-            dynamic_partitions_store=instance_queryer,
-        )
-    else:
-        if not all(backfill_policy is None for backfill_policy in asset_backfill_policies):
-            # if some assets have backfill policies, but not all of them, raise error
-            raise DagsterBackfillFailedError(
-                "Either all assets must have backfill policies or none of them must have backfill"
-                " policies. To backfill these assets together, either add backfill policies to all"
-                " assets, or remove backfill policies from all assets."
-            )
-        # When any of the assets do not have backfill policies, we fall back to the default behavior of
-        # backfilling them partition by partition.
-        run_requests = build_run_requests(
-            asset_partitions=asset_partitions_to_request,
-            asset_graph=asset_graph,
-            run_tags={**run_tags, BACKFILL_ID_TAG: backfill_id},
-        )
 
     if request_roots:
         check.invariant(
diff --git a/python_modules/dagster/dagster/_core/remote_representation/external_data.py b/python_modules/dagster/dagster/_core/remote_representation/external_data.py
index c801b1dcac334..c72462d6367b7 100644
--- a/python_modules/dagster/dagster/_core/remote_representation/external_data.py
+++ b/python_modules/dagster/dagster/_core/remote_representation/external_data.py
@@ -58,7 +58,7 @@
 from dagster._core.definitions.auto_materialize_sensor_definition import (
     AutoMaterializeSensorDefinition,
 )
-from dagster._core.definitions.backfill_policy import BackfillPolicy
+from dagster._core.definitions.backfill_policy import DEFAULT_BACKFILL_POLICY, BackfillPolicy
 from dagster._core.definitions.definition_config_schema import ConfiguredDefinitionConfigSchema
 from dagster._core.definitions.dependency import (
     GraphNode,
@@ -1419,6 +1419,10 @@ def __new__(
         # job, and no source assets could be part of any job
         is_source = len(job_names or []) == 0
 
+        # backcompat logic to assign default BackfillPolicy for materializable assets
+        if backfill_policy is None and execution_type == AssetExecutionType.MATERIALIZATION:
+            backfill_policy = DEFAULT_BACKFILL_POLICY
+
         return super(ExternalAssetNode, cls).__new__(
             cls,
             asset_key=check.inst_param(asset_key, "asset_key", AssetKey),
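
Illustrative sketch (not part of the patch) of the resolution behavior introduced above, assuming the module path and the DEFAULT_BACKFILL_POLICY = BackfillPolicy.multi_run(1) shown in the diff: resolve_backfill_policy returns the policy with the smallest max_partitions_per_run and falls back to the default when no policies are supplied.

from dagster._core.definitions.backfill_policy import (
    BackfillPolicy,
    resolve_backfill_policy,
)

# single_run() has max_partitions_per_run=None, so it sorts last; the policy
# with the smallest max_partitions_per_run is chosen as the canonical one.
policies = [BackfillPolicy.single_run(), BackfillPolicy.multi_run(10)]
assert resolve_backfill_policy(policies) == BackfillPolicy.multi_run(10)

# With no policies supplied, the default (multi_run(1)) is returned.
assert resolve_backfill_policy([]) == BackfillPolicy.multi_run(1)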