Skip to content

Commit

Permalink
cleanup
Browse files Browse the repository at this point in the history
  • Loading branch information
schrockn committed Oct 6, 2023
1 parent d4118ee commit a93f78b
Show file tree
Hide file tree
Showing 2 changed files with 57 additions and 67 deletions.
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
from typing import List, Sequence
from typing import List, Optional, Sequence

from dagster import _check as check
from dagster._core.definitions.asset_spec import (
Expand All @@ -7,7 +7,14 @@
AssetSpec,
)
from dagster._core.definitions.assets import AssetsDefinition
from dagster._core.definitions.data_version import DATA_VERSION_TAG, DataVersion
from dagster._core.definitions.decorators.asset_decorator import asset, multi_asset
from dagster._core.definitions.decorators.sensor_decorator import sensor
from dagster._core.definitions.events import AssetKey, AssetObservation
from dagster._core.definitions.run_request import SensorResult
from dagster._core.definitions.sensor_definition import (
SensorDefinition,
)
from dagster._core.definitions.source_asset import (
SourceAsset,
wrap_source_asset_observe_fn_in_op_compute_fn,
Expand Down Expand Up @@ -171,3 +178,48 @@ def _shim_assets_def(context: AssetExecutionContext):
assert isinstance(_shim_assets_def, AssetsDefinition) # appease pyright

return _shim_assets_def


def create_observation_with_version(
asset_key: AssetKey, data_version: DataVersion
) -> AssetObservation:
return AssetObservation(
asset_key=asset_key,
tags={DATA_VERSION_TAG: data_version.value},
)


def get_auto_sensor_name(asset_key: AssetKey) -> str:
return f"__auto_observe_sensor{asset_key.to_python_identifier()}"


def sensor_def_from_observable_source_asset(
observable_source_asset: SourceAsset,
sensor_name: Optional[str] = None,
) -> SensorDefinition:
"""Given an existing observable source asset, generate a sensor that observes it on a regular
interval as specified by `SourceAsset.auto_observe_internal_minutes`.
"""
sensor_name = sensor_name if sensor_name else get_auto_sensor_name(observable_source_asset.key)

@sensor(
name=sensor_name,
minimum_interval_seconds=(
int(observable_source_asset.auto_observe_interval_minutes * 60)
if observable_source_asset.auto_observe_interval_minutes is not None
else 5
* 60 # I could not find the default value (undocumented) so guessing it is 5 minutes?
),
)
def _sensor(context, **kwargs) -> SensorResult:
assert observable_source_asset.observe_fn
return SensorResult(
asset_events=[
create_observation_with_version(
observable_source_asset.key,
observable_source_asset.observe_fn(**kwargs),
),
]
)

return _sensor
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
from typing import AbstractSet, Any, Callable, Iterable, Optional
from typing import AbstractSet, Iterable

import pytest
from dagster import (
Expand Down Expand Up @@ -29,15 +29,13 @@
from dagster._core.definitions.external_asset import (
create_external_asset_from_source_asset,
external_assets_from_specs,
get_auto_sensor_name,
sensor_def_from_observable_source_asset,
)
from dagster._core.definitions.freshness_policy import FreshnessPolicy
from dagster._core.definitions.sensor_definition import (
SensorDefinition,
build_sensor_context,
)
from dagster._core.definitions.source_asset import (
observe_fn_to_op_compute_fn,
)
from dagster._core.definitions.time_window_partitions import DailyPartitionsDefinition
from dagster._core.event_api import EventRecordsFilter
from dagster._core.events import DagsterEventType
Expand Down Expand Up @@ -318,65 +316,6 @@ def observing_only_asset_sensor() -> SensorResult:
assert isinstance(asset_event, AssetObservation)


def create_observation_with_version(
asset_key: AssetKey, data_version: DataVersion
) -> AssetObservation:
return AssetObservation(
asset_key=asset_key,
tags={DATA_VERSION_TAG: data_version.value},
)


def assets_def_from_observe_fn(asset_key: AssetKey, observe_fn: Callable[..., Any]):
@asset(
key=asset_key,
metadata={SYSTEM_METADATA_KEY_ASSET_EXECUTION_TYPE: AssetExecutionType.OBSERVATION.value},
)
def _asset(context: AssetExecutionContext, **kwargs) -> None:
assets_def = context.job_def.asset_layer.assets_def_for_asset(context.asset_key)
op_compute_fn = observe_fn_to_op_compute_fn(
observe_fn=observe_fn,
partitions_def=assets_def.partitions_def,
asset_key=context.asset_key,
)
return op_compute_fn.decorated_fn(context, **kwargs)

return _asset


def _get_auto_sensor_name(asset_key: AssetKey) -> str:
return f"__auto_observe_sensor{asset_key.to_python_identifier()}"


def sensor_def_from_observable_source_asset(
observable_source_asset: SourceAsset,
sensor_name: Optional[str] = None,
) -> SensorDefinition:
sensor_name = sensor_name if sensor_name else _get_auto_sensor_name(observable_source_asset.key)

@sensor(
name=sensor_name,
minimum_interval_seconds=(
int(observable_source_asset.auto_observe_interval_minutes * 60)
if observable_source_asset.auto_observe_interval_minutes is not None
else 5
* 60 # I could not find the default value (undocumented) so guessing it is 5 minutes?
),
)
def _sensor(context, **kwargs) -> SensorResult:
assert observable_source_asset.observe_fn
return SensorResult(
asset_events=[
create_observation_with_version(
observable_source_asset.key,
observable_source_asset.observe_fn(**kwargs),
),
]
)

return _sensor


def test_framework_support_for_observable_source_assets_on_assets_def() -> None:
observing_only_asset_key = AssetKey("observing_only_asset")

Expand Down Expand Up @@ -433,8 +372,7 @@ def an_asset() -> DataVersion:
assert result.success

assert get_latest_asset_observation(instance, an_asset.key).data_version == "data_version"

sensor_def = defs.get_sensor_def(_get_auto_sensor_name(an_asset.key))
sensor_def = defs.get_sensor_def(get_auto_sensor_name(an_asset.key))

sensor_result = sensor_def.evaluate_tick(build_sensor_context(instance=instance))

Expand Down

0 comments on commit a93f78b

Please sign in to comment.