Skip to content

Commit

Permalink
Wrap observable source assets in AssetsDefinition
Browse files Browse the repository at this point in the history
cp

explicit use of ephemeral instance
  • Loading branch information
schrockn committed Oct 6, 2023
1 parent 6c0eef0 commit 224c58f
Show file tree
Hide file tree
Showing 6 changed files with 169 additions and 14 deletions.
6 changes: 5 additions & 1 deletion python_modules/dagster/dagster/_core/definitions/events.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@
import dagster._check as check
import dagster._seven as seven
from dagster._annotations import PublicAttr, deprecated, experimental_param, public
from dagster._core.definitions.data_version import DataVersion
from dagster._core.definitions.data_version import DATA_VERSION_TAG, DataVersion
from dagster._core.storage.tags import MULTIDIMENSIONAL_PARTITION_PREFIX, SYSTEM_TAG_PREFIX
from dagster._serdes import whitelist_for_serdes
from dagster._serdes.serdes import NamedTupleSerializer
Expand Down Expand Up @@ -481,6 +481,10 @@ def __new__(
def label(self) -> str:
return " ".join(self.asset_key.path)

@property
def data_version(self) -> Optional[str]:
    """Return the entry stored under ``DATA_VERSION_TAG`` in ``self.tags``.

    The lookup uses ``.get``, so ``None`` is returned when the tag is absent.
    """
    tags = self.tags
    return tags.get(DATA_VERSION_TAG)


UNDEFINED_ASSET_KEY_PATH = ["__undefined__"]

Expand Down
39 changes: 27 additions & 12 deletions python_modules/dagster/dagster/_core/definitions/external_asset.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,10 @@
)
from dagster._core.definitions.assets import AssetsDefinition
from dagster._core.definitions.decorators.asset_decorator import asset, multi_asset
from dagster._core.definitions.source_asset import SourceAsset
from dagster._core.definitions.source_asset import (
SourceAsset,
wrap_source_asset_observe_fn_in_op_compute_fn,
)
from dagster._core.errors import DagsterInvariantViolationError
from dagster._core.execution.context.compute import AssetExecutionContext

Expand Down Expand Up @@ -123,20 +126,23 @@ def _external_assets_def(context: AssetExecutionContext) -> None:


def create_external_asset_from_source_asset(source_asset: SourceAsset) -> AssetsDefinition:
check.invariant(
source_asset.observe_fn is None,
"Observable source assets not supported yet: observe_fn should be None",
)
check.invariant(
source_asset.auto_observe_interval_minutes is None,
"Observable source assets not supported yet: auto_observe_interval_minutes should be None",
"Automatically observed external assets not supported yet: auto_observe_interval_minutes"
" should be None",
)

injected_metadata = (
{SYSTEM_METADATA_KEY_ASSET_EXECUTION_TYPE: AssetExecutionType.UNEXECUTABLE.value}
if source_asset.observe_fn is None
else {}
)

kwargs = {
"key": source_asset.key,
"metadata": {
**source_asset.metadata,
**{SYSTEM_METADATA_KEY_ASSET_EXECUTION_TYPE: AssetExecutionType.UNEXECUTABLE.value},
**injected_metadata,
},
"group_name": source_asset.group_name,
"description": source_asset.description,
Expand All @@ -149,10 +155,19 @@ def create_external_asset_from_source_asset(source_asset: SourceAsset) -> Assets
kwargs["io_manager_key"] = source_asset.io_manager_key

@asset(**kwargs)
def _external_assets_def() -> None:
raise NotImplementedError(f"Asset {source_asset.key} is not executable")
def _shim_assets_def(context: AssetExecutionContext):
if not source_asset.observe_fn:
raise NotImplementedError(f"Asset {source_asset.key} is not executable")

op_function = wrap_source_asset_observe_fn_in_op_compute_fn(source_asset)
return_value = op_function.decorated_fn(context)
check.invariant(
return_value is None,
"The wrapped decorated_fn should return a value. If this changes, this code path must"
" changed to process the events appopriately.",
)

check.invariant(isinstance(_external_assets_def, AssetsDefinition))
assert isinstance(_external_assets_def, AssetsDefinition) # appese pyright
check.invariant(isinstance(_shim_assets_def, AssetsDefinition))
assert isinstance(_shim_assets_def, AssetsDefinition) # appease pyright

return _external_assets_def
return _shim_assets_def
107 changes: 107 additions & 0 deletions python_modules/dagster/dagster/_core/definitions/observable_asset.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,107 @@
# from typing import Sequence

# from dagster import _check as check
# from dagster._core.definitions.asset_spec import (
# SYSTEM_METADATA_KEY_ASSET_EXECUTION_TYPE,
# AssetExecutionType,
# AssetSpec,
# )
# from dagster._core.definitions.decorators.asset_decorator import asset, multi_asset
# from dagster._core.definitions.source_asset import (
# SourceAsset,
# wrap_source_asset_observe_fn_in_op_compute_fn,
# )
# from dagster._core.errors import DagsterInvariantViolationError
# from dagster._core.execution.context.compute import OpExecutionContext


# def create_unexecutable_observable_assets_def(specs: Sequence[AssetSpec]):
# new_specs = []
# for spec in specs:
# check.invariant(
# spec.auto_materialize_policy is None,
# "auto_materialize_policy must be None since it is ignored",
# )
# check.invariant(spec.code_version is None, "code_version must be None since it is ignored")
# check.invariant(
# spec.freshness_policy is None, "freshness_policy must be None since it is ignored"
# )
# check.invariant(
# spec.skippable is False,
# "skippable must be False since it is ignored and False is the default",
# )

# new_specs.append(
# AssetSpec(
# key=spec.key,
# description=spec.description,
# group_name=spec.group_name,
# metadata={
# **(spec.metadata or {}),
# **{
# SYSTEM_METADATA_KEY_ASSET_EXECUTION_TYPE: (
# AssetExecutionType.UNEXECUTABLE.value
# )
# },
# },
# deps=spec.deps,
# )
# )

# @multi_asset(specs=new_specs)
# def an_asset() -> None:
# raise DagsterInvariantViolationError(
# f"You have attempted to execute an unexecutable asset {[spec.key for spec in specs]}"
# )

# return an_asset


# def create_assets_def_from_source_asset(source_asset: SourceAsset):
# check.invariant(
# source_asset.auto_observe_interval_minutes is None,
# "Schedulable observable source assets not supported yet: auto_observe_interval_minutes"
# " should be None",
# )

# injected_metadata = (
# {SYSTEM_METADATA_KEY_ASSET_EXECUTION_TYPE: AssetExecutionType.UNEXECUTABLE.value}
# if source_asset.observe_fn is None
# else {}
# )

# kwargs = {
# "key": source_asset.key,
# "metadata": {
# **source_asset.metadata,
# **injected_metadata,
# },
# "group_name": source_asset.group_name,
# "description": source_asset.description,
# "partitions_def": source_asset.partitions_def,
# }

# if source_asset.io_manager_def:
# kwargs["io_manager_def"] = source_asset.io_manager_def
# elif source_asset.io_manager_key:
# kwargs["io_manager_key"] = source_asset.io_manager_key

# kwargs["partitions_def"] = source_asset.partitions_def

# if source_asset.observe_fn:
# kwargs["resource_defs"] = source_asset.resource_defs

# @asset(**kwargs)
# def shim_asset(context: OpExecutionContext):
# if not source_asset.observe_fn:
# raise NotImplementedError(f"Asset {source_asset.key} is not executable")

# op_function = wrap_source_asset_observe_fn_in_op_compute_fn(source_asset)
# return_value = op_function.decorated_fn(context)
# check.invariant(
# return_value is None,
# "The wrapped decorated_fn should not return a value. If this changes, this code path must"
# " change to process the events appropriately.",
# )

# return shim_asset
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,7 @@ def wrap_source_asset_observe_fn_in_op_compute_fn(

observe_fn_has_context = is_context_provided(get_function_params(observe_fn))

def fn(context: OpExecutionContext):
def fn(context: OpExecutionContext) -> None:
resource_kwarg_keys = [param.name for param in get_resource_args(observe_fn)]
resource_kwargs = {key: getattr(context.resources, key) for key in resource_kwarg_keys}
observe_fn_return_value = (
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -173,6 +173,9 @@ def asset_observations_for_node(self, node_name: str) -> Sequence[AssetObservati
def get_asset_materialization_events(self) -> Sequence[DagsterEvent]:
    """Collect the events in ``self.all_events`` flagged ``is_step_materialization``.

    Returns:
        A new list holding the matching events, in their original order.
    """
    materialization_events = []
    for candidate in self.all_events:
        if candidate.is_step_materialization:
            materialization_events.append(candidate)
    return materialization_events

def get_asset_observation_events(self) -> Sequence[DagsterEvent]:
    """Collect the events in ``self.all_events`` flagged ``is_asset_observation``.

    Returns:
        A new list holding the matching events, in their original order.
    """
    observation_events = []
    for candidate in self.all_events:
        if candidate.is_asset_observation:
            observation_events.append(candidate)
    return observation_events

def get_asset_check_evaluations(self) -> Sequence[AssetCheckEvaluation]:
return [
cast(AssetCheckEvaluation, event.event_specific_data)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,12 +7,14 @@
AssetsDefinition,
AutoMaterializePolicy,
DagsterInstance,
DataVersion,
Definitions,
IOManager,
JobDefinition,
SourceAsset,
_check as check,
asset,
observable_source_asset,
)
from dagster._core.definitions.asset_spec import AssetSpec
from dagster._core.definitions.external_asset import (
Expand Down Expand Up @@ -202,3 +204,27 @@ def an_asset(context: AssetExecutionContext, source_asset: str) -> str:

assert result_two.success
assert result_two.output_for_node("an_asset") == "hardcoded-computed-2021-01-03"


def test_observable_source_asset_decorator() -> None:
    """Execute a wrapped observable source asset in-process and check its events.

    The observable source asset is converted with
    ``create_external_asset_from_source_asset``, run through the implicit global
    asset job, and the resulting observation and materialization events are
    inspected.
    """

    @observable_source_asset
    def an_observable_source_asset() -> DataVersion:
        return DataVersion("foo")

    shim_assets_def = create_external_asset_from_source_asset(an_observable_source_asset)
    definitions = Definitions(assets=[shim_assets_def])

    # Pass an explicitly created ephemeral instance to the in-process run.
    ephemeral_instance = DagsterInstance.ephemeral()
    implicit_job = definitions.get_implicit_global_asset_job_def()
    run_result = implicit_job.execute_in_process(instance=ephemeral_instance)

    assert run_result.success
    assert run_result.output_for_node("an_observable_source_asset") is None

    # Exactly one observation is expected, carrying the data version returned above.
    observation_events = run_result.get_asset_observation_events()
    assert len(observation_events) == 1
    first_observation = observation_events[0]
    assert first_observation.asset_observation_data.asset_observation.data_version == "foo"

    # NOTE: expecting a materialization event here does not make sense. Framework
    # changes are needed to allow for the omission of a materialization event.
    materialization_events = run_result.get_asset_materialization_events()
    assert len(materialization_events) == 1

0 comments on commit 224c58f

Please sign in to comment.