diff --git a/python_modules/dagster/dagster/_core/definitions/assets.py b/python_modules/dagster/dagster/_core/definitions/assets.py
index c71815892210c..830de5cb48590 100644
--- a/python_modules/dagster/dagster/_core/definitions/assets.py
+++ b/python_modules/dagster/dagster/_core/definitions/assets.py
@@ -32,7 +32,7 @@
     AssetExecutionType,
 )
 from dagster._core.definitions.auto_materialize_policy import AutoMaterializePolicy
-from dagster._core.definitions.backfill_policy import BackfillPolicy, BackfillPolicyType
+from dagster._core.definitions.backfill_policy import DEFAULT_BACKFILL_POLICY, BackfillPolicy
 from dagster._core.definitions.freshness_policy import FreshnessPolicy
 from dagster._core.definitions.graph_definition import SubselectedGraphDefinition
 from dagster._core.definitions.metadata import ArbitraryMetadataMapping
@@ -195,18 +195,6 @@ def __init__(
             backfill_policy, "backfill_policy", BackfillPolicy
         )
 
-        if self._partitions_def is None:
-            # check if backfill policy is BackfillPolicyType.SINGLE_RUN if asset is not partitioned
-            check.param_invariant(
-                (
-                    backfill_policy.policy_type is BackfillPolicyType.SINGLE_RUN
-                    if backfill_policy
-                    else True
-                ),
-                "backfill_policy",
-                "Non partitioned asset can only have single run backfill policy",
-            )
-
         self._is_subset = check.bool_param(is_subset, "is_subset")
 
         if specs is not None:
@@ -952,7 +940,7 @@ def _get_external_asset_metadata_value(self, metadata_key: str) -> object:
 
     @property
     def backfill_policy(self) -> Optional[BackfillPolicy]:
-        return self._backfill_policy
+        return self._backfill_policy or (DEFAULT_BACKFILL_POLICY if self.is_executable else None)
 
     @public
     @property
diff --git a/python_modules/dagster/dagster/_core/definitions/backfill_policy.py b/python_modules/dagster/dagster/_core/definitions/backfill_policy.py
index 851fdf03fcca4..1de0df80abb3d 100644
--- a/python_modules/dagster/dagster/_core/definitions/backfill_policy.py
+++ b/python_modules/dagster/dagster/_core/definitions/backfill_policy.py
@@ -90,20 +90,16 @@ def __str__(self):
         )
 
 
+with disable_dagster_warnings():
+    DEFAULT_BACKFILL_POLICY = BackfillPolicy.multi_run(1)
+
+
 # In situations where multiple backfill policies are specified, call this to resolve a canonical
 # policy, which is the policy with the minimum max_partitions_per_run.
 def resolve_backfill_policy(
-    backfill_policies: Iterable[Optional[BackfillPolicy]],
+    backfill_policies: Iterable[BackfillPolicy],
 ) -> BackfillPolicy:
-    policy = next(iter(sorted(backfill_policies, key=_backfill_policy_sort_key)), None)
-    with disable_dagster_warnings():
-        return policy or BackfillPolicy.multi_run(1)
-
-
-def _backfill_policy_sort_key(bp: Optional[BackfillPolicy]) -> float:
-    if bp is None:  # equivalent to max_partitions_per_run=1
-        return 1
-    elif bp.max_partitions_per_run is None:
-        return float("inf")
-    else:
-        return bp.max_partitions_per_run
+    return next(
+        iter(sorted(backfill_policies, key=lambda bp: bp.max_partitions_per_run or float("inf"))),
+        DEFAULT_BACKFILL_POLICY,
+    )
diff --git a/python_modules/dagster/dagster/_core/definitions/unresolved_asset_job_definition.py b/python_modules/dagster/dagster/_core/definitions/unresolved_asset_job_definition.py
index 0c86b47a8145a..7e8d84b3c8070 100644
--- a/python_modules/dagster/dagster/_core/definitions/unresolved_asset_job_definition.py
+++ b/python_modules/dagster/dagster/_core/definitions/unresolved_asset_job_definition.py
@@ -186,7 +186,12 @@ def resolve(
         # Require that all assets in the job have the same backfill policy
         executable_nodes = {job_asset_graph.get(k) for k in job_asset_graph.executable_asset_keys}
         nodes_by_backfill_policy = dict(
-            groupby((n for n in executable_nodes if n.is_partitioned), lambda n: n.backfill_policy)
+            groupby(
+                (n for n in executable_nodes if n.is_partitioned),
+                lambda n: check.not_none(
+                    n.backfill_policy, f"Unexpected null backfill policy for {n.key}"
+                ),
+            )
         )
         backfill_policy = resolve_backfill_policy(nodes_by_backfill_policy.keys())
 
diff --git a/python_modules/dagster/dagster/_core/execution/asset_backfill.py b/python_modules/dagster/dagster/_core/execution/asset_backfill.py
index a690f5b61418e..c6ccac815f7d1 100644
--- a/python_modules/dagster/dagster/_core/execution/asset_backfill.py
+++ b/python_modules/dagster/dagster/_core/execution/asset_backfill.py
@@ -24,10 +24,7 @@
 import pendulum
 
 import dagster._check as check
-from dagster._core.definitions.asset_daemon_context import (
-    build_run_requests,
-    build_run_requests_with_backfill_policies,
-)
+from dagster._core.definitions.asset_daemon_context import build_run_requests_with_backfill_policies
 from dagster._core.definitions.asset_graph_subset import AssetGraphSubset
 from dagster._core.definitions.asset_selection import KeysAssetSelection
 from dagster._core.definitions.base_asset_graph import BaseAssetGraph
@@ -1310,38 +1307,12 @@ def execute_asset_backfill_iteration_inner(
             f"The following assets were considered for materialization but not requested: {not_requested_str}"
         )
 
-    # check if all assets have backfill policies if any of them do, otherwise, raise error
-    asset_backfill_policies = [
-        asset_graph.get(asset_key).backfill_policy
-        for asset_key in {
-            asset_partition.asset_key for asset_partition in asset_partitions_to_request
-        }
-    ]
-    all_assets_have_backfill_policies = all(
-        backfill_policy is not None for backfill_policy in asset_backfill_policies
+    run_requests = build_run_requests_with_backfill_policies(
+        asset_partitions=asset_partitions_to_request,
+        asset_graph=asset_graph,
+        run_tags={**run_tags, BACKFILL_ID_TAG: backfill_id},
+        dynamic_partitions_store=instance_queryer,
     )
-    if all_assets_have_backfill_policies:
-        run_requests = build_run_requests_with_backfill_policies(
-            asset_partitions=asset_partitions_to_request,
-            asset_graph=asset_graph,
-            run_tags={**run_tags, BACKFILL_ID_TAG: backfill_id},
-            dynamic_partitions_store=instance_queryer,
-        )
-    else:
-        if not all(backfill_policy is None for backfill_policy in asset_backfill_policies):
-            # if some assets have backfill policies, but not all of them, raise error
-            raise DagsterBackfillFailedError(
-                "Either all assets must have backfill policies or none of them must have backfill"
-                " policies. To backfill these assets together, either add backfill policies to all"
-                " assets, or remove backfill policies from all assets."
-            )
-        # When any of the assets do not have backfill policies, we fall back to the default behavior of
-        # backfilling them partition by partition.
-        run_requests = build_run_requests(
-            asset_partitions=asset_partitions_to_request,
-            asset_graph=asset_graph,
-            run_tags={**run_tags, BACKFILL_ID_TAG: backfill_id},
-        )
 
     if request_roots:
         check.invariant(
diff --git a/python_modules/dagster/dagster/_core/instance/__init__.py b/python_modules/dagster/dagster/_core/instance/__init__.py
index 01a4717655680..7d65692fa1ba3 100644
--- a/python_modules/dagster/dagster/_core/instance/__init__.py
+++ b/python_modules/dagster/dagster/_core/instance/__init__.py
@@ -1363,21 +1363,17 @@ def _log_materialization_planned_event_for_asset(
         from dagster._core.definitions.partition_key_range import PartitionKeyRange
         from dagster._core.events import AssetMaterializationPlannedData, DagsterEvent
 
+        is_partitioned = check.not_none(output.properties).is_asset_partitioned
         partition_tag = dagster_run.tags.get(PARTITION_NAME_TAG)
         partition_range_start, partition_range_end = (
             dagster_run.tags.get(ASSET_PARTITION_RANGE_START_TAG),
             dagster_run.tags.get(ASSET_PARTITION_RANGE_END_TAG),
         )
 
-        if partition_tag and (partition_range_start or partition_range_end):
-            raise DagsterInvariantViolationError(
-                f"Cannot have {ASSET_PARTITION_RANGE_START_TAG} or"
-                f" {ASSET_PARTITION_RANGE_END_TAG} set along with"
-                f" {PARTITION_NAME_TAG}"
-            )
-
-        partitions_subset = None
-        if partition_range_start or partition_range_end:
+        if is_partitioned and partition_tag:
+            partition = partition_tag
+            partitions_subset = None
+        elif is_partitioned and (partition_range_start or partition_range_end):
             if not partition_range_start or not partition_range_end:
                 raise DagsterInvariantViolationError(
                     f"Cannot have {ASSET_PARTITION_RANGE_START_TAG} or"
@@ -1398,17 +1394,17 @@
                     "Creating a run targeting a partition range is not supported for jobs partitioned with function-based dynamic partitions"
                 )
 
-            if check.not_none(output.properties).is_asset_partitioned:
-                partitions_subset = job_partitions_def.subset_with_partition_keys(
-                    job_partitions_def.get_partition_keys_in_range(
-                        PartitionKeyRange(partition_range_start, partition_range_end),
-                        dynamic_partitions_store=self,
-                    )
-                ).to_serializable_subset()
+            partition = None
+            partitions_subset = job_partitions_def.subset_with_partition_keys(
+                job_partitions_def.get_partition_keys_in_range(
+                    PartitionKeyRange(partition_range_start, partition_range_end),
+                    dynamic_partitions_store=self,
+                )
+            ).to_serializable_subset()
+        else:
+            partition = None
+            partitions_subset = None
 
-        partition = (
-            partition_tag if check.not_none(output.properties).is_asset_partitioned else None
-        )
         materialization_planned = DagsterEvent.build_asset_materialization_planned_event(
             job_name,
             step.key,
diff --git a/python_modules/dagster/dagster/_core/remote_representation/external_data.py b/python_modules/dagster/dagster/_core/remote_representation/external_data.py
index c801b1dcac334..c72462d6367b7 100644
--- a/python_modules/dagster/dagster/_core/remote_representation/external_data.py
+++ b/python_modules/dagster/dagster/_core/remote_representation/external_data.py
@@ -58,7 +58,7 @@
 from dagster._core.definitions.auto_materialize_sensor_definition import (
     AutoMaterializeSensorDefinition,
 )
-from dagster._core.definitions.backfill_policy import BackfillPolicy
+from dagster._core.definitions.backfill_policy import DEFAULT_BACKFILL_POLICY, BackfillPolicy
 from dagster._core.definitions.definition_config_schema import ConfiguredDefinitionConfigSchema
 from dagster._core.definitions.dependency import (
     GraphNode,
@@ -1419,6 +1419,10 @@ def __new__(
         # job, and no source assets could be part of any job
         is_source = len(job_names or []) == 0
 
+        # backcompat logic to assign default BackfillPolicy for materializable assets
+        if backfill_policy is None and execution_type == AssetExecutionType.MATERIALIZATION:
+            backfill_policy = DEFAULT_BACKFILL_POLICY
+
         return super(ExternalAssetNode, cls).__new__(
             cls,
             asset_key=check.inst_param(asset_key, "asset_key", AssetKey),
diff --git a/python_modules/dagster/dagster_tests/core_tests/execution_tests/test_asset_backfill_with_backfill_policies.py b/python_modules/dagster/dagster_tests/core_tests/execution_tests/test_asset_backfill_with_backfill_policies.py
index c38656c21e587..853028f5710f4 100644
--- a/python_modules/dagster/dagster_tests/core_tests/execution_tests/test_asset_backfill_with_backfill_policies.py
+++ b/python_modules/dagster/dagster_tests/core_tests/execution_tests/test_asset_backfill_with_backfill_policies.py
@@ -15,7 +15,6 @@
     asset,
 )
 from dagster._core.definitions.partition import StaticPartitionsDefinition
-from dagster._core.errors import DagsterBackfillFailedError
 from dagster._core.execution.asset_backfill import AssetBackfillData, AssetBackfillStatus
 from dagster._core.instance_for_test import instance_for_test
 from dagster._core.storage.tags import (
@@ -37,53 +36,6 @@
 )
 
 
-def test_asset_backfill_not_all_asset_have_backfill_policy():
-    @asset(backfill_policy=None)
-    def unpartitioned_upstream_of_partitioned():
-        return 1
-
-    @asset(
-        partitions_def=DailyPartitionsDefinition("2023-01-01"),
-        backfill_policy=BackfillPolicy.single_run(),
-    )
-    def upstream_daily_partitioned_asset():
-        return 1
-
-    assets_by_repo_name = {
-        "repo": [
-            unpartitioned_upstream_of_partitioned,
-            upstream_daily_partitioned_asset,
-        ]
-    }
-    asset_graph = get_asset_graph(assets_by_repo_name)
-
-    backfill_data = AssetBackfillData.from_asset_partitions(
-        partition_names=None,
-        asset_graph=asset_graph,
-        asset_selection=[
-            unpartitioned_upstream_of_partitioned.key,
-            upstream_daily_partitioned_asset.key,
-        ],
-        dynamic_partitions_store=MagicMock(),
-        all_partitions=True,
-        backfill_start_timestamp=get_current_timestamp(),
-    )
-
-    with pytest.raises(
-        DagsterBackfillFailedError,
-        match=(
-            "Either all assets must have backfill policies or none of them must have backfill"
-            " policies"
-        ),
-    ):
-        execute_asset_backfill_iteration_consume_generator(
-            backfill_id="test_backfill_id",
-            asset_backfill_data=backfill_data,
-            asset_graph=asset_graph,
-            instance=DagsterInstance.ephemeral(),
-        )
-
-
 def test_asset_backfill_parent_and_children_have_different_backfill_policy():
     time_now = get_current_datetime()
     daily_partitions_def: DailyPartitionsDefinition = DailyPartitionsDefinition("2023-01-01")