diff --git a/python_modules/dagster/dagster/_core/definitions/assets.py b/python_modules/dagster/dagster/_core/definitions/assets.py index 78234f16b3840..d616fb0361f8d 100644 --- a/python_modules/dagster/dagster/_core/definitions/assets.py +++ b/python_modules/dagster/dagster/_core/definitions/assets.py @@ -1406,6 +1406,7 @@ def _output_to_source_asset(self, output_name: str) -> SourceAsset: io_manager_def=None, observe_fn=None, op_tags=None, + automation_condition=None, auto_observe_interval_minutes=None, freshness_policy=None, _required_resource_keys=None, diff --git a/python_modules/dagster/dagster/_core/definitions/decorators/source_asset_decorator.py b/python_modules/dagster/dagster/_core/definitions/decorators/source_asset_decorator.py index f6ad06db0e0f3..29feae5a054b4 100644 --- a/python_modules/dagster/dagster/_core/definitions/decorators/source_asset_decorator.py +++ b/python_modules/dagster/dagster/_core/definitions/decorators/source_asset_decorator.py @@ -1,10 +1,13 @@ from typing import AbstractSet, Any, Callable, Mapping, Optional, Sequence, Set, Union, overload import dagster._check as check -from dagster._annotations import experimental +from dagster._annotations import deprecated_param, experimental from dagster._core.definitions.asset_check_spec import AssetCheckSpec from dagster._core.definitions.asset_spec import AssetExecutionType, AssetSpec from dagster._core.definitions.assets import AssetsDefinition +from dagster._core.definitions.declarative_automation.automation_condition import ( + AutomationCondition, +) from dagster._core.definitions.decorators.asset_decorator import ( resolve_asset_key_and_name_for_decorator, ) @@ -44,11 +47,22 @@ def observable_source_asset( partitions_def: Optional[PartitionsDefinition] = None, auto_observe_interval_minutes: Optional[float] = None, freshness_policy: Optional[FreshnessPolicy] = None, + automation_condition: Optional[AutomationCondition] = None, op_tags: Optional[Mapping[str, Any]] = None, tags: Optional[Mapping[str, str]] = None, ) -> "_ObservableSourceAsset": ... +@deprecated_param( + param="auto_observe_interval_minutes", + breaking_version="1.10.0", + additional_warn_text="use `automation_condition` instead.", +) +@deprecated_param( + param="freshness_policy", + breaking_version="1.10.0", + additional_warn_text="use freshness checks instead.", +) @experimental def observable_source_asset( observe_fn: Optional[SourceAssetObserveFunction] = None, @@ -66,6 +80,7 @@ def observable_source_asset( partitions_def: Optional[PartitionsDefinition] = None, auto_observe_interval_minutes: Optional[float] = None, freshness_policy: Optional[FreshnessPolicy] = None, + automation_condition: Optional[AutomationCondition] = None, op_tags: Optional[Mapping[str, Any]] = None, tags: Optional[Mapping[str, str]] = None, ) -> Union[SourceAsset, "_ObservableSourceAsset"]: @@ -100,15 +115,15 @@ def observable_source_asset( compose the asset. auto_observe_interval_minutes (Optional[float]): While the asset daemon is turned on, a run of the observation function for this asset will be launched at this interval. - freshness_policy (FreshnessPolicy): A constraint telling Dagster how often this asset is intended to be updated - with respect to its root data. op_tags (Optional[Dict[str, Any]]): A dictionary of tags for the op that computes the asset. Frameworks may expect and require certain metadata to be attached to a op. Values that are not strings will be json encoded and must meet the criteria that `json.loads(json.dumps(value)) == value`. tags (Optional[Mapping[str, str]]): Tags for filtering and organizing. These tags are not attached to runs of the asset. - observe_fn (Optional[SourceAssetObserveFunction]) Observation function for the source asset. + observe_fn (Optional[SourceAssetObserveFunction]): Observation function for the source asset. + automation_condition (Optional[AutomationCondition]): A condition describing when Dagster + should materialize this asset. """ if observe_fn is not None: return _ObservableSourceAsset()(observe_fn) @@ -127,6 +142,7 @@ def observable_source_asset( partitions_def, auto_observe_interval_minutes, freshness_policy, + automation_condition, op_tags, tags=normalize_tags(tags, strict=True), ) @@ -148,6 +164,7 @@ def __init__( partitions_def: Optional[PartitionsDefinition] = None, auto_observe_interval_minutes: Optional[float] = None, freshness_policy: Optional[FreshnessPolicy] = None, + automation_condition: Optional[AutomationCondition] = None, op_tags: Optional[Mapping[str, Any]] = None, tags: Optional[Mapping[str, str]] = None, ): @@ -168,6 +185,7 @@ def __init__( self.partitions_def = partitions_def self.auto_observe_interval_minutes = auto_observe_interval_minutes self.freshness_policy = freshness_policy + self.automation_condition = automation_condition self.op_tags = op_tags self.tags = tags @@ -204,6 +222,7 @@ def __call__(self, observe_fn: SourceAssetObserveFunction) -> SourceAsset: partitions_def=self.partitions_def, auto_observe_interval_minutes=self.auto_observe_interval_minutes, freshness_policy=self.freshness_policy, + automation_condition=self.automation_condition, tags=self.tags, ) diff --git a/python_modules/dagster/dagster/_core/definitions/external_asset.py b/python_modules/dagster/dagster/_core/definitions/external_asset.py index 0a5523972d796..15ef90fdaae18 100644 --- a/python_modules/dagster/dagster/_core/definitions/external_asset.py +++ b/python_modules/dagster/dagster/_core/definitions/external_asset.py @@ -150,6 +150,7 @@ def create_external_asset_from_source_asset(source_asset: SourceAsset) -> Assets description=source_asset.description, tags=source_asset.tags, freshness_policy=source_asset.freshness_policy, + automation_condition=source_asset.automation_condition, deps=[], owners=[], ) diff --git a/python_modules/dagster/dagster/_core/definitions/source_asset.py b/python_modules/dagster/dagster/_core/definitions/source_asset.py index 5207cf9d36731..69ae591fea8e5 100644 --- a/python_modules/dagster/dagster/_core/definitions/source_asset.py +++ b/python_modules/dagster/dagster/_core/definitions/source_asset.py @@ -21,6 +21,9 @@ DataVersion, DataVersionsByPartition, ) +from dagster._core.definitions.declarative_automation.automation_condition import ( + AutomationCondition, +) from dagster._core.definitions.events import AssetKey, AssetObservation, CoercibleToAssetKey, Output from dagster._core.definitions.freshness_policy import FreshnessPolicy from dagster._core.definitions.metadata import ( @@ -215,6 +218,7 @@ class SourceAsset(ResourceAddable, IHasInternalInit): _node_def: Optional[OpDefinition] # computed lazily auto_observe_interval_minutes: Optional[float] freshness_policy: Optional[FreshnessPolicy] + automation_condition: Optional[AutomationCondition] tags: Mapping[str, str] def __init__( @@ -232,6 +236,7 @@ def __init__( *, auto_observe_interval_minutes: Optional[float] = None, freshness_policy: Optional[FreshnessPolicy] = None, + automation_condition: Optional[AutomationCondition] = None, tags: Optional[Mapping[str, str]] = None, # This is currently private because it is necessary for source asset observation functions, # but we have not yet decided on a final API for associated one or more ops with a source @@ -285,6 +290,9 @@ def __init__( self.freshness_policy = check.opt_inst_param( freshness_policy, "freshness_policy", FreshnessPolicy ) + self.automation_condition = check.opt_inst_param( + automation_condition, "automation_condition", AutomationCondition + ) @staticmethod def dagster_internal_init( @@ -301,6 +309,7 @@ def dagster_internal_init( op_tags: Optional[Mapping[str, Any]], auto_observe_interval_minutes: Optional[float], freshness_policy: Optional[FreshnessPolicy], + automation_condition: Optional[AutomationCondition], tags: Optional[Mapping[str, str]], _required_resource_keys: Optional[AbstractSet[str]], ) -> "SourceAsset": @@ -317,6 +326,7 @@ def dagster_internal_init( op_tags=op_tags, auto_observe_interval_minutes=auto_observe_interval_minutes, freshness_policy=freshness_policy, + automation_condition=automation_condition, tags=tags, _required_resource_keys=_required_resource_keys, ) @@ -444,6 +454,7 @@ def with_resources(self, resource_defs) -> "SourceAsset": freshness_policy=self.freshness_policy, tags=self.tags, op_tags=self.op_tags, + automation_condition=self.automation_condition, _required_resource_keys=self._required_resource_keys, ) @@ -471,6 +482,7 @@ def with_attributes( tags=self.tags, freshness_policy=self.freshness_policy, op_tags=self.op_tags, + automation_condition=self.automation_condition, _required_resource_keys=self._required_resource_keys, ) diff --git a/python_modules/dagster/dagster_tests/definitions_tests/declarative_automation_tests/automation_condition_tests/builtins/test_on_cron_condition.py b/python_modules/dagster/dagster_tests/definitions_tests/declarative_automation_tests/automation_condition_tests/builtins/test_on_cron_condition.py index 34d5c1c073116..02d9dddb5b8da 100644 --- a/python_modules/dagster/dagster_tests/definitions_tests/declarative_automation_tests/automation_condition_tests/builtins/test_on_cron_condition.py +++ b/python_modules/dagster/dagster_tests/definitions_tests/declarative_automation_tests/automation_condition_tests/builtins/test_on_cron_condition.py @@ -8,6 +8,7 @@ asset, asset_check, evaluate_automation_conditions, + observable_source_asset, ) from dagster_tests.definitions_tests.declarative_automation_tests.scenario_utils.automation_condition_scenario import ( @@ -186,3 +187,42 @@ def foo_check() -> ...: ... defs=defs, instance=instance, cursor=result.cursor, evaluation_time=current_time ) assert result.total_requested == 1 + + +def test_on_cron_on_observable_source() -> None: + @observable_source_asset(automation_condition=AutomationCondition.on_cron("@hourly")) + def obs() -> None: ... + + @asset(deps=[obs], automation_condition=AutomationCondition.on_cron("@hourly")) + def mat() -> None: ... + + current_time = datetime.datetime(2024, 8, 16, 4, 35) + defs = Definitions(assets=[obs, mat]) + instance = DagsterInstance.ephemeral() + + # hasn't passed a cron tick + result = evaluate_automation_conditions( + defs=defs, instance=instance, evaluation_time=current_time + ) + assert result.total_requested == 0 + + # now passed a cron tick, kick off both + current_time += datetime.timedelta(minutes=30) + result = evaluate_automation_conditions( + defs=defs, instance=instance, cursor=result.cursor, evaluation_time=current_time + ) + assert result.total_requested == 2 + + # don't kick off again + current_time += datetime.timedelta(minutes=1) + result = evaluate_automation_conditions( + defs=defs, instance=instance, cursor=result.cursor, evaluation_time=current_time + ) + assert result.total_requested == 0 + + # next hour, kick off again + current_time += datetime.timedelta(hours=1) + result = evaluate_automation_conditions( + defs=defs, instance=instance, cursor=result.cursor, evaluation_time=current_time + ) + assert result.total_requested == 2 diff --git a/python_modules/dagster/dagster_tests/definitions_tests/declarative_automation_tests/daemon_tests/definitions/hourly_observable.py b/python_modules/dagster/dagster_tests/definitions_tests/declarative_automation_tests/daemon_tests/definitions/hourly_observable.py new file mode 100644 index 0000000000000..7c9f216e47654 --- /dev/null +++ b/python_modules/dagster/dagster_tests/definitions_tests/declarative_automation_tests/daemon_tests/definitions/hourly_observable.py @@ -0,0 +1,12 @@ +import dagster as dg + + +@dg.observable_source_asset(automation_condition=dg.AutomationCondition.on_cron("@hourly")) +def obs() -> None: ... + + +@dg.asset(deps=[obs], automation_condition=dg.AutomationCondition.on_cron("@hourly")) +def mat() -> None: ... + + +defs = dg.Definitions(assets=[obs, mat]) diff --git a/python_modules/dagster/dagster_tests/definitions_tests/declarative_automation_tests/daemon_tests/test_e2e.py b/python_modules/dagster/dagster_tests/definitions_tests/declarative_automation_tests/daemon_tests/test_e2e.py index 7cce4f8f2d523..fadeb677c2379 100644 --- a/python_modules/dagster/dagster_tests/definitions_tests/declarative_automation_tests/daemon_tests/test_e2e.py +++ b/python_modules/dagster/dagster_tests/definitions_tests/declarative_automation_tests/daemon_tests/test_e2e.py @@ -703,3 +703,27 @@ def test_simple_old_code_server() -> None: _execute_ticks(context, executor) runs = _get_runs_for_latest_ticks(context) assert len(runs) == 1 + + +def test_observable_source_asset() -> None: + with get_grpc_workspace_request_context( + "hourly_observable" + ) as context, get_threadpool_executor() as executor: + time = datetime.datetime(2024, 8, 16, 1, 35) + with freeze_time(time): + _execute_ticks(context, executor) + runs = _get_runs_for_latest_ticks(context) + assert len(runs) == 0 + + time += datetime.timedelta(hours=1) + with freeze_time(time): + _execute_ticks(context, executor) + runs = _get_runs_for_latest_ticks(context) + assert len(runs) == 1 + assert runs[0].asset_selection == {AssetKey("obs"), AssetKey("mat")} + + time += datetime.timedelta(minutes=1) + with freeze_time(time): + _execute_ticks(context, executor) + runs = _get_runs_for_latest_ticks(context) + assert len(runs) == 0