Skip to content

Commit

Permalink
observable assets in a user space sensor
Browse files Browse the repository at this point in the history
feedabck

Extract out observe_fn_to_op_compute_fn to it can be reused in other contexts

cp

feedback

experimental

cp

cp

cp
  • Loading branch information
schrockn committed Oct 6, 2023
1 parent d8a6069 commit d4118ee
Show file tree
Hide file tree
Showing 2 changed files with 237 additions and 14 deletions.
33 changes: 21 additions & 12 deletions python_modules/dagster/dagster/_core/definitions/source_asset.py
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,20 @@

def wrap_source_asset_observe_fn_in_op_compute_fn(
source_asset: "SourceAsset",
) -> "DecoratedOpFunction":
check.not_none(source_asset.observe_fn) # for runtime check
assert source_asset.observe_fn is not None # for type checker
return observe_fn_to_op_compute_fn(
observe_fn=source_asset.observe_fn,
partitions_def=source_asset.partitions_def,
asset_key=source_asset.key,
)


def observe_fn_to_op_compute_fn(
observe_fn: SourceAssetObserveFunction,
partitions_def: Optional[PartitionsDefinition],
asset_key: AssetKey,
) -> "DecoratedOpFunction":
from dagster._core.definitions.decorators.op_decorator import (
DecoratedOpFunction,
Expand All @@ -71,11 +85,6 @@ def wrap_source_asset_observe_fn_in_op_compute_fn(
OpExecutionContext,
)

check.not_none(source_asset.observe_fn, "Must be an observable source asset")
assert source_asset.observe_fn # for type checker

observe_fn = source_asset.observe_fn

observe_fn_has_context = is_context_provided(get_function_params(observe_fn))

def fn(context: OpExecutionContext) -> None:
Expand All @@ -88,22 +97,22 @@ def fn(context: OpExecutionContext) -> None:
)

if isinstance(observe_fn_return_value, DataVersion):
if source_asset.partitions_def is not None:
if partitions_def is not None:
raise DagsterInvalidObservationError(
f"{source_asset.key} is partitioned, so its observe function should return a"
f"{asset_key} is partitioned, so its observe function should return a"
" DataVersionsByPartition, not a DataVersion"
)

context.log_event(
AssetObservation(
asset_key=source_asset.key,
asset_key=asset_key,
tags={DATA_VERSION_TAG: observe_fn_return_value.value},
)
)
elif isinstance(observe_fn_return_value, DataVersionsByPartition):
if source_asset.partitions_def is None:
if partitions_def is None:
raise DagsterInvalidObservationError(
f"{source_asset.key} is not partitioned, so its observe function should return"
f"{asset_key} is not partitioned, so its observe function should return"
" a DataVersion, not a DataVersionsByPartition"
)

Expand All @@ -113,14 +122,14 @@ def fn(context: OpExecutionContext) -> None:
) in observe_fn_return_value.data_versions_by_partition.items():
context.log_event(
AssetObservation(
asset_key=source_asset.key,
asset_key=asset_key,
tags={DATA_VERSION_TAG: data_version.value},
partition=partition_key,
)
)
else:
raise DagsterInvalidObservationError(
f"Observe function for {source_asset.key} must return a DataVersion or"
f"Observe function for {asset_key} must return a DataVersion or"
" DataVersionsByPartition, but returned a value of type"
f" {type(observe_fn_return_value)}"
)
Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
from typing import AbstractSet, Iterable
from typing import AbstractSet, Any, Callable, Iterable, Optional

import pytest
from dagster import (
Expand All @@ -11,18 +11,36 @@
Definitions,
IOManager,
JobDefinition,
SensorResult,
SourceAsset,
_check as check,
asset,
observable_source_asset,
)
from dagster._core.definitions.asset_spec import AssetSpec
from dagster._core.definitions import materialize
from dagster._core.definitions.asset_spec import (
SYSTEM_METADATA_KEY_ASSET_EXECUTION_TYPE,
AssetExecutionType,
AssetSpec,
)
from dagster._core.definitions.data_version import DATA_VERSION_TAG
from dagster._core.definitions.decorators.sensor_decorator import sensor
from dagster._core.definitions.events import AssetObservation
from dagster._core.definitions.external_asset import (
create_external_asset_from_source_asset,
external_assets_from_specs,
)
from dagster._core.definitions.freshness_policy import FreshnessPolicy
from dagster._core.definitions.sensor_definition import (
SensorDefinition,
build_sensor_context,
)
from dagster._core.definitions.source_asset import (
observe_fn_to_op_compute_fn,
)
from dagster._core.definitions.time_window_partitions import DailyPartitionsDefinition
from dagster._core.event_api import EventRecordsFilter
from dagster._core.events import DagsterEventType


def test_external_asset_basic_creation() -> None:
Expand Down Expand Up @@ -228,3 +246,199 @@ def an_observable_source_asset() -> DataVersion:

all_materializations = result.get_asset_materialization_events()
assert len(all_materializations) == 0


def get_latest_asset_observation(
instance: DagsterInstance, asset_key: AssetKey
) -> AssetObservation:
event_records = instance.get_event_records(
EventRecordsFilter(
event_type=DagsterEventType.ASSET_OBSERVATION,
asset_key=asset_key,
),
limit=1,
)

assert len(event_records) == 1

event_record = event_records[0]

return check.not_none(event_record.asset_observation)


def test_demonstrate_explicit_sensor_in_user_space() -> None:
def compute_data_version() -> str:
return "data_version"

observing_only_asset_key = AssetKey("observing_only_asset")

@asset(
key=observing_only_asset_key,
metadata={SYSTEM_METADATA_KEY_ASSET_EXECUTION_TYPE: AssetExecutionType.OBSERVATION.value},
)
def observing_only_asset(context: AssetExecutionContext) -> None:
context.log_event(
AssetObservation(
asset_key=observing_only_asset_key, tags={DATA_VERSION_TAG: compute_data_version()}
)
)

asset_execution_instance = DagsterInstance.ephemeral()

assert materialize(assets=[observing_only_asset], instance=asset_execution_instance).success

assert (
get_latest_asset_observation(
asset_execution_instance, observing_only_asset_key
).data_version
== "data_version"
)

@sensor(job_name="observing_only_sensor")
def observing_only_asset_sensor() -> SensorResult:
return SensorResult(
asset_events=[
AssetObservation(
asset_key=observing_only_asset_key,
tags={DATA_VERSION_TAG: compute_data_version()},
)
]
)

sensor_instance = DagsterInstance.ephemeral()

sensor_execution_data = observing_only_asset_sensor.evaluate_tick(
build_sensor_context(instance=sensor_instance)
)

assert len(sensor_execution_data.asset_events) == 1

asset_event = sensor_execution_data.asset_events[0]

assert isinstance(asset_event, AssetObservation)


def create_observation_with_version(
asset_key: AssetKey, data_version: DataVersion
) -> AssetObservation:
return AssetObservation(
asset_key=asset_key,
tags={DATA_VERSION_TAG: data_version.value},
)


def assets_def_from_observe_fn(asset_key: AssetKey, observe_fn: Callable[..., Any]):
@asset(
key=asset_key,
metadata={SYSTEM_METADATA_KEY_ASSET_EXECUTION_TYPE: AssetExecutionType.OBSERVATION.value},
)
def _asset(context: AssetExecutionContext, **kwargs) -> None:
assets_def = context.job_def.asset_layer.assets_def_for_asset(context.asset_key)
op_compute_fn = observe_fn_to_op_compute_fn(
observe_fn=observe_fn,
partitions_def=assets_def.partitions_def,
asset_key=context.asset_key,
)
return op_compute_fn.decorated_fn(context, **kwargs)

return _asset


def _get_auto_sensor_name(asset_key: AssetKey) -> str:
return f"__auto_observe_sensor{asset_key.to_python_identifier()}"


def sensor_def_from_observable_source_asset(
observable_source_asset: SourceAsset,
sensor_name: Optional[str] = None,
) -> SensorDefinition:
sensor_name = sensor_name if sensor_name else _get_auto_sensor_name(observable_source_asset.key)

@sensor(
name=sensor_name,
minimum_interval_seconds=(
int(observable_source_asset.auto_observe_interval_minutes * 60)
if observable_source_asset.auto_observe_interval_minutes is not None
else 5
* 60 # I could not find the default value (undocumented) so guessing it is 5 minutes?
),
)
def _sensor(context, **kwargs) -> SensorResult:
assert observable_source_asset.observe_fn
return SensorResult(
asset_events=[
create_observation_with_version(
observable_source_asset.key,
observable_source_asset.observe_fn(**kwargs),
),
]
)

return _sensor


def test_framework_support_for_observable_source_assets_on_assets_def() -> None:
observing_only_asset_key = AssetKey("observing_only_asset")

@observable_source_asset(name=observing_only_asset_key.to_python_identifier())
def observing_only_source_asset() -> DataVersion:
return DataVersion("data_version")

observing_only_assets_def = create_external_asset_from_source_asset(observing_only_source_asset)

asset_execution_instance = DagsterInstance.ephemeral()

assert materialize(
assets=[observing_only_assets_def], instance=asset_execution_instance
).success

assert (
get_latest_asset_observation(
instance=asset_execution_instance, asset_key=observing_only_asset_key
).data_version
== "data_version"
)

observing_only_asset_sensor = sensor_def_from_observable_source_asset(
observable_source_asset=observing_only_source_asset,
)

sensor_instance = DagsterInstance.ephemeral()

sensor_execution_data = observing_only_asset_sensor.evaluate_tick(
build_sensor_context(instance=sensor_instance)
)

assert len(sensor_execution_data.asset_events) == 1

asset_event = sensor_execution_data.asset_events[0]

assert isinstance(asset_event, AssetObservation)


def test_observable_source_adapter_ergonomics() -> None:
@observable_source_asset
def an_asset() -> DataVersion:
return DataVersion("data_version")

# calling these helpers could be in the Definitions object itself
defs = Definitions(
assets=[create_external_asset_from_source_asset(an_asset)],
sensors=[sensor_def_from_observable_source_asset(an_asset)],
)

instance = DagsterInstance.ephemeral()

result = defs.get_implicit_global_asset_job_def().execute_in_process(instance=instance)
assert result.success

assert get_latest_asset_observation(instance, an_asset.key).data_version == "data_version"

sensor_def = defs.get_sensor_def(_get_auto_sensor_name(an_asset.key))

sensor_result = sensor_def.evaluate_tick(build_sensor_context(instance=instance))

assert len(sensor_result.asset_events) == 1
asset_observation = sensor_result.asset_events[0]
assert isinstance(asset_observation, AssetObservation)
assert asset_observation.asset_key == an_asset.key

0 comments on commit d4118ee

Please sign in to comment.