From 224c58fee607030ed093b3567fdbcac1f10fc2b0 Mon Sep 17 00:00:00 2001 From: Nicholas Schrock Date: Tue, 19 Sep 2023 06:50:47 -0400 Subject: [PATCH 1/5] Wrap observable source assets in AssetsDefinition cp explicit use ephem instance --- .../dagster/_core/definitions/events.py | 6 +- .../_core/definitions/external_asset.py | 39 +++++-- .../_core/definitions/observable_asset.py | 107 ++++++++++++++++++ .../dagster/_core/definitions/source_asset.py | 2 +- .../_core/execution/execution_result.py | 3 + .../definitions_tests/test_external_assets.py | 26 +++++ 6 files changed, 169 insertions(+), 14 deletions(-) create mode 100644 python_modules/dagster/dagster/_core/definitions/observable_asset.py diff --git a/python_modules/dagster/dagster/_core/definitions/events.py b/python_modules/dagster/dagster/_core/definitions/events.py index 7c21cc20025ab..d6e5b3644c91a 100644 --- a/python_modules/dagster/dagster/_core/definitions/events.py +++ b/python_modules/dagster/dagster/_core/definitions/events.py @@ -19,7 +19,7 @@ import dagster._check as check import dagster._seven as seven from dagster._annotations import PublicAttr, deprecated, experimental_param, public -from dagster._core.definitions.data_version import DataVersion +from dagster._core.definitions.data_version import DATA_VERSION_TAG, DataVersion from dagster._core.storage.tags import MULTIDIMENSIONAL_PARTITION_PREFIX, SYSTEM_TAG_PREFIX from dagster._serdes import whitelist_for_serdes from dagster._serdes.serdes import NamedTupleSerializer @@ -481,6 +481,10 @@ def __new__( def label(self) -> str: return " ".join(self.asset_key.path) + @property + def data_version(self) -> Optional[str]: + return self.tags.get(DATA_VERSION_TAG) + UNDEFINED_ASSET_KEY_PATH = ["__undefined__"] diff --git a/python_modules/dagster/dagster/_core/definitions/external_asset.py b/python_modules/dagster/dagster/_core/definitions/external_asset.py index 03891ea5f69a7..1f813fb63a1a4 100644 --- 
a/python_modules/dagster/dagster/_core/definitions/external_asset.py +++ b/python_modules/dagster/dagster/_core/definitions/external_asset.py @@ -8,7 +8,10 @@ ) from dagster._core.definitions.assets import AssetsDefinition from dagster._core.definitions.decorators.asset_decorator import asset, multi_asset -from dagster._core.definitions.source_asset import SourceAsset +from dagster._core.definitions.source_asset import ( + SourceAsset, + wrap_source_asset_observe_fn_in_op_compute_fn, +) from dagster._core.errors import DagsterInvariantViolationError from dagster._core.execution.context.compute import AssetExecutionContext @@ -123,20 +126,23 @@ def _external_assets_def(context: AssetExecutionContext) -> None: def create_external_asset_from_source_asset(source_asset: SourceAsset) -> AssetsDefinition: - check.invariant( - source_asset.observe_fn is None, - "Observable source assets not supported yet: observe_fn should be None", - ) check.invariant( source_asset.auto_observe_interval_minutes is None, - "Observable source assets not supported yet: auto_observe_interval_minutes should be None", + "Automatically observed external assets not supported yet: auto_observe_interval_minutes" + " should be None", + ) + + injected_metadata = ( + {SYSTEM_METADATA_KEY_ASSET_EXECUTION_TYPE: AssetExecutionType.UNEXECUTABLE.value} + if source_asset.observe_fn is None + else {} ) kwargs = { "key": source_asset.key, "metadata": { **source_asset.metadata, - **{SYSTEM_METADATA_KEY_ASSET_EXECUTION_TYPE: AssetExecutionType.UNEXECUTABLE.value}, + **injected_metadata, }, "group_name": source_asset.group_name, "description": source_asset.description, @@ -149,10 +155,19 @@ def create_external_asset_from_source_asset(source_asset: SourceAsset) -> Assets kwargs["io_manager_key"] = source_asset.io_manager_key @asset(**kwargs) - def _external_assets_def() -> None: - raise NotImplementedError(f"Asset {source_asset.key} is not executable") + def _shim_assets_def(context: AssetExecutionContext): + if 
not source_asset.observe_fn: + raise NotImplementedError(f"Asset {source_asset.key} is not executable") + + op_function = wrap_source_asset_observe_fn_in_op_compute_fn(source_asset) + return_value = op_function.decorated_fn(context) + check.invariant( + return_value is None, + "The wrapped decorated_fn should return a value. If this changes, this code path must" + " changed to process the events appopriately.", + ) - check.invariant(isinstance(_external_assets_def, AssetsDefinition)) - assert isinstance(_external_assets_def, AssetsDefinition) # appese pyright + check.invariant(isinstance(_shim_assets_def, AssetsDefinition)) + assert isinstance(_shim_assets_def, AssetsDefinition) # appease pyright - return _external_assets_def + return _shim_assets_def diff --git a/python_modules/dagster/dagster/_core/definitions/observable_asset.py b/python_modules/dagster/dagster/_core/definitions/observable_asset.py new file mode 100644 index 0000000000000..1c0a97c56f330 --- /dev/null +++ b/python_modules/dagster/dagster/_core/definitions/observable_asset.py @@ -0,0 +1,107 @@ +# from typing import Sequence + +# from dagster import _check as check +# from dagster._core.definitions.asset_spec import ( +# SYSTEM_METADATA_KEY_ASSET_EXECUTION_TYPE, +# AssetExecutionType, +# AssetSpec, +# ) +# from dagster._core.definitions.decorators.asset_decorator import asset, multi_asset +# from dagster._core.definitions.source_asset import ( +# SourceAsset, +# wrap_source_asset_observe_fn_in_op_compute_fn, +# ) +# from dagster._core.errors import DagsterInvariantViolationError +# from dagster._core.execution.context.compute import OpExecutionContext + + +# def create_unexecutable_observable_assets_def(specs: Sequence[AssetSpec]): +# new_specs = [] +# for spec in specs: +# check.invariant( +# spec.auto_materialize_policy is None, +# "auto_materialize_policy must be None since it is ignored", +# ) +# check.invariant(spec.code_version is None, "code_version must be None since it is ignored") +# 
check.invariant( +# spec.freshness_policy is None, "freshness_policy must be None since it is ignored" +# ) +# check.invariant( +# spec.skippable is False, +# "skippable must be False since it is ignored and False is the default", +# ) + +# new_specs.append( +# AssetSpec( +# key=spec.key, +# description=spec.description, +# group_name=spec.group_name, +# metadata={ +# **(spec.metadata or {}), +# **{ +# SYSTEM_METADATA_KEY_ASSET_EXECUTION_TYPE: ( +# AssetExecutionType.UNEXECUTABLE.value +# ) +# }, +# }, +# deps=spec.deps, +# ) +# ) + +# @multi_asset(specs=new_specs) +# def an_asset() -> None: +# raise DagsterInvariantViolationError( +# f"You have attempted to execute an unexecutable asset {[spec.key for spec in specs]}" +# ) + +# return an_asset + + +# def create_assets_def_from_source_asset(source_asset: SourceAsset): +# check.invariant( +# source_asset.auto_observe_interval_minutes is None, +# "Schedulable observable source assets not supported yet: auto_observe_interval_minutes" +# " should be None", +# ) + +# injected_metadata = ( +# {SYSTEM_METADATA_KEY_ASSET_EXECUTION_TYPE: AssetExecutionType.UNEXECUTABLE.value} +# if source_asset.observe_fn is None +# else {} +# ) + +# kwargs = { +# "key": source_asset.key, +# "metadata": { +# **source_asset.metadata, +# **injected_metadata, +# }, +# "group_name": source_asset.group_name, +# "description": source_asset.description, +# "partitions_def": source_asset.partitions_def, +# } + +# if source_asset.io_manager_def: +# kwargs["io_manager_def"] = source_asset.io_manager_def +# elif source_asset.io_manager_key: +# kwargs["io_manager_key"] = source_asset.io_manager_key + +# kwargs["partitions_def"] = source_asset.partitions_def + +# if source_asset.observe_fn: +# kwargs["resource_defs"] = source_asset.resource_defs + +# @asset(**kwargs) +# def shim_asset(context: OpExecutionContext): +# if not source_asset.observe_fn: +# raise NotImplementedError(f"Asset {source_asset.key} is not executable") + +# op_function = 
wrap_source_asset_observe_fn_in_op_compute_fn(source_asset) +# return_value = op_function.decorated_fn(context) +# check.invariant( +# return_value is None, +# "The wrapped decorated_fn should return a value. If this changes, this code path must" +# " changed to process the events appopriately.", +# ) + +# return shim_asset diff --git a/python_modules/dagster/dagster/_core/definitions/source_asset.py b/python_modules/dagster/dagster/_core/definitions/source_asset.py index 7457c0e2d9497..79f6ca213d571 100644 --- a/python_modules/dagster/dagster/_core/definitions/source_asset.py +++ b/python_modules/dagster/dagster/_core/definitions/source_asset.py @@ -78,7 +78,7 @@ def wrap_source_asset_observe_fn_in_op_compute_fn( observe_fn_has_context = is_context_provided(get_function_params(observe_fn)) - def fn(context: OpExecutionContext): + def fn(context: OpExecutionContext) -> None: resource_kwarg_keys = [param.name for param in get_resource_args(observe_fn)] resource_kwargs = {key: getattr(context.resources, key) for key in resource_kwarg_keys} observe_fn_return_value = ( diff --git a/python_modules/dagster/dagster/_core/execution/execution_result.py b/python_modules/dagster/dagster/_core/execution/execution_result.py index f38408282b601..e54891520767b 100644 --- a/python_modules/dagster/dagster/_core/execution/execution_result.py +++ b/python_modules/dagster/dagster/_core/execution/execution_result.py @@ -173,6 +173,9 @@ def asset_observations_for_node(self, node_name: str) -> Sequence[AssetObservati def get_asset_materialization_events(self) -> Sequence[DagsterEvent]: return [event for event in self.all_events if event.is_step_materialization] + def get_asset_observation_events(self) -> Sequence[DagsterEvent]: + return [event for event in self.all_events if event.is_asset_observation] + def get_asset_check_evaluations(self) -> Sequence[AssetCheckEvaluation]: return [ cast(AssetCheckEvaluation, event.event_specific_data) diff --git 
a/python_modules/dagster/dagster_tests/definitions_tests/test_external_assets.py b/python_modules/dagster/dagster_tests/definitions_tests/test_external_assets.py index fe55c96b815cf..3a8470691c5d9 100644 --- a/python_modules/dagster/dagster_tests/definitions_tests/test_external_assets.py +++ b/python_modules/dagster/dagster_tests/definitions_tests/test_external_assets.py @@ -7,12 +7,14 @@ AssetsDefinition, AutoMaterializePolicy, DagsterInstance, + DataVersion, Definitions, IOManager, JobDefinition, SourceAsset, _check as check, asset, + observable_source_asset, ) from dagster._core.definitions.asset_spec import AssetSpec from dagster._core.definitions.external_asset import ( @@ -202,3 +204,27 @@ def an_asset(context: AssetExecutionContext, source_asset: str) -> str: assert result_two.success assert result_two.output_for_node("an_asset") == "hardcoded-computed-2021-01-03" + + +def test_observable_source_asset_decorator() -> None: + @observable_source_asset + def an_observable_source_asset() -> DataVersion: + return DataVersion("foo") + + defs = Definitions(assets=[create_external_asset_from_source_asset(an_observable_source_asset)]) + + instance = DagsterInstance.ephemeral() + result = defs.get_implicit_global_asset_job_def().execute_in_process(instance=instance) + + assert result.success + assert result.output_for_node("an_observable_source_asset") is None + + all_observations = result.get_asset_observation_events() + assert len(all_observations) == 1 + observation_event = all_observations[0] + assert observation_event.asset_observation_data.asset_observation.data_version == "foo" + + all_materializations = result.get_asset_materialization_events() + # Note this does not make sense. 
We need to make framework changes to allow for the omission of + # a materialization event + assert len(all_materializations) == 1 From 1ecc61e83b10d4e15366f36b45d02c9d212d502d Mon Sep 17 00:00:00 2001 From: Nick Schrock Date: Fri, 6 Oct 2023 08:45:46 -0400 Subject: [PATCH 2/5] delete --- .../_core/definitions/observable_asset.py | 107 ------------------ 1 file changed, 107 deletions(-) delete mode 100644 python_modules/dagster/dagster/_core/definitions/observable_asset.py diff --git a/python_modules/dagster/dagster/_core/definitions/observable_asset.py b/python_modules/dagster/dagster/_core/definitions/observable_asset.py deleted file mode 100644 index 1c0a97c56f330..0000000000000 --- a/python_modules/dagster/dagster/_core/definitions/observable_asset.py +++ /dev/null @@ -1,107 +0,0 @@ -# from typing import Sequence - -# from dagster import _check as check -# from dagster._core.definitions.asset_spec import ( -# SYSTEM_METADATA_KEY_ASSET_EXECUTION_TYPE, -# AssetExecutionType, -# AssetSpec, -# ) -# from dagster._core.definitions.decorators.asset_decorator import asset, multi_asset -# from dagster._core.definitions.source_asset import ( -# SourceAsset, -# wrap_source_asset_observe_fn_in_op_compute_fn, -# ) -# from dagster._core.errors import DagsterInvariantViolationError -# from dagster._core.execution.context.compute import OpExecutionContext - - -# def create_unexecutable_observable_assets_def(specs: Sequence[AssetSpec]): -# new_specs = [] -# for spec in specs: -# check.invariant( -# spec.auto_materialize_policy is None, -# "auto_materialize_policy must be None since it is ignored", -# ) -# check.invariant(spec.code_version is None, "code_version must be None since it is ignored") -# check.invariant( -# spec.freshness_policy is None, "freshness_policy must be None since it is ignored" -# ) -# check.invariant( -# spec.skippable is False, -# "skippable must be False since it is ignored and False is the default", -# ) - -# new_specs.append( -# AssetSpec( -# 
key=spec.key, -# description=spec.description, -# group_name=spec.group_name, -# metadata={ -# **(spec.metadata or {}), -# **{ -# SYSTEM_METADATA_KEY_ASSET_EXECUTION_TYPE: ( -# AssetExecutionType.UNEXECUTABLE.value -# ) -# }, -# }, -# deps=spec.deps, -# ) -# ) - -# @multi_asset(specs=new_specs) -# def an_asset() -> None: -# raise DagsterInvariantViolationError( -# f"You have attempted to execute an unexecutable asset {[spec.key for spec in specs]}" -# ) - -# return an_asset - - -# def create_assets_def_from_source_asset(source_asset: SourceAsset): -# check.invariant( -# source_asset.auto_observe_interval_minutes is None, -# "Schedulable observable source assets not supported yet: auto_observe_interval_minutes" -# " should be None", -# ) - -# injected_metadata = ( -# {SYSTEM_METADATA_KEY_ASSET_EXECUTION_TYPE: AssetExecutionType.UNEXECUTABLE.value} -# if source_asset.observe_fn is None -# else {} -# ) - -# kwargs = { -# "key": source_asset.key, -# "metadata": { -# **source_asset.metadata, -# **injected_metadata, -# }, -# "group_name": source_asset.group_name, -# "description": source_asset.description, -# "partitions_def": source_asset.partitions_def, -# } - -# if source_asset.io_manager_def: -# kwargs["io_manager_def"] = source_asset.io_manager_def -# elif source_asset.io_manager_key: -# kwargs["io_manager_key"] = source_asset.io_manager_key - -# kwargs["partitions_def"] = source_asset.partitions_def - -# if source_asset.observe_fn: -# kwargs["resource_defs"] = source_asset.resource_defs - -# @asset(**kwargs) -# def shim_asset(context: OpExecutionContext): -# if not source_asset.observe_fn: -# raise NotImplementedError(f"Asset {source_asset.key} is not executable") - -# op_function = wrap_source_asset_observe_fn_in_op_compute_fn(source_asset) -# return_value = op_function.decorated_fn(context) -# check.invariant( -# return_value is None, -# "The wrapped decorated_fn should return a value. 
If this changes, this code path must" -# " changed to process the events appopriately.", -# ) - -# return shim_asset From bfa044d87daa51f0793cadc9a9d1eb83a9068c24 Mon Sep 17 00:00:00 2001 From: Nick Schrock Date: Fri, 6 Oct 2023 13:04:37 -0400 Subject: [PATCH 3/5] add playground --- examples/experimental/playground/README.md | 5 +++++ examples/experimental/playground/pyproject.toml | 6 ++++++ examples/experimental/playground/setup.cfg | 2 ++ examples/experimental/playground/setup.py | 17 +++++++++++++++++ examples/experimental/playground/tox.ini | 17 +++++++++++++++++ 5 files changed, 47 insertions(+) create mode 100644 examples/experimental/playground/README.md create mode 100644 examples/experimental/playground/pyproject.toml create mode 100644 examples/experimental/playground/setup.cfg create mode 100644 examples/experimental/playground/setup.py create mode 100644 examples/experimental/playground/tox.ini diff --git a/examples/experimental/playground/README.md b/examples/experimental/playground/README.md new file mode 100644 index 0000000000000..c3bf5531d9663 --- /dev/null +++ b/examples/experimental/playground/README.md @@ -0,0 +1,5 @@ +# Playground + +## Getting started + +A place to collect lightweight examples of Dagster usage. 
\ No newline at end of file diff --git a/examples/experimental/playground/pyproject.toml b/examples/experimental/playground/pyproject.toml new file mode 100644 index 0000000000000..86b7de1a61a38 --- /dev/null +++ b/examples/experimental/playground/pyproject.toml @@ -0,0 +1,6 @@ +[build-system] +requires = ["setuptools"] +build-backend = "setuptools.build_meta" + +[tool.dagster] +module_name = "playground" diff --git a/examples/experimental/playground/setup.cfg b/examples/experimental/playground/setup.cfg new file mode 100644 index 0000000000000..2c6903734bc69 --- /dev/null +++ b/examples/experimental/playground/setup.cfg @@ -0,0 +1,2 @@ +[metadata] +name = playground diff --git a/examples/experimental/playground/setup.py b/examples/experimental/playground/setup.py new file mode 100644 index 0000000000000..ad7369a980ec2 --- /dev/null +++ b/examples/experimental/playground/setup.py @@ -0,0 +1,17 @@ +from setuptools import find_packages, setup + +setup( + name="playground", + packages=find_packages(exclude=["playground"]), + install_requires=["dagster"], + license="Apache-2.0", + description="A collection of examples to demonstrate Dagster features", + url="https://github.com/dagster-io/dagster/tree/master/examples/experimental/playground", + classifiers=[ + "Programming Language :: Python :: 3.8", + "Programming Language :: Python :: 3.9", + "License :: OSI Approved :: Apache Software License", + "Operating System :: OS Independent", + ], + extras_require={"dev": ["dagster-webserver", "pytest"]}, +) diff --git a/examples/experimental/playground/tox.ini b/examples/experimental/playground/tox.ini new file mode 100644 index 0000000000000..8d0031cffe458 --- /dev/null +++ b/examples/experimental/playground/tox.ini @@ -0,0 +1,17 @@ +[tox] +skipsdist = true + +[testenv] +download = True +passenv = CI_* COVERALLS_REPO_TOKEN BUILDKITE* +deps = + -e ../../../python_modules/dagster[test] + -e ../../../python_modules/dagster-pipes + -e ../../../python_modules/dagster-webserver + 
-e ../../../python_modules/dagster-graphql + -e . +allowlist_externals = + /bin/bash +commands = + !windows: /bin/bash -c '! pip list --exclude-editable | grep -e dagster' + pytest -c ../../../pyproject.toml -vv From d616ef3e330b7d94b33947c3492b908bb709134d Mon Sep 17 00:00:00 2001 From: Nick Schrock Date: Fri, 6 Oct 2023 13:08:00 -0400 Subject: [PATCH 4/5] add --- examples/experimental/playground/playground/__init__.py | 1 + .../experimental/playground/playground_tests/__init__.py | 0 .../playground/playground_tests/test_playground_include.py | 5 +++++ 3 files changed, 6 insertions(+) create mode 100644 examples/experimental/playground/playground/__init__.py create mode 100644 examples/experimental/playground/playground_tests/__init__.py create mode 100644 examples/experimental/playground/playground_tests/test_playground_include.py diff --git a/examples/experimental/playground/playground/__init__.py b/examples/experimental/playground/playground/__init__.py new file mode 100644 index 0000000000000..2d788c430e0f7 --- /dev/null +++ b/examples/experimental/playground/playground/__init__.py @@ -0,0 +1 @@ +INCLUDE_TEST = "A_STRING" diff --git a/examples/experimental/playground/playground_tests/__init__.py b/examples/experimental/playground/playground_tests/__init__.py new file mode 100644 index 0000000000000..e69de29bb2d1d diff --git a/examples/experimental/playground/playground_tests/test_playground_include.py b/examples/experimental/playground/playground_tests/test_playground_include.py new file mode 100644 index 0000000000000..7248100b8b5fd --- /dev/null +++ b/examples/experimental/playground/playground_tests/test_playground_include.py @@ -0,0 +1,5 @@ +from playground import INCLUDE_TEST + + +def test_include() -> None: + assert INCLUDE_TEST == "A_STRING" From dd6a0cd3bfeaf5eb58c30772394e7c0000730172 Mon Sep 17 00:00:00 2001 From: Nick Schrock Date: Fri, 6 Oct 2023 13:31:58 -0400 Subject: [PATCH 5/5] example --- .../experimental/playground/playground/__init__.py | 1 
- .../playground/observable_source_asset_adapter.py | 13 +++++++++++++ .../test_observable_source_asset_adapter.py | 8 ++++++++ .../playground_tests/test_playground_include.py | 5 ----- 4 files changed, 21 insertions(+), 6 deletions(-) create mode 100644 examples/experimental/playground/playground/observable_source_asset_adapter.py create mode 100644 examples/experimental/playground/playground_tests/test_observable_source_asset_adapter.py delete mode 100644 examples/experimental/playground/playground_tests/test_playground_include.py diff --git a/examples/experimental/playground/playground/__init__.py b/examples/experimental/playground/playground/__init__.py index 2d788c430e0f7..e69de29bb2d1d 100644 --- a/examples/experimental/playground/playground/__init__.py +++ b/examples/experimental/playground/playground/__init__.py @@ -1 +0,0 @@ -INCLUDE_TEST = "A_STRING" diff --git a/examples/experimental/playground/playground/observable_source_asset_adapter.py b/examples/experimental/playground/playground/observable_source_asset_adapter.py new file mode 100644 index 0000000000000..7c96b80ffd86b --- /dev/null +++ b/examples/experimental/playground/playground/observable_source_asset_adapter.py @@ -0,0 +1,13 @@ +from datetime import datetime + +from dagster import Definitions, observable_source_asset +from dagster._core.definitions.data_version import DataVersion +from dagster._core.definitions.external_asset import create_external_asset_from_source_asset + + +@observable_source_asset +def always_return_time() -> DataVersion: + return DataVersion(datetime.now().strftime("%H:%M:%S")) + + +defs = Definitions(assets=[create_external_asset_from_source_asset(always_return_time)]) diff --git a/examples/experimental/playground/playground_tests/test_observable_source_asset_adapter.py b/examples/experimental/playground/playground_tests/test_observable_source_asset_adapter.py new file mode 100644 index 0000000000000..71ff621dd5196 --- /dev/null +++ 
b/examples/experimental/playground/playground_tests/test_observable_source_asset_adapter.py @@ -0,0 +1,8 @@ +import playground.observable_source_asset_adapter +from dagster import DagsterInstance + + +def test_observable_source_adapter() -> None: + job_def = playground.observable_source_asset_adapter.defs.get_implicit_global_asset_job_def() + instance = DagsterInstance.ephemeral() + assert job_def.execute_in_process(instance=instance).success diff --git a/examples/experimental/playground/playground_tests/test_playground_include.py b/examples/experimental/playground/playground_tests/test_playground_include.py deleted file mode 100644 index 7248100b8b5fd..0000000000000 --- a/examples/experimental/playground/playground_tests/test_playground_include.py +++ /dev/null @@ -1,5 +0,0 @@ -from playground import INCLUDE_TEST - - -def test_include() -> None: - assert INCLUDE_TEST == "A_STRING"