From 3211da432b89b4764e872153ccb8ac5e8c588943 Mon Sep 17 00:00:00 2001 From: Nicholas Schrock Date: Tue, 19 Sep 2023 09:56:44 -0400 Subject: [PATCH] Add a new AssetExecutionType (Observable) and use it to omit materializations for observable source asset wrapping more bulletproof codepath defend against non-existent asset_defs better better still f-string --- .../dagster/_core/definitions/asset_spec.py | 6 ++- .../dagster/_core/definitions/assets.py | 11 +++++ .../_core/definitions/external_asset.py | 8 +++- .../_core/execution/plan/execute_step.py | 42 +++++++++++++++---- .../definitions_tests/test_external_assets.py | 26 ++++++++++++ 5 files changed, 82 insertions(+), 11 deletions(-) diff --git a/python_modules/dagster/dagster/_core/definitions/asset_spec.py b/python_modules/dagster/dagster/_core/definitions/asset_spec.py index 1b8e1cf85a26f..f7e36ffffb489 100644 --- a/python_modules/dagster/dagster/_core/definitions/asset_spec.py +++ b/python_modules/dagster/dagster/_core/definitions/asset_spec.py @@ -26,12 +26,16 @@ class AssetExecutionType(Enum): + OBSERVATION = "OBSERVATION" UNEXECUTABLE = "UNEXECUTABLE" MATERIALIZATION = "MATERIALIZATION" @staticmethod def is_executable(varietal_str: Optional[str]) -> bool: - return AssetExecutionType.str_to_enum(varietal_str) in {AssetExecutionType.MATERIALIZATION} + return AssetExecutionType.str_to_enum(varietal_str) in { + AssetExecutionType.MATERIALIZATION, + AssetExecutionType.OBSERVATION, + } @staticmethod def str_to_enum(varietal_str: Optional[str]) -> "AssetExecutionType": diff --git a/python_modules/dagster/dagster/_core/definitions/assets.py b/python_modules/dagster/dagster/_core/definitions/assets.py index 7d61a5fede293..40a507f544bd2 100644 --- a/python_modules/dagster/dagster/_core/definitions/assets.py +++ b/python_modules/dagster/dagster/_core/definitions/assets.py @@ -21,6 +21,7 @@ from dagster._annotations import experimental_param, public from dagster._core.definitions.asset_check_spec import AssetCheckKey, AssetCheckSpec from dagster._core.definitions.asset_layer import get_dep_node_handles_of_graph_backed_asset +from dagster._core.definitions.asset_spec import AssetExecutionType from dagster._core.definitions.auto_materialize_policy import AutoMaterializePolicy from dagster._core.definitions.backfill_policy import BackfillPolicy, BackfillPolicyType from dagster._core.definitions.freshness_policy import FreshnessPolicy @@ -905,6 +906,16 @@ def is_asset_executable(self, asset_key: AssetKey) -> bool: self._metadata_by_key.get(asset_key, {}).get(SYSTEM_METADATA_KEY_ASSET_EXECUTION_TYPE) ) + def asset_execution_type_for_asset(self, asset_key: AssetKey) -> AssetExecutionType: + from dagster._core.definitions.asset_spec import ( + SYSTEM_METADATA_KEY_ASSET_EXECUTION_TYPE, + AssetExecutionType, + ) + + return AssetExecutionType.str_to_enum( + self._metadata_by_key.get(asset_key, {}).get(SYSTEM_METADATA_KEY_ASSET_EXECUTION_TYPE) + ) + def get_partition_mapping_for_input(self, input_name: str) -> Optional[PartitionMapping]: return self._partition_mappings.get(self._keys_by_input_name[input_name]) diff --git a/python_modules/dagster/dagster/_core/definitions/external_asset.py b/python_modules/dagster/dagster/_core/definitions/external_asset.py index 03891ea5f69a7..a4b47d9159d40 100644 --- a/python_modules/dagster/dagster/_core/definitions/external_asset.py +++ b/python_modules/dagster/dagster/_core/definitions/external_asset.py @@ -132,11 +132,17 @@ def create_external_asset_from_source_asset(source_asset: SourceAsset) -> Assets "Observable source assets not supported yet: auto_observe_interval_minutes should be None", ) + injected_metadata = ( + {SYSTEM_METADATA_KEY_ASSET_EXECUTION_TYPE: AssetExecutionType.UNEXECUTABLE.value} + if source_asset.observe_fn is None + else {SYSTEM_METADATA_KEY_ASSET_EXECUTION_TYPE: AssetExecutionType.OBSERVATION.value} + ) + kwargs = { "key": source_asset.key, "metadata": { **source_asset.metadata, - **{SYSTEM_METADATA_KEY_ASSET_EXECUTION_TYPE: AssetExecutionType.UNEXECUTABLE.value}, + **injected_metadata, }, "group_name": source_asset.group_name, "description": source_asset.description, diff --git a/python_modules/dagster/dagster/_core/execution/plan/execute_step.py b/python_modules/dagster/dagster/_core/execution/plan/execute_step.py index 26cfc3a7dba7f..b42de69bfa2fc 100644 --- a/python_modules/dagster/dagster/_core/execution/plan/execute_step.py +++ b/python_modules/dagster/dagster/_core/execution/plan/execute_step.py @@ -25,6 +25,7 @@ TypeCheck, ) from dagster._core.definitions.asset_check_result import AssetCheckResult +from dagster._core.definitions.asset_spec import AssetExecutionType from dagster._core.definitions.data_version import ( CODE_VERSION_TAG, DATA_VERSION_IS_USER_PROVIDED_TAG, @@ -779,15 +780,38 @@ def _gen_fn(): asset_key, partitions = _asset_key_and_partitions_for_output(output_context) if asset_key: - for materialization in _get_output_asset_materializations( - asset_key, - partitions, - output, - output_def, - manager_metadata, - step_context, - ): - yield DagsterEvent.asset_materialization(step_context, materialization) + asset_layer = step_context.job_def.asset_layer + execution_type = ( + asset_layer.assets_def_for_asset(asset_key).asset_execution_type_for_asset(asset_key) + if asset_layer.has_assets_def_for_asset(asset_key) + else AssetExecutionType.MATERIALIZATION + ) + + check.invariant( + execution_type != AssetExecutionType.UNEXECUTABLE, + "There should never be unexecutable assets here", + ) + + check.invariant( + execution_type in {AssetExecutionType.MATERIALIZATION, AssetExecutionType.OBSERVATION}, + f"Unexpected asset execution type {execution_type}", + ) + + yield from ( + ( + DagsterEvent.asset_materialization(step_context, materialization) + for materialization in _get_output_asset_materializations( + asset_key, + partitions, + output, + output_def, + manager_metadata, + step_context, + ) + ) + if execution_type == AssetExecutionType.MATERIALIZATION + else () + ) yield DagsterEvent.handled_output( step_context, diff --git a/python_modules/dagster/dagster_tests/definitions_tests/test_external_assets.py b/python_modules/dagster/dagster_tests/definitions_tests/test_external_assets.py index fe55c96b815cf..8a25944b687b2 100644 --- a/python_modules/dagster/dagster_tests/definitions_tests/test_external_assets.py +++ b/python_modules/dagster/dagster_tests/definitions_tests/test_external_assets.py @@ -2,6 +2,8 @@ import pytest from dagster import ( + observable_source_asset, + DataVersion, AssetExecutionContext, AssetKey, AssetsDefinition, @@ -202,3 +204,27 @@ def an_asset(context: AssetExecutionContext, source_asset: str) -> str: assert result_two.success assert result_two.output_for_node("an_asset") == "hardcoded-computed-2021-01-03" + + +def test_observable_source_asset_decorator() -> None: + @observable_source_asset + def an_observable_source_asset() -> DataVersion: + return DataVersion("foo") + + assets_def = create_external_asset_from_source_asset(an_observable_source_asset) + assert assets_def.is_asset_executable(an_observable_source_asset.key) + defs = Definitions(assets=[assets_def]) + + instance = DagsterInstance.ephemeral() + result = defs.get_implicit_global_asset_job_def().execute_in_process(instance=instance) + + assert result.success + assert result.output_for_node("an_observable_source_asset") is None + + all_observations = result.get_asset_observation_events() + assert len(all_observations) == 1 + observation_event = all_observations[0] + assert observation_event.asset_observation_data.asset_observation.data_version == "foo" + + all_materializations = result.get_asset_materialization_events() + assert len(all_materializations) == 0