From 3ce1bf2cfa5ffe800ea19811b48ea069ea2c3319 Mon Sep 17 00:00:00 2001 From: Nicholas Schrock Date: Tue, 19 Sep 2023 06:50:47 -0400 Subject: [PATCH] Wrap observable source assets in AssetsDefinition --- .../dagster/_core/definitions/events.py | 6 ++- .../_core/definitions/observable_asset.py | 40 ++++++++++++------- .../dagster/_core/definitions/source_asset.py | 16 ++++---- .../_core/execution/execution_result.py | 3 ++ .../test_observable_assets.py | 32 ++++++++++++--- 5 files changed, 69 insertions(+), 28 deletions(-) diff --git a/python_modules/dagster/dagster/_core/definitions/events.py b/python_modules/dagster/dagster/_core/definitions/events.py index 04333178b4c3e..0f266e31744b0 100644 --- a/python_modules/dagster/dagster/_core/definitions/events.py +++ b/python_modules/dagster/dagster/_core/definitions/events.py @@ -19,7 +19,7 @@ import dagster._check as check import dagster._seven as seven from dagster._annotations import PublicAttr, experimental_param, public -from dagster._core.definitions.data_version import DataVersion +from dagster._core.definitions.data_version import DATA_VERSION_TAG, DataVersion from dagster._core.storage.tags import MULTIDIMENSIONAL_PARTITION_PREFIX, SYSTEM_TAG_PREFIX from dagster._serdes import whitelist_for_serdes from dagster._serdes.serdes import NamedTupleSerializer @@ -481,6 +481,10 @@ def __new__( def label(self) -> str: return " ".join(self.asset_key.path) + @property + def data_version(self) -> Optional[str]: + return self.tags.get(DATA_VERSION_TAG) + UNDEFINED_ASSET_KEY_PATH = ["__undefined__"] diff --git a/python_modules/dagster/dagster/_core/definitions/observable_asset.py b/python_modules/dagster/dagster/_core/definitions/observable_asset.py index b9cf41bb00ed2..4aa922db3dc27 100644 --- a/python_modules/dagster/dagster/_core/definitions/observable_asset.py +++ b/python_modules/dagster/dagster/_core/definitions/observable_asset.py @@ -7,8 +7,12 @@ AssetVarietal, ) from dagster._core.definitions.decorators.asset_decorator import asset, multi_asset -from dagster._core.definitions.source_asset import SourceAsset +from dagster._core.definitions.source_asset import ( + SourceAsset, + wrap_source_asset_observe_fn_in_op_compute_fn, +) from dagster._core.errors import DagsterInvariantViolationError +from dagster._core.execution.context.compute import OpExecutionContext def create_unexecutable_observable_assets_def(specs: Sequence[AssetSpec]): @@ -49,25 +53,24 @@ def an_asset() -> None: return an_asset -def create_unexecutable_observable_assets_def_from_source_asset(source_asset: SourceAsset): - check.invariant( - source_asset.observe_fn is None, - "Observable source assets not supported yet: observe_fn should be None", - ) - check.invariant( - source_asset.partitions_def is None, - "Observable source assets not supported yet: partitions_def should be None", - ) +def create_assets_def_from_source_asset(source_asset: SourceAsset): check.invariant( source_asset.auto_observe_interval_minutes is None, - "Observable source assets not supported yet: auto_observe_interval_minutes should be None", + "Schedulable observable source assets not supported yet: auto_observe_interval_minutes" + " should be None", + ) + + injected_metadata = ( + {SYSTEM_METADATA_KEY_ASSET_VARIETAL: AssetVarietal.UNEXECUTABLE.value} + if source_asset.observe_fn is None + else {} ) kwargs = { "key": source_asset.key, "metadata": { **source_asset.metadata, - **{SYSTEM_METADATA_KEY_ASSET_VARIETAL: AssetVarietal.UNEXECUTABLE.value}, + **injected_metadata, }, "group_name": source_asset.group_name, "description": source_asset.description, @@ -78,8 +81,17 @@ def create_unexecutable_observable_assets_def_from_source_asset(source_asset: So elif source_asset.io_manager_key: kwargs["io_manager_key"] = source_asset.io_manager_key + kwargs["partitions_def"] = source_asset.partitions_def + + if source_asset.observe_fn: + kwargs["resource_defs"] = source_asset.resource_defs + @asset(**kwargs) - def shim_asset() -> None: - raise NotImplementedError(f"Asset {source_asset.key} is not executable") + def shim_asset(context: OpExecutionContext) -> None: + if not source_asset.observe_fn: + raise NotImplementedError(f"Asset {source_asset.key} is not executable") + + op_function = wrap_source_asset_observe_fn_in_op_compute_fn(source_asset) + return op_function.decorated_fn(context) return shim_asset diff --git a/python_modules/dagster/dagster/_core/definitions/source_asset.py b/python_modules/dagster/dagster/_core/definitions/source_asset.py index 6ebf646f48766..01b9d5bab4502 100644 --- a/python_modules/dagster/dagster/_core/definitions/source_asset.py +++ b/python_modules/dagster/dagster/_core/definitions/source_asset.py @@ -9,7 +9,7 @@ cast, ) -from typing_extensions import TYPE_CHECKING, TypeAlias +from typing_extensions import TypeAlias import dagster._check as check from dagster._annotations import PublicAttr, experimental_param, public @@ -50,19 +50,19 @@ from dagster._utils.merger import merge_dicts from dagster._utils.warnings import disable_dagster_warnings -if TYPE_CHECKING: - from dagster._core.definitions.decorators.op_decorator import ( - DecoratedOpFunction, - ) +# if TYPE_CHECKING: +# from dagster._core.definitions.decorators.op_decorator import ( +# DecoratedOpFunction, +# ) # Going with this catch-all for the time-being to permit pythonic resources SourceAssetObserveFunction: TypeAlias = Callable[..., Any] @staticmethod -def wrap_source_asset_observe_fn_in_op_compute_fn( - source_asset: "SourceAsset", -) -> "DecoratedOpFunction": +def wrap_source_asset_observe_fn_in_op_compute_fn(source_asset: "SourceAsset"): + # TODO getting errant TCH001 error from ruff + # -> "DecoratedOpFunction": from dagster._core.definitions.decorators.op_decorator import ( DecoratedOpFunction, is_context_provided, diff --git a/python_modules/dagster/dagster/_core/execution/execution_result.py b/python_modules/dagster/dagster/_core/execution/execution_result.py index f38408282b601..e54891520767b 100644 --- a/python_modules/dagster/dagster/_core/execution/execution_result.py +++ b/python_modules/dagster/dagster/_core/execution/execution_result.py @@ -173,6 +173,9 @@ def asset_observations_for_node(self, node_name: str) -> Sequence[AssetObservati def get_asset_materialization_events(self) -> Sequence[DagsterEvent]: return [event for event in self.all_events if event.is_step_materialization] + def get_asset_observation_events(self) -> Sequence[DagsterEvent]: + return [event for event in self.all_events if event.is_asset_observation] + def get_asset_check_evaluations(self) -> Sequence[AssetCheckEvaluation]: return [ cast(AssetCheckEvaluation, event.event_specific_data) diff --git a/python_modules/dagster/dagster_tests/definitions_tests/test_observable_assets.py b/python_modules/dagster/dagster_tests/definitions_tests/test_observable_assets.py index 32580ec62a05a..a2db50b8e52c6 100644 --- a/python_modules/dagster/dagster_tests/definitions_tests/test_observable_assets.py +++ b/python_modules/dagster/dagster_tests/definitions_tests/test_observable_assets.py @@ -4,17 +4,19 @@ AssetsDefinition, AutoMaterializePolicy, DagsterInstance, + DataVersion, Definitions, IOManager, SourceAsset, _check as check, asset, + observable_source_asset, ) from dagster._core.definitions.asset_spec import AssetSpec from dagster._core.definitions.freshness_policy import FreshnessPolicy from dagster._core.definitions.observable_asset import ( + create_assets_def_from_source_asset, create_unexecutable_observable_assets_def, - create_unexecutable_observable_assets_def_from_source_asset, ) @@ -107,7 +109,7 @@ def an_asset(source_asset: str) -> str: assert result_one.output_for_node("an_asset") == "hardcoded-computed" defs_with_shim = Definitions( - assets=[create_unexecutable_observable_assets_def_from_source_asset(source_asset), an_asset] + assets=[create_assets_def_from_source_asset(source_asset), an_asset] ) assert isinstance(defs_with_shim.get_assets_def("source_asset"), AssetsDefinition) @@ -115,10 +117,30 @@ def an_asset(source_asset: str) -> str: result_two = defs_with_shim.get_implicit_global_asset_job_def().execute_in_process( instance=instance, # currently we have to explicitly select the asset to exclude the source from execution - asset_selection=[ - AssetKey("an_asset") - ], + asset_selection=[AssetKey("an_asset")], ) assert result_two.success assert result_two.output_for_node("an_asset") == "hardcoded-computed" + + +def test_observable_source_asset_decorator() -> None: + @observable_source_asset + def an_observable_source_asset() -> DataVersion: + return DataVersion("foo") + + defs = Definitions(assets=[create_assets_def_from_source_asset(an_observable_source_asset)]) + + result = defs.get_implicit_global_asset_job_def().execute_in_process() + + assert result.success + + all_observations = result.get_asset_observation_events() + assert len(all_observations) == 1 + observation_event = all_observations[0] + assert observation_event.asset_observation_data.asset_observation.data_version == "foo" + + all_materializations = result.get_asset_materialization_events() + # Note this does not make sense. We need to make framework changes to allow for the omission of + # a materialzation event + assert len(all_materializations) == 1