Skip to content

Commit

Permalink
Revert "tmp"
Browse files Browse the repository at this point in the history
This reverts commit e9c592034f7b10e0a4b4f4a1212ab442847435a6.
  • Loading branch information
OwenKephart committed Oct 21, 2024
1 parent 7785aab commit 72af2ad
Show file tree
Hide file tree
Showing 6 changed files with 112 additions and 4 deletions.
Original file line number Diff line number Diff line change
@@ -1,10 +1,13 @@
from typing import AbstractSet, Any, Callable, Mapping, Optional, Sequence, Set, Union, overload

import dagster._check as check
from dagster._annotations import experimental
from dagster._annotations import deprecated_param, experimental
from dagster._core.definitions.asset_check_spec import AssetCheckSpec
from dagster._core.definitions.asset_spec import AssetExecutionType, AssetSpec
from dagster._core.definitions.assets import AssetsDefinition
from dagster._core.definitions.declarative_automation.automation_condition import (
AutomationCondition,
)
from dagster._core.definitions.decorators.asset_decorator import (
resolve_asset_key_and_name_for_decorator,
)
Expand Down Expand Up @@ -44,11 +47,22 @@ def observable_source_asset(
partitions_def: Optional[PartitionsDefinition] = None,
auto_observe_interval_minutes: Optional[float] = None,
freshness_policy: Optional[FreshnessPolicy] = None,
automation_condition: Optional[AutomationCondition] = None,
op_tags: Optional[Mapping[str, Any]] = None,
tags: Optional[Mapping[str, str]] = None,
) -> "_ObservableSourceAsset": ...


@deprecated_param(
param="auto_observe_interval_minutes",
breaking_version="1.10.0",
additional_warn_text="use `automation_condition` instead.",
)
@deprecated_param(
param="freshness_policy",
breaking_version="1.10.0",
additional_warn_text="use freshness checks instead.",
)
@experimental
def observable_source_asset(
observe_fn: Optional[SourceAssetObserveFunction] = None,
Expand All @@ -66,6 +80,7 @@ def observable_source_asset(
partitions_def: Optional[PartitionsDefinition] = None,
auto_observe_interval_minutes: Optional[float] = None,
freshness_policy: Optional[FreshnessPolicy] = None,
automation_condition: Optional[AutomationCondition] = None,
op_tags: Optional[Mapping[str, Any]] = None,
tags: Optional[Mapping[str, str]] = None,
) -> Union[SourceAsset, "_ObservableSourceAsset"]:
Expand Down Expand Up @@ -100,15 +115,15 @@ def observable_source_asset(
compose the asset.
auto_observe_interval_minutes (Optional[float]): While the asset daemon is turned on, a run
of the observation function for this asset will be launched at this interval.
freshness_policy (FreshnessPolicy): A constraint telling Dagster how often this asset is intended to be updated
with respect to its root data.
op_tags (Optional[Dict[str, Any]]): A dictionary of tags for the op that computes the asset.
Frameworks may expect and require certain metadata to be attached to a op. Values that
are not strings will be json encoded and must meet the criteria that
`json.loads(json.dumps(value)) == value`.
tags (Optional[Mapping[str, str]]): Tags for filtering and organizing. These tags are not
attached to runs of the asset.
observe_fn (Optional[SourceAssetObserveFunction]) Observation function for the source asset.
observe_fn (Optional[SourceAssetObserveFunction]): Observation function for the source asset.
automation_condition (Optional[AutomationCondition]): A condition describing when Dagster
should materialize this asset.
"""
if observe_fn is not None:
return _ObservableSourceAsset()(observe_fn)
Expand All @@ -127,6 +142,7 @@ def observable_source_asset(
partitions_def,
auto_observe_interval_minutes,
freshness_policy,
automation_condition,
op_tags,
tags=normalize_tags(tags, strict=True),
)
Expand All @@ -148,6 +164,7 @@ def __init__(
partitions_def: Optional[PartitionsDefinition] = None,
auto_observe_interval_minutes: Optional[float] = None,
freshness_policy: Optional[FreshnessPolicy] = None,
automation_condition: Optional[AutomationCondition] = None,
op_tags: Optional[Mapping[str, Any]] = None,
tags: Optional[Mapping[str, str]] = None,
):
Expand All @@ -168,6 +185,7 @@ def __init__(
self.partitions_def = partitions_def
self.auto_observe_interval_minutes = auto_observe_interval_minutes
self.freshness_policy = freshness_policy
self.automation_condition = automation_condition
self.op_tags = op_tags
self.tags = tags

Expand Down Expand Up @@ -204,6 +222,7 @@ def __call__(self, observe_fn: SourceAssetObserveFunction) -> SourceAsset:
partitions_def=self.partitions_def,
auto_observe_interval_minutes=self.auto_observe_interval_minutes,
freshness_policy=self.freshness_policy,
automation_condition=self.automation_condition,
tags=self.tags,
)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -150,6 +150,7 @@ def create_external_asset_from_source_asset(source_asset: SourceAsset) -> Assets
description=source_asset.description,
tags=source_asset.tags,
freshness_policy=source_asset.freshness_policy,
automation_condition=source_asset.automation_condition,
deps=[],
owners=[],
)
Expand Down
12 changes: 12 additions & 0 deletions python_modules/dagster/dagster/_core/definitions/source_asset.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,9 @@
DataVersion,
DataVersionsByPartition,
)
from dagster._core.definitions.declarative_automation.automation_condition import (
AutomationCondition,
)
from dagster._core.definitions.events import AssetKey, AssetObservation, CoercibleToAssetKey, Output
from dagster._core.definitions.freshness_policy import FreshnessPolicy
from dagster._core.definitions.metadata import (
Expand Down Expand Up @@ -215,6 +218,7 @@ class SourceAsset(ResourceAddable, IHasInternalInit):
_node_def: Optional[OpDefinition] # computed lazily
auto_observe_interval_minutes: Optional[float]
freshness_policy: Optional[FreshnessPolicy]
automation_condition: Optional[AutomationCondition]
tags: Mapping[str, str]

def __init__(
Expand All @@ -232,6 +236,7 @@ def __init__(
*,
auto_observe_interval_minutes: Optional[float] = None,
freshness_policy: Optional[FreshnessPolicy] = None,
automation_condition: Optional[AutomationCondition] = None,
tags: Optional[Mapping[str, str]] = None,
# This is currently private because it is necessary for source asset observation functions,
# but we have not yet decided on a final API for associated one or more ops with a source
Expand Down Expand Up @@ -285,6 +290,9 @@ def __init__(
self.freshness_policy = check.opt_inst_param(
freshness_policy, "freshness_policy", FreshnessPolicy
)
self.automation_condition = check.opt_inst_param(
automation_condition, "automation_condition", AutomationCondition
)

@staticmethod
def dagster_internal_init(
Expand All @@ -301,6 +309,7 @@ def dagster_internal_init(
op_tags: Optional[Mapping[str, Any]],
auto_observe_interval_minutes: Optional[float],
freshness_policy: Optional[FreshnessPolicy],
automation_condition: Optional[AutomationCondition],
tags: Optional[Mapping[str, str]],
_required_resource_keys: Optional[AbstractSet[str]],
) -> "SourceAsset":
Expand All @@ -317,6 +326,7 @@ def dagster_internal_init(
op_tags=op_tags,
auto_observe_interval_minutes=auto_observe_interval_minutes,
freshness_policy=freshness_policy,
automation_condition=automation_condition,
tags=tags,
_required_resource_keys=_required_resource_keys,
)
Expand Down Expand Up @@ -444,6 +454,7 @@ def with_resources(self, resource_defs) -> "SourceAsset":
freshness_policy=self.freshness_policy,
tags=self.tags,
op_tags=self.op_tags,
automation_condition=self.automation_condition,
_required_resource_keys=self._required_resource_keys,
)

Expand Down Expand Up @@ -471,6 +482,7 @@ def with_attributes(
tags=self.tags,
freshness_policy=self.freshness_policy,
op_tags=self.op_tags,
automation_condition=self.automation_condition,
_required_resource_keys=self._required_resource_keys,
)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
asset,
asset_check,
evaluate_automation_conditions,
observable_source_asset,
)

from dagster_tests.definitions_tests.declarative_automation_tests.scenario_utils.automation_condition_scenario import (
Expand Down Expand Up @@ -186,3 +187,42 @@ def foo_check() -> ...: ...
defs=defs, instance=instance, cursor=result.cursor, evaluation_time=current_time
)
assert result.total_requested == 1


def test_on_cron_on_observable_source() -> None:
@observable_source_asset(automation_condition=AutomationCondition.on_cron("@hourly"))
def obs() -> None: ...

@asset(deps=[obs], automation_condition=AutomationCondition.on_cron("@hourly"))
def mat() -> None: ...

current_time = datetime.datetime(2024, 8, 16, 4, 35)
defs = Definitions(assets=[obs, mat])
instance = DagsterInstance.ephemeral()

# hasn't passed a cron tick
result = evaluate_automation_conditions(
defs=defs, instance=instance, evaluation_time=current_time
)
assert result.total_requested == 0

# now passed a cron tick, kick off both
current_time += datetime.timedelta(minutes=30)
result = evaluate_automation_conditions(
defs=defs, instance=instance, cursor=result.cursor, evaluation_time=current_time
)
assert result.total_requested == 2

# don't kick off again
current_time += datetime.timedelta(minutes=1)
result = evaluate_automation_conditions(
defs=defs, instance=instance, cursor=result.cursor, evaluation_time=current_time
)
assert result.total_requested == 0

# next hour, kick off again
current_time += datetime.timedelta(hours=1)
result = evaluate_automation_conditions(
defs=defs, instance=instance, cursor=result.cursor, evaluation_time=current_time
)
assert result.total_requested == 2
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
import dagster as dg


@dg.observable_source_asset(automation_condition=dg.AutomationCondition.on_cron("@hourly"))
def obs() -> None: ...


@dg.asset(deps=[obs], automation_condition=dg.AutomationCondition.on_cron("@hourly"))
def mat() -> None: ...


defs = dg.Definitions(assets=[obs, mat])
Original file line number Diff line number Diff line change
Expand Up @@ -703,3 +703,27 @@ def test_simple_old_code_server() -> None:
_execute_ticks(context, executor)
runs = _get_runs_for_latest_ticks(context)
assert len(runs) == 1


def test_observable_source_asset() -> None:
with get_grpc_workspace_request_context(
"hourly_observable"
) as context, get_threadpool_executor() as executor:
time = datetime.datetime(2024, 8, 16, 1, 35)
with freeze_time(time):
_execute_ticks(context, executor)
runs = _get_runs_for_latest_ticks(context)
assert len(runs) == 0

time += datetime.timedelta(hours=1)
with freeze_time(time):
_execute_ticks(context, executor)
runs = _get_runs_for_latest_ticks(context)
assert len(runs) == 1
assert runs[0].asset_selection == {AssetKey("obs"), AssetKey("mat")}

time += datetime.timedelta(minutes=1)
with freeze_time(time):
_execute_ticks(context, executor)
runs = _get_runs_for_latest_ticks(context)
assert len(runs) == 0

0 comments on commit 72af2ad

Please sign in to comment.