Skip to content

Commit

Permalink
Wrap observable source assets in AssetsDefinition
Browse files Browse the repository at this point in the history
  • Loading branch information
schrockn committed Sep 19, 2023
1 parent 9b37183 commit 0333952
Show file tree
Hide file tree
Showing 5 changed files with 68 additions and 25 deletions.
6 changes: 5 additions & 1 deletion python_modules/dagster/dagster/_core/definitions/events.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@
import dagster._check as check
import dagster._seven as seven
from dagster._annotations import PublicAttr, experimental_param, public
from dagster._core.definitions.data_version import DataVersion
from dagster._core.definitions.data_version import DATA_VERSION_TAG, DataVersion
from dagster._core.storage.tags import MULTIDIMENSIONAL_PARTITION_PREFIX, SYSTEM_TAG_PREFIX
from dagster._serdes import whitelist_for_serdes
from dagster._serdes.serdes import NamedTupleSerializer
Expand Down Expand Up @@ -481,6 +481,10 @@ def __new__(
def label(self) -> str:
return " ".join(self.asset_key.path)

@property
def data_version(self) -> Optional[str]:
return self.tags.get(DATA_VERSION_TAG)


UNDEFINED_ASSET_KEY_PATH = ["__undefined__"]

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,12 @@
AssetSpec,
)
from dagster._core.definitions.decorators.asset_decorator import asset, multi_asset
from dagster._core.definitions.source_asset import SourceAsset
from dagster._core.definitions.source_asset import (
SourceAsset,
wrap_source_asset_observe_fn_in_op_compute_fn,
)
from dagster._core.errors import DagsterInvariantViolationError
from dagster._core.execution.context.compute import OpExecutionContext


def create_unexecutable_observable_assets_def(specs: Sequence[AssetSpec]):
Expand Down Expand Up @@ -53,25 +57,24 @@ def an_asset() -> None:
return an_asset


def create_unexecutable_observable_assets_def_from_source_asset(source_asset: SourceAsset):
check.invariant(
source_asset.observe_fn is None,
"Observable source assets not supported yet: observe_fn should be None",
)
check.invariant(
source_asset.partitions_def is None,
"Observable source assets not supported yet: partitions_def should be None",
)
def create_assets_def_from_source_asset(source_asset: SourceAsset):
check.invariant(
source_asset.auto_observe_interval_minutes is None,
"Observable source assets not supported yet: auto_observe_interval_minutes should be None",
"Schedulable observable source assets not supported yet: auto_observe_interval_minutes"
" should be None",
)

injected_metadata = (
{SYSTEM_METADATA_KEY_ASSET_VARIETAL: AssetVarietal.UNEXECUTABLE.value}
if source_asset.observe_fn is None
else {}
)

kwargs = {
"key": source_asset.key,
"metadata": {
**source_asset.metadata,
**{SYSTEM_METADATA_KEY_ASSET_VARIETAL: AssetVarietal.UNEXECUTABLE.value},
**injected_metadata,
},
"group_name": source_asset.group_name,
"description": source_asset.description,
Expand All @@ -82,8 +85,17 @@ def create_unexecutable_observable_assets_def_from_source_asset(source_asset: So
elif source_asset.io_manager_key:
kwargs["io_manager_key"] = source_asset.io_manager_key

kwargs["partitions_def"] = source_asset.partitions_def

if source_asset.observe_fn:
kwargs["resource_defs"] = source_asset.resource_defs

@asset(**kwargs)
def shim_asset() -> None:
raise NotImplementedError(f"Asset {source_asset.key} is not executable")
def shim_asset(context: OpExecutionContext) -> None:
if not source_asset.observe_fn:
raise NotImplementedError(f"Asset {source_asset.key} is not executable")

op_function = wrap_source_asset_observe_fn_in_op_compute_fn(source_asset)
return op_function.decorated_fn(context)

return shim_asset
16 changes: 8 additions & 8 deletions python_modules/dagster/dagster/_core/definitions/source_asset.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@
cast,
)

from typing_extensions import TYPE_CHECKING, TypeAlias
from typing_extensions import TypeAlias

import dagster._check as check
from dagster._annotations import PublicAttr, experimental_param, public
Expand Down Expand Up @@ -50,19 +50,19 @@
from dagster._utils.merger import merge_dicts
from dagster._utils.warnings import disable_dagster_warnings

if TYPE_CHECKING:
from dagster._core.definitions.decorators.op_decorator import (
DecoratedOpFunction,
)
# if TYPE_CHECKING:
# from dagster._core.definitions.decorators.op_decorator import (
# DecoratedOpFunction,
# )

# Going with this catch-all for the time-being to permit pythonic resources
SourceAssetObserveFunction: TypeAlias = Callable[..., Any]


@staticmethod
def wrap_source_asset_observe_fn_in_op_compute_fn(
source_asset: "SourceAsset",
) -> "DecoratedOpFunction":
def wrap_source_asset_observe_fn_in_op_compute_fn(source_asset: "SourceAsset"):
# TODO getting errant TCH001 error from ruff
# -> "DecoratedOpFunction":
from dagster._core.definitions.decorators.op_decorator import (
DecoratedOpFunction,
is_context_provided,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -173,6 +173,9 @@ def asset_observations_for_node(self, node_name: str) -> Sequence[AssetObservati
def get_asset_materialization_events(self) -> Sequence[DagsterEvent]:
return [event for event in self.all_events if event.is_step_materialization]

def get_asset_observation_events(self) -> Sequence[DagsterEvent]:
return [event for event in self.all_events if event.is_asset_observation]

def get_asset_check_evaluations(self) -> Sequence[AssetCheckEvaluation]:
return [
cast(AssetCheckEvaluation, event.event_specific_data)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,17 +4,19 @@
AssetsDefinition,
AutoMaterializePolicy,
DagsterInstance,
DataVersion,
Definitions,
IOManager,
SourceAsset,
_check as check,
asset,
observable_source_asset,
)
from dagster._core.definitions.asset_spec import AssetSpec
from dagster._core.definitions.freshness_policy import FreshnessPolicy
from dagster._core.definitions.observable_asset import (
create_assets_def_from_source_asset,
create_unexecutable_observable_assets_def,
create_unexecutable_observable_assets_def_from_source_asset,
)


Expand Down Expand Up @@ -107,7 +109,7 @@ def an_asset(source_asset: str) -> str:
assert result_one.output_for_node("an_asset") == "hardcoded-computed"

defs_with_shim = Definitions(
assets=[create_unexecutable_observable_assets_def_from_source_asset(source_asset), an_asset]
assets=[create_assets_def_from_source_asset(source_asset), an_asset]
)

assert isinstance(defs_with_shim.get_assets_def("source_asset"), AssetsDefinition)
Expand All @@ -120,3 +122,25 @@ def an_asset(source_asset: str) -> str:

assert result_two.success
assert result_two.output_for_node("an_asset") == "hardcoded-computed"


def test_observable_source_asset_decorator() -> None:
@observable_source_asset
def an_observable_source_asset() -> DataVersion:
return DataVersion("foo")

defs = Definitions(assets=[create_assets_def_from_source_asset(an_observable_source_asset)])

result = defs.get_implicit_global_asset_job_def().execute_in_process()

assert result.success

all_observations = result.get_asset_observation_events()
assert len(all_observations) == 1
observation_event = all_observations[0]
assert observation_event.asset_observation_data.asset_observation.data_version == "foo"

all_materializations = result.get_asset_materialization_events()
# Note this does not make sense. We need to make framework changes to allow for the omission of
# a materialzation event
assert len(all_materializations) == 1

0 comments on commit 0333952

Please sign in to comment.