Skip to content

Commit

Permalink
Wrap observable source assets in AssetsDefinition
Browse files Browse the repository at this point in the history
  • Loading branch information
schrockn committed Sep 20, 2023
1 parent c1afec3 commit d01b7bc
Show file tree
Hide file tree
Showing 4 changed files with 62 additions and 15 deletions.
6 changes: 5 additions & 1 deletion python_modules/dagster/dagster/_core/definitions/events.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@
import dagster._check as check
import dagster._seven as seven
from dagster._annotations import PublicAttr, experimental_param, public
from dagster._core.definitions.data_version import DataVersion
from dagster._core.definitions.data_version import DATA_VERSION_TAG, DataVersion
from dagster._core.storage.tags import MULTIDIMENSIONAL_PARTITION_PREFIX, SYSTEM_TAG_PREFIX
from dagster._serdes import whitelist_for_serdes
from dagster._serdes.serdes import NamedTupleSerializer
Expand Down Expand Up @@ -481,6 +481,10 @@ def __new__(
def label(self) -> str:
return " ".join(self.asset_key.path)

@property
def data_version(self) -> Optional[str]:
return self.tags.get(DATA_VERSION_TAG)


UNDEFINED_ASSET_KEY_PATH = ["__undefined__"]

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,12 @@
AssetSpec,
)
from dagster._core.definitions.decorators.asset_decorator import asset, multi_asset
from dagster._core.definitions.source_asset import SourceAsset
from dagster._core.definitions.source_asset import (
SourceAsset,
wrap_source_asset_observe_fn_in_op_compute_fn,
)
from dagster._core.errors import DagsterInvariantViolationError
from dagster._core.execution.context.compute import OpExecutionContext


def create_unexecutable_observable_assets_def(specs: Sequence[AssetSpec]):
Expand Down Expand Up @@ -53,21 +57,24 @@ def an_asset() -> None:
return an_asset


def create_unexecutable_observable_assets_def_from_source_asset(source_asset: SourceAsset):
check.invariant(
source_asset.observe_fn is None,
"Observable source assets not supported yet: observe_fn should be None",
)
def create_assets_def_from_source_asset(source_asset: SourceAsset):
check.invariant(
source_asset.auto_observe_interval_minutes is None,
"Observable source assets not supported yet: auto_observe_interval_minutes should be None",
"Schedulable observable source assets not supported yet: auto_observe_interval_minutes"
" should be None",
)

injected_metadata = (
{SYSTEM_METADATA_KEY_ASSET_EXECUTION_TYPE: AssetExecutionType.UNEXECUTABLE.value}
if source_asset.observe_fn is None
else {}
)

kwargs = {
"key": source_asset.key,
"metadata": {
**source_asset.metadata,
**{SYSTEM_METADATA_KEY_ASSET_EXECUTION_TYPE: AssetExecutionType.UNEXECUTABLE.value},
**injected_metadata,
},
"group_name": source_asset.group_name,
"description": source_asset.description,
Expand All @@ -79,8 +86,17 @@ def create_unexecutable_observable_assets_def_from_source_asset(source_asset: So
elif source_asset.io_manager_key:
kwargs["io_manager_key"] = source_asset.io_manager_key

kwargs["partitions_def"] = source_asset.partitions_def

if source_asset.observe_fn:
kwargs["resource_defs"] = source_asset.resource_defs

@asset(**kwargs)
def shim_asset() -> None:
raise NotImplementedError(f"Asset {source_asset.key} is not executable")
def shim_asset(context: OpExecutionContext) -> None:
if not source_asset.observe_fn:
raise NotImplementedError(f"Asset {source_asset.key} is not executable")

op_function = wrap_source_asset_observe_fn_in_op_compute_fn(source_asset)
return op_function.decorated_fn(context)

return shim_asset
Original file line number Diff line number Diff line change
Expand Up @@ -173,6 +173,9 @@ def asset_observations_for_node(self, node_name: str) -> Sequence[AssetObservati
def get_asset_materialization_events(self) -> Sequence[DagsterEvent]:
return [event for event in self.all_events if event.is_step_materialization]

def get_asset_observation_events(self) -> Sequence[DagsterEvent]:
return [event for event in self.all_events if event.is_asset_observation]

def get_asset_check_evaluations(self) -> Sequence[AssetCheckEvaluation]:
return [
cast(AssetCheckEvaluation, event.event_specific_data)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,18 +7,20 @@
AssetsDefinition,
AutoMaterializePolicy,
DagsterInstance,
DataVersion,
Definitions,
IOManager,
JobDefinition,
SourceAsset,
_check as check,
asset,
observable_source_asset,
)
from dagster._core.definitions.asset_spec import AssetSpec
from dagster._core.definitions.freshness_policy import FreshnessPolicy
from dagster._core.definitions.observable_asset import (
create_assets_def_from_source_asset,
create_unexecutable_observable_assets_def,
create_unexecutable_observable_assets_def_from_source_asset,
)
from dagster._core.definitions.time_window_partitions import DailyPartitionsDefinition

Expand Down Expand Up @@ -112,7 +114,7 @@ def an_asset(source_asset: str) -> str:
assert result_one.output_for_node("an_asset") == "hardcoded-computed"

defs_with_shim = Definitions(
assets=[create_unexecutable_observable_assets_def_from_source_asset(source_asset), an_asset]
assets=[create_assets_def_from_source_asset(source_asset), an_asset]
)

assert isinstance(defs_with_shim.get_assets_def("source_asset"), AssetsDefinition)
Expand Down Expand Up @@ -175,9 +177,9 @@ def an_asset(context: AssetExecutionContext, source_asset: str) -> str:
assert result_one.success
assert result_one.output_for_node("an_asset") == "hardcoded-computed-2021-01-02"

shimmed_source_asset = create_unexecutable_observable_assets_def_from_source_asset(source_asset)
shimmed_source_asset = create_assets_def_from_source_asset(source_asset)
defs_with_shim = Definitions(
assets=[create_unexecutable_observable_assets_def_from_source_asset(source_asset), an_asset]
assets=[create_assets_def_from_source_asset(source_asset), an_asset]
)

assert isinstance(defs_with_shim.get_assets_def("source_asset"), AssetsDefinition)
Expand All @@ -193,3 +195,25 @@ def an_asset(context: AssetExecutionContext, source_asset: str) -> str:

assert result_two.success
assert result_two.output_for_node("an_asset") == "hardcoded-computed-2021-01-03"


def test_observable_source_asset_decorator() -> None:
@observable_source_asset
def an_observable_source_asset() -> DataVersion:
return DataVersion("foo")

defs = Definitions(assets=[create_assets_def_from_source_asset(an_observable_source_asset)])

result = defs.get_implicit_global_asset_job_def().execute_in_process()

assert result.success

all_observations = result.get_asset_observation_events()
assert len(all_observations) == 1
observation_event = all_observations[0]
assert observation_event.asset_observation_data.asset_observation.data_version == "foo"

all_materializations = result.get_asset_materialization_events()
# Note this does not make sense. We need to make framework changes to allow for the omission of
# a materialzation event
assert len(all_materializations) == 1

0 comments on commit d01b7bc

Please sign in to comment.