From b03b35758228be27ffb1c2112769c23f1c24eed6 Mon Sep 17 00:00:00 2001 From: Nicholas Schrock Date: Tue, 19 Sep 2023 09:56:44 -0400 Subject: [PATCH] Add a new AssetExecutionType (Observable) and use it to omit materializations for observable source asset wrapping more bulletproof codepath defend against non-existent asset_defs better better still f-string --- .../dagster/_core/definitions/asset_spec.py | 6 ++- .../dagster/_core/definitions/assets.py | 11 +++++ .../_core/definitions/observable_asset.py | 2 +- .../_core/execution/plan/execute_step.py | 42 +++++++++++++++---- .../test_observable_assets.py | 8 ++-- 5 files changed, 54 insertions(+), 15 deletions(-) diff --git a/python_modules/dagster/dagster/_core/definitions/asset_spec.py b/python_modules/dagster/dagster/_core/definitions/asset_spec.py index 1b8e1cf85a26f..f7e36ffffb489 100644 --- a/python_modules/dagster/dagster/_core/definitions/asset_spec.py +++ b/python_modules/dagster/dagster/_core/definitions/asset_spec.py @@ -26,12 +26,16 @@ class AssetExecutionType(Enum): + OBSERVATION = "OBSERVATION" UNEXECUTABLE = "UNEXECUTABLE" MATERIALIZATION = "MATERIALIZATION" @staticmethod def is_executable(varietal_str: Optional[str]) -> bool: - return AssetExecutionType.str_to_enum(varietal_str) in {AssetExecutionType.MATERIALIZATION} + return AssetExecutionType.str_to_enum(varietal_str) in { + AssetExecutionType.MATERIALIZATION, + AssetExecutionType.OBSERVATION, + } @staticmethod def str_to_enum(varietal_str: Optional[str]) -> "AssetExecutionType": diff --git a/python_modules/dagster/dagster/_core/definitions/assets.py b/python_modules/dagster/dagster/_core/definitions/assets.py index a0bc0d0d4f0f3..59533edf4d835 100644 --- a/python_modules/dagster/dagster/_core/definitions/assets.py +++ b/python_modules/dagster/dagster/_core/definitions/assets.py @@ -21,6 +21,7 @@ from dagster._annotations import experimental_param, public from dagster._core.definitions.asset_check_spec import AssetCheckHandle, AssetCheckSpec from dagster._core.definitions.asset_layer import get_dep_node_handles_of_graph_backed_asset +from dagster._core.definitions.asset_spec import AssetExecutionType from dagster._core.definitions.auto_materialize_policy import AutoMaterializePolicy from dagster._core.definitions.backfill_policy import BackfillPolicy, BackfillPolicyType from dagster._core.definitions.freshness_policy import FreshnessPolicy @@ -876,6 +877,16 @@ def is_asset_executable(self, asset_key: AssetKey) -> bool: self._metadata_by_key.get(asset_key, {}).get(SYSTEM_METADATA_KEY_ASSET_EXECUTION_TYPE) ) + def asset_execution_type_for_asset(self, asset_key: AssetKey) -> AssetExecutionType: + from dagster._core.definitions.asset_spec import ( + SYSTEM_METADATA_KEY_ASSET_EXECUTION_TYPE, + AssetExecutionType, + ) + + return AssetExecutionType.str_to_enum( + self._metadata_by_key.get(asset_key, {}).get(SYSTEM_METADATA_KEY_ASSET_EXECUTION_TYPE) + ) + def get_partition_mapping_for_input(self, input_name: str) -> Optional[PartitionMapping]: return self._partition_mappings.get(self._keys_by_input_name[input_name]) diff --git a/python_modules/dagster/dagster/_core/definitions/observable_asset.py b/python_modules/dagster/dagster/_core/definitions/observable_asset.py index 4ca4341d7c3e0..b6d159d38e168 100644 --- a/python_modules/dagster/dagster/_core/definitions/observable_asset.py +++ b/python_modules/dagster/dagster/_core/definitions/observable_asset.py @@ -67,7 +67,7 @@ def create_assets_def_from_source_asset(source_asset: SourceAsset): injected_metadata = ( {SYSTEM_METADATA_KEY_ASSET_EXECUTION_TYPE: AssetExecutionType.UNEXECUTABLE.value} if source_asset.observe_fn is None - else {} + else {SYSTEM_METADATA_KEY_ASSET_EXECUTION_TYPE: AssetExecutionType.OBSERVATION.value} ) kwargs = { diff --git a/python_modules/dagster/dagster/_core/execution/plan/execute_step.py b/python_modules/dagster/dagster/_core/execution/plan/execute_step.py index 2552d5f61b8d1..407e44cc40ceb 100644 --- a/python_modules/dagster/dagster/_core/execution/plan/execute_step.py +++ b/python_modules/dagster/dagster/_core/execution/plan/execute_step.py @@ -25,6 +25,7 @@ TypeCheck, ) from dagster._core.definitions.asset_check_result import AssetCheckResult +from dagster._core.definitions.asset_spec import AssetExecutionType from dagster._core.definitions.data_version import ( CODE_VERSION_TAG, DATA_VERSION_IS_USER_PROVIDED_TAG, @@ -779,15 +780,38 @@ def _gen_fn(): asset_key, partitions = _asset_key_and_partitions_for_output(output_context) if asset_key: - for materialization in _get_output_asset_materializations( - asset_key, - partitions, - output, - output_def, - manager_metadata, - step_context, - ): - yield DagsterEvent.asset_materialization(step_context, materialization) + asset_layer = step_context.job_def.asset_layer + execution_type = ( + asset_layer.assets_def_for_asset(asset_key).asset_execution_type_for_asset(asset_key) + if asset_layer.has_assets_def_for_asset(asset_key) + else AssetExecutionType.MATERIALIZATION + ) + + check.invariant( + execution_type != AssetExecutionType.UNEXECUTABLE, + "There should never be unexecutable assets here", + ) + + check.invariant( + execution_type in {AssetExecutionType.MATERIALIZATION, AssetExecutionType.OBSERVATION}, + f"Unexpected asset execution type {execution_type}", + ) + + yield from ( + ( + DagsterEvent.asset_materialization(step_context, materialization) + for materialization in _get_output_asset_materializations( + asset_key, + partitions, + output, + output_def, + manager_metadata, + step_context, + ) + ) + if execution_type == AssetExecutionType.MATERIALIZATION + else () + ) yield DagsterEvent.handled_output( step_context, diff --git a/python_modules/dagster/dagster_tests/definitions_tests/test_observable_assets.py b/python_modules/dagster/dagster_tests/definitions_tests/test_observable_assets.py index 1582fd027a752..ce4fc7b4f095b 100644 --- a/python_modules/dagster/dagster_tests/definitions_tests/test_observable_assets.py +++ b/python_modules/dagster/dagster_tests/definitions_tests/test_observable_assets.py @@ -202,7 +202,9 @@ def test_observable_source_asset_decorator() -> None: def an_observable_source_asset() -> DataVersion: return DataVersion("foo") - defs = Definitions(assets=[create_assets_def_from_source_asset(an_observable_source_asset)]) + assets_def = create_assets_def_from_source_asset(an_observable_source_asset) + assert assets_def.is_asset_executable(an_observable_source_asset.key) + defs = Definitions(assets=[assets_def]) instance = DagsterInstance.ephemeral() result = defs.get_implicit_global_asset_job_def().execute_in_process(instance=instance) @@ -216,6 +218,4 @@ def an_observable_source_asset() -> DataVersion: assert observation_event.asset_observation_data.asset_observation.data_version == "foo" all_materializations = result.get_asset_materialization_events() - # Note this does not make sense. We need to make framework changes to allow for the omission of - # a materialzation event - assert len(all_materializations) == 1 + assert len(all_materializations) == 0