Skip to content

Commit

Permalink
observable assets in a user space sensor
Browse files Browse the repository at this point in the history
  • Loading branch information
schrockn committed Sep 20, 2023
1 parent 78ff1b8 commit 895b2ca
Show file tree
Hide file tree
Showing 3 changed files with 148 additions and 2 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,13 @@

import dagster._check as check
from dagster._annotations import public
from dagster._core.definitions.events import (
AssetMaterialization,
AssetObservation,
UserEvent,
)
from dagster._core.definitions.instigation_logger import InstigationLogger
from dagster._core.definitions.job_definition import JobDefinition
from dagster._core.definitions.partition import (
CachingDynamicPartitionsLoader,
)
Expand All @@ -55,7 +61,6 @@
)
from .asset_selection import AssetSelection
from .graph_definition import GraphDefinition
from .job_definition import JobDefinition
from .run_request import (
AddDynamicPartitionsRequest,
DagsterRunReaction,
Expand All @@ -72,6 +77,7 @@
from dagster import ResourceDefinition
from dagster._core.definitions.definitions_class import Definitions
from dagster._core.definitions.repository_definition import RepositoryDefinition
from dagster._core.events import DagsterEvent


@whitelist_for_serdes
Expand Down Expand Up @@ -178,6 +184,7 @@ def __init__(
)
self._logger: Optional[InstigationLogger] = None
self._cursor_updated = False
self._events: List["DagsterEvent"] = []

def __enter__(self) -> "SensorEvaluationContext":
self._cm_scope_entered = True
Expand Down Expand Up @@ -373,6 +380,55 @@ def has_captured_logs(self):
def log_key(self) -> Optional[List[str]]:
return self._log_key

@public
def log_event(self, event: UserEvent) -> None:
from dagster._core.events import (
AssetObservationData,
DagsterEvent,
DagsterEventType,
StepMaterializationData,
)

"""Log an AssetMaterialization, AssetObservation, or ExpectationResult from within the body of an op.
Events logged with this method will appear in the list of DagsterEvents, as well as the event log.
Args:
event (Union[AssetMaterialization, AssetObservation, ExpectationResult]): The event to log.
**Examples:**
.. code-block:: python
from dagster import op, AssetMaterialization
@op
def log_materialization(context):
context.log_event(AssetMaterialization("foo"))
"""
dagster_event = None
if isinstance(event, AssetMaterialization):
event_type_value = DagsterEventType.ASSET_MATERIALIZATION.value
data_payload = StepMaterializationData(event)
dagster_event = DagsterEvent(
event_type_value=event_type_value,
event_specific_data=data_payload,
job_name="", # RUNLESS_JOB_NAME
)

elif isinstance(event, AssetObservation):
event_type_value = DagsterEventType.ASSET_OBSERVATION.value
data_payload = AssetObservationData(event)
dagster_event = DagsterEvent(
event_type_value=event_type_value,
event_specific_data=data_payload,
job_name="", # RUNLESS_JOB_NAME
)
else:
raise DagsterInvariantViolationError(f"Unsupported event type: {type(event)}")

self._events.append(dagster_event)


RawSensorEvaluationFunctionReturn = Union[
Iterator[Union[SkipReason, RunRequest, DagsterRunReaction, SensorResult]],
Expand Down Expand Up @@ -799,6 +855,7 @@ def evaluate_tick(self, context: "SensorEvaluationContext") -> "SensorExecutionD
dagster_run_reactions,
captured_log_key=context.log_key if context.has_captured_logs() else None,
dynamic_partitions_requests=dynamic_partitions_requests,
dagster_events=context._events, # noqa: SLF001
)

def has_loadable_targets(self) -> bool:
Expand Down Expand Up @@ -939,6 +996,10 @@ class SensorExecutionData(
Sequence[Union[AddDynamicPartitionsRequest, DeleteDynamicPartitionsRequest]]
],
),
(
"dagster_events",
List["DagsterEvent"],
),
],
)
):
Expand All @@ -954,6 +1015,7 @@ def __new__(
dynamic_partitions_requests: Optional[
Sequence[Union[AddDynamicPartitionsRequest, DeleteDynamicPartitionsRequest]]
] = None,
dagster_events: Optional[Iterable["DagsterEvent"]] = None,
):
check.opt_sequence_param(run_requests, "run_requests", RunRequest)
check.opt_str_param(skip_message, "skip_message")
Expand All @@ -968,6 +1030,7 @@ def __new__(
check.invariant(
not (run_requests and skip_message), "Found both skip data and run request data"
)
# TODO DagsterEvent type check
return super(SensorExecutionData, cls).__new__(
cls,
run_requests=run_requests,
Expand All @@ -976,6 +1039,7 @@ def __new__(
dagster_run_reactions=dagster_run_reactions,
captured_log_key=captured_log_key,
dynamic_partitions_requests=dynamic_partitions_requests,
dagster_events=list(dagster_events) if dagster_events else [],
)


Expand Down
3 changes: 3 additions & 0 deletions python_modules/dagster/dagster/_daemon/sensor.py
Original file line number Diff line number Diff line change
Expand Up @@ -679,6 +679,9 @@ def _evaluate_sensor(

assert isinstance(sensor_runtime_data, SensorExecutionData)

for dagster_event in sensor_runtime_data.dagster_events or []:
instance.report_dagster_event(dagster_event, run_id="")

if sensor_runtime_data.dynamic_partitions_requests:
for request in sensor_runtime_data.dynamic_partitions_requests:
existent_partitions = []
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,13 +16,28 @@
asset,
observable_source_asset,
)
from dagster._core.definitions.asset_spec import AssetSpec
from dagster._core.definitions import materialize
from dagster._core.definitions.asset_spec import (
SYSTEM_METADATA_KEY_ASSET_EXECUTION_TYPE,
AssetExecutionType,
AssetSpec,
)
from dagster._core.definitions.data_version import DATA_VERSION_TAG
from dagster._core.definitions.decorators.sensor_decorator import sensor
from dagster._core.definitions.events import AssetObservation
from dagster._core.definitions.freshness_policy import FreshnessPolicy
from dagster._core.definitions.observable_asset import (
create_assets_def_from_source_asset,
create_unexecutable_observable_assets_def,
)
from dagster._core.definitions.run_request import SkipReason
from dagster._core.definitions.sensor_definition import (
SensorEvaluationContext,
build_sensor_context,
)
from dagster._core.definitions.time_window_partitions import DailyPartitionsDefinition
from dagster._core.event_api import EventRecordsFilter
from dagster._core.events import DagsterEventType


def test_observable_asset_basic_creation() -> None:
Expand Down Expand Up @@ -215,3 +230,67 @@ def an_observable_source_asset() -> DataVersion:

all_materializations = result.get_asset_materialization_events()
assert len(all_materializations) == 0


def get_latest_asset_observation(
instance: DagsterInstance, asset_key: AssetKey
) -> AssetObservation:
event_records = instance.get_event_records(
EventRecordsFilter(
event_type=DagsterEventType.ASSET_OBSERVATION,
asset_key=asset_key,
),
limit=1,
)

assert len(event_records) == 1

event_record = event_records[0]

return check.not_none(event_record.asset_observation)


def test_demonstrate_explicit_sensor_in_user_space() -> None:
def compute_data_version() -> str:
return "data_version"

observing_only_asset_key = AssetKey("observing_only_asset")

@asset(
key=observing_only_asset_key,
metadata={SYSTEM_METADATA_KEY_ASSET_EXECUTION_TYPE: AssetExecutionType.OBSERVATION.value},
)
def observing_only_asset(context: AssetExecutionContext) -> None:
context.log_event(
AssetObservation(
asset_key=observing_only_asset_key, tags={DATA_VERSION_TAG: compute_data_version()}
)
)

asset_execution_instance = DagsterInstance.ephemeral()

assert materialize(assets=[observing_only_asset], instance=asset_execution_instance).success

assert (
get_latest_asset_observation(
asset_execution_instance, observing_only_asset_key
).data_version
== "data_version"
)

@sensor(job_name="observing_only_sensor")
def observing_only_asset_sensor(context: SensorEvaluationContext) -> SkipReason:
context.log_event(
AssetObservation(
asset_key=observing_only_asset_key, tags={DATA_VERSION_TAG: compute_data_version()}
)
)
return SkipReason("Never kicks off run")

sensor_instance = DagsterInstance.ephemeral()

sensor_execution_data = observing_only_asset_sensor.evaluate_tick(
build_sensor_context(instance=sensor_instance)
)

assert len(sensor_execution_data.dagster_events) == 1

0 comments on commit 895b2ca

Please sign in to comment.