diff --git a/docs/sphinx/sections/api/apidocs/assets.rst b/docs/sphinx/sections/api/apidocs/assets.rst index 3240a71a80554..d118cd3da02b7 100644 --- a/docs/sphinx/sections/api/apidocs/assets.rst +++ b/docs/sphinx/sections/api/apidocs/assets.rst @@ -94,20 +94,14 @@ Refer to the `Asset observation `_ documentation for more information. - -.. autoclass:: AutoMaterializePolicy - -.. autoclass:: AutoMaterializeRule - -.. autoclass:: AutomationConditionSensorDefinition +Refer to the `Declarative Automation `_ documentation for more information. .. autoclass:: AutomationCondition -.. autoclass:: FreshnessPolicy +.. autoclass:: AutomationConditionSensorDefinition Asset values ------------ diff --git a/python_modules/dagster/dagster/_core/definitions/asset_out.py b/python_modules/dagster/dagster/_core/definitions/asset_out.py index b30d05618fa31..be3835eaeb2b7 100644 --- a/python_modules/dagster/dagster/_core/definitions/asset_out.py +++ b/python_modules/dagster/dagster/_core/definitions/asset_out.py @@ -1,7 +1,12 @@ from typing import Any, Mapping, NamedTuple, Optional, Sequence, Type, Union import dagster._check as check -from dagster._annotations import PublicAttr, experimental_param +from dagster._annotations import ( + PublicAttr, + experimental_param, + hidden_param, + only_allow_hidden_params_in_kwargs, +) from dagster._core.definitions.asset_dep import AssetDep from dagster._core.definitions.asset_spec import AssetSpec from dagster._core.definitions.auto_materialize_policy import AutoMaterializePolicy @@ -25,6 +30,16 @@ @experimental_param(param="owners") @experimental_param(param="tags") +@hidden_param( + param="freshness_policy", + breaking_version="1.10.0", + additional_warn_text="use freshness checks instead", +) +@hidden_param( + param="auto_materialize_policy", + breaking_version="1.10.0", + additional_warn_text="use `automation_condition` instead", +) class AssetOut( NamedTuple( "_AssetOut", @@ -92,14 +107,13 @@ def __new__( metadata: Optional[Mapping[str, Any]] = None, group_name: Optional[str] = None, code_version: Optional[str] = None, - freshness_policy: Optional[FreshnessPolicy] = None, automation_condition: Optional[AutomationCondition] = None, backfill_policy: Optional[BackfillPolicy] = None, owners: Optional[Sequence[str]] = None, tags: Optional[Mapping[str, str]] = None, - # TODO: FOU-243 - auto_materialize_policy: Optional[AutoMaterializePolicy] = None, + **kwargs, ): + only_allow_hidden_params_in_kwargs(AssetOut, kwargs) if isinstance(key_prefix, str): key_prefix = [key_prefix] @@ -121,10 +135,12 @@ def __new__( group_name=check.opt_str_param(group_name, "group_name"), code_version=check.opt_str_param(code_version, "code_version"), freshness_policy=check.opt_inst_param( - freshness_policy, "freshness_policy", FreshnessPolicy + kwargs.get("freshness_policy"), "freshness_policy", FreshnessPolicy ), automation_condition=check.opt_inst_param( - resolve_automation_condition(automation_condition, auto_materialize_policy), + resolve_automation_condition( + automation_condition, kwargs.get("auto_materialize_policy") + ), "automation_condition", AutomationCondition, ), diff --git a/python_modules/dagster/dagster/_core/definitions/asset_spec.py b/python_modules/dagster/dagster/_core/definitions/asset_spec.py index 562b9dbc02dd5..b7df6afc42f89 100644 --- a/python_modules/dagster/dagster/_core/definitions/asset_spec.py +++ b/python_modules/dagster/dagster/_core/definitions/asset_spec.py @@ -13,7 +13,13 @@ ) import dagster._check as check -from dagster._annotations import PublicAttr, deprecated_param, experimental_param, public +from dagster._annotations import ( + PublicAttr, + experimental_param, + hidden_param, + only_allow_hidden_params_in_kwargs, + public, +) from dagster._core.definitions.auto_materialize_policy import AutoMaterializePolicy from dagster._core.definitions.declarative_automation.automation_condition import ( AutomationCondition, @@ -88,11 +94,16 @@ def validate_kind_tags(kinds: Optional[AbstractSet[str]]) -> None: @experimental_param(param="owners") @experimental_param(param="tags") @experimental_param(param="kinds") -@deprecated_param( +@hidden_param( param="freshness_policy", breaking_version="1.10.0", additional_warn_text="use freshness checks instead.", ) +@hidden_param( + param="auto_materialize_policy", + breaking_version="1.10.0", + additional_warn_text="use `automation_condition` instead", +) class AssetSpec( NamedTuple( "_AssetSpec", @@ -133,10 +144,6 @@ class AssetSpec( not provided, the name "default" is used. code_version (Optional[str]): The version of the code for this specific asset, overriding the code version of the materialization function - freshness_policy (Optional[FreshnessPolicy]): (Deprecated) A policy which indicates how up - to date this asset is intended to be. - auto_materialize_policy (Optional[AutoMaterializePolicy]): AutoMaterializePolicy to apply to - the specified asset. backfill_policy (Optional[BackfillPolicy]): BackfillPolicy to apply to the specified asset. owners (Optional[Sequence[str]]): A list of strings representing owners of the asset. Each string can be a user's email address, or a team name prefixed with `team:`, @@ -159,17 +166,17 @@ def __new__( skippable: bool = False, group_name: Optional[str] = None, code_version: Optional[str] = None, - freshness_policy: Optional[FreshnessPolicy] = None, automation_condition: Optional[AutomationCondition] = None, owners: Optional[Sequence[str]] = None, tags: Optional[Mapping[str, str]] = None, kinds: Optional[Set[str]] = None, - # TODO: FOU-243 - auto_materialize_policy: Optional[AutoMaterializePolicy] = None, partitions_def: Optional[PartitionsDefinition] = None, + **kwargs, ): from dagster._core.definitions.asset_dep import coerce_to_deps_and_check_duplicates + only_allow_hidden_params_in_kwargs(AssetSpec, kwargs) + key = AssetKey.from_coercible(key) asset_deps = coerce_to_deps_and_check_duplicates(deps, key) @@ -199,12 +206,14 @@ def __new__( group_name=check.opt_str_param(group_name, "group_name"), code_version=check.opt_str_param(code_version, "code_version"), freshness_policy=check.opt_inst_param( - freshness_policy, + kwargs.get("freshness_policy"), "freshness_policy", FreshnessPolicy, ), automation_condition=check.opt_inst_param( - resolve_automation_condition(automation_condition, auto_materialize_policy), + resolve_automation_condition( + automation_condition, kwargs.get("auto_materialize_policy") + ), "automation_condition", AutomationCondition, ), @@ -225,15 +234,14 @@ def dagster_internal_init( skippable: bool, group_name: Optional[str], code_version: Optional[str], - freshness_policy: Optional[FreshnessPolicy], automation_condition: Optional[AutomationCondition], owners: Optional[Sequence[str]], tags: Optional[Mapping[str, str]], kinds: Optional[Set[str]], - auto_materialize_policy: Optional[AutoMaterializePolicy], partitions_def: Optional[PartitionsDefinition], + **kwargs, ) -> "AssetSpec": - check.invariant(auto_materialize_policy is None) + check.invariant(kwargs.get("auto_materialize_policy") is None) return AssetSpec( key=key, deps=deps, @@ -242,7 +250,7 @@ def dagster_internal_init( skippable=skippable, group_name=group_name, code_version=code_version, - freshness_policy=freshness_policy, + freshness_policy=kwargs.get("freshness_policy"), automation_condition=automation_condition, owners=owners, tags=tags, @@ -260,7 +268,6 @@ def partition_mappings(self) -> Mapping[AssetKey, PartitionMapping]: @property def auto_materialize_policy(self) -> Optional[AutoMaterializePolicy]: - # TODO: FOU-243 return ( self.automation_condition.as_auto_materialize_policy() if self.automation_condition diff --git a/python_modules/dagster/dagster/_core/definitions/decorators/asset_decorator.py b/python_modules/dagster/dagster/_core/definitions/decorators/asset_decorator.py index 80f27bca1ebb2..535e74d76cd7a 100644 --- a/python_modules/dagster/dagster/_core/definitions/decorators/asset_decorator.py +++ b/python_modules/dagster/dagster/_core/definitions/decorators/asset_decorator.py @@ -16,7 +16,6 @@ import dagster._check as check from dagster._annotations import ( - deprecated_param, experimental_param, hidden_param, only_allow_hidden_params_in_kwargs, @@ -96,9 +95,6 @@ def asset( op_tags: Optional[Mapping[str, Any]] = ..., group_name: Optional[str] = ..., output_required: bool = ..., - freshness_policy: Optional[FreshnessPolicy] = ..., - # TODO: FOU-243 - auto_materialize_policy: Optional[AutoMaterializePolicy] = ..., automation_condition: Optional[AutomationCondition] = ..., backfill_policy: Optional[BackfillPolicy] = ..., retry_policy: Optional[RetryPolicy] = ..., @@ -143,12 +139,12 @@ def _validate_hidden_non_argument_dep_param( breaking_version="2.0.0", additional_warn_text="use `deps` instead.", ) -@deprecated_param( +@hidden_param( param="auto_materialize_policy", breaking_version="1.10.0", additional_warn_text="use `automation_condition` instead.", ) -@deprecated_param( +@hidden_param( param="freshness_policy", breaking_version="1.10.0", additional_warn_text="use freshness checks instead.", @@ -174,7 +170,6 @@ def asset( op_tags: Optional[Mapping[str, Any]] = None, group_name: Optional[str] = None, output_required: bool = True, - freshness_policy: Optional[FreshnessPolicy] = None, automation_condition: Optional[AutomationCondition] = None, backfill_policy: Optional[BackfillPolicy] = None, retry_policy: Optional[RetryPolicy] = None, @@ -183,8 +178,6 @@ def asset( check_specs: Optional[Sequence[AssetCheckSpec]] = None, owners: Optional[Sequence[str]] = None, kinds: Optional[AbstractSet[str]] = None, - # TODO: FOU-243 - auto_materialize_policy: Optional[AutoMaterializePolicy] = None, **kwargs, ) -> Union[AssetsDefinition, Callable[[Callable[..., Any]], AssetsDefinition]]: """Create a definition for how to compute an asset. @@ -247,8 +240,6 @@ def asset( output_required (bool): Whether the decorated function will always materialize an asset. Defaults to True. If False, the function can return None, which will not be materialized to storage and will halt execution of downstream assets. - freshness_policy (FreshnessPolicy): (Deprecated) A constraint telling Dagster how often this - asset is intended to be updated with respect to its root data. automation_condition (AutomationCondition): (Experimental) A condition describing when Dagster should materialize this asset. backfill_policy (BackfillPolicy): (Experimental) Configure Dagster to backfill this asset according to its @@ -315,9 +306,9 @@ def my_asset(my_upstream_asset: int) -> int: op_tags=op_tags, group_name=group_name, output_required=output_required, - freshness_policy=freshness_policy, + freshness_policy=kwargs.get("freshness_policy"), automation_condition=resolve_automation_condition( - automation_condition, auto_materialize_policy + automation_condition, kwargs.get("auto_materialize_policy") ), backfill_policy=backfill_policy, retry_policy=retry_policy, @@ -744,6 +735,16 @@ def graph_asset( @experimental_param(param="tags") @experimental_param(param="owners") @experimental_param(param="kinds") +@hidden_param( + param="freshness_policy", + breaking_version="1.10.0", + additional_warn_text="use freshness checks instead", +) +@hidden_param( + param="auto_materialize_policy", + breaking_version="1.10.0", + additional_warn_text="use `automation_condition` instead", +) def graph_asset( compose_fn: Optional[Callable] = None, *, @@ -757,9 +758,6 @@ def graph_asset( metadata: Optional[RawMetadataMapping] = None, tags: Optional[Mapping[str, str]] = None, owners: Optional[Sequence[str]] = None, - freshness_policy: Optional[FreshnessPolicy] = None, - # TODO: FOU-243 - auto_materialize_policy: Optional[AutoMaterializePolicy] = None, automation_condition: Optional[AutomationCondition] = None, backfill_policy: Optional[BackfillPolicy] = None, resource_defs: Optional[Mapping[str, ResourceDefinition]] = None, @@ -767,6 +765,7 @@ def graph_asset( code_version: Optional[str] = None, key: Optional[CoercibleToAssetKey] = None, kinds: Optional[AbstractSet[str]] = None, + **kwargs, ) -> Union[AssetsDefinition, Callable[[Callable[..., Any]], AssetsDefinition]]: """Creates a software-defined asset that's computed using a graph of ops. @@ -812,8 +811,6 @@ def graph_asset( e.g. `team:finops`. kinds (Optional[Set[str]]): A list of strings representing the kinds of the asset. These will be made visible in the Dagster UI. - freshness_policy (Optional[FreshnessPolicy]): A constraint telling Dagster how often this asset is - intended to be updated with respect to its root data. automation_condition (Optional[AutomationCondition]): The AutomationCondition to use for this asset. backfill_policy (Optional[BackfillPolicy]): The BackfillPolicy to use for this asset. @@ -837,6 +834,8 @@ def store_files(files) -> None: def slack_files_table(): return store_files(fetch_files_from_slack()) """ + only_allow_hidden_params_in_kwargs(graph_asset, kwargs) + if compose_fn is None: return lambda fn: graph_asset( fn, # type: ignore @@ -850,9 +849,9 @@ def slack_files_table(): metadata=metadata, tags=tags, owners=owners, - freshness_policy=freshness_policy, + freshness_policy=kwargs.get("freshness_policy"), automation_condition=resolve_automation_condition( - automation_condition, auto_materialize_policy + automation_condition, kwargs.get("auto_materialize_policy") ), backfill_policy=backfill_policy, resource_defs=resource_defs, @@ -874,9 +873,9 @@ def slack_files_table(): metadata=metadata, tags=tags, owners=owners, - freshness_policy=freshness_policy, + freshness_policy=kwargs.get("freshness_policy"), automation_condition=resolve_automation_condition( - automation_condition, auto_materialize_policy + automation_condition, kwargs.get("auto_materialize_policy") ), backfill_policy=backfill_policy, resource_defs=resource_defs, diff --git a/python_modules/dagster/dagster/_core/definitions/decorators/source_asset_decorator.py b/python_modules/dagster/dagster/_core/definitions/decorators/source_asset_decorator.py index 29feae5a054b4..51709ace01c78 100644 --- a/python_modules/dagster/dagster/_core/definitions/decorators/source_asset_decorator.py +++ b/python_modules/dagster/dagster/_core/definitions/decorators/source_asset_decorator.py @@ -1,7 +1,7 @@ from typing import AbstractSet, Any, Callable, Mapping, Optional, Sequence, Set, Union, overload import dagster._check as check -from dagster._annotations import deprecated_param, experimental +from dagster._annotations import experimental, hidden_param from dagster._core.definitions.asset_check_spec import AssetCheckSpec from dagster._core.definitions.asset_spec import AssetExecutionType, AssetSpec from dagster._core.definitions.assets import AssetsDefinition @@ -53,12 +53,12 @@ def observable_source_asset( ) -> "_ObservableSourceAsset": ... -@deprecated_param( +@hidden_param( param="auto_observe_interval_minutes", breaking_version="1.10.0", additional_warn_text="use `automation_condition` instead.", ) -@deprecated_param( +@hidden_param( param="freshness_policy", breaking_version="1.10.0", additional_warn_text="use freshness checks instead.", @@ -78,11 +78,10 @@ def observable_source_asset( required_resource_keys: Optional[AbstractSet[str]] = None, resource_defs: Optional[Mapping[str, ResourceDefinition]] = None, partitions_def: Optional[PartitionsDefinition] = None, - auto_observe_interval_minutes: Optional[float] = None, - freshness_policy: Optional[FreshnessPolicy] = None, automation_condition: Optional[AutomationCondition] = None, op_tags: Optional[Mapping[str, Any]] = None, tags: Optional[Mapping[str, str]] = None, + **kwargs, ) -> Union[SourceAsset, "_ObservableSourceAsset"]: """Create a `SourceAsset` with an associated observation function. @@ -113,8 +112,6 @@ def observable_source_asset( the `io_manager_def` argument. partitions_def (Optional[PartitionsDefinition]): Defines the set of partition keys that compose the asset. - auto_observe_interval_minutes (Optional[float]): While the asset daemon is turned on, a run - of the observation function for this asset will be launched at this interval. op_tags (Optional[Dict[str, Any]]): A dictionary of tags for the op that computes the asset. Frameworks may expect and require certain metadata to be attached to a op. Values that are not strings will be json encoded and must meet the criteria that @@ -140,8 +137,8 @@ def observable_source_asset( required_resource_keys, resource_defs, partitions_def, - auto_observe_interval_minutes, - freshness_policy, + kwargs.get("auto_observe_interval_minutes"), + kwargs.get("freshness_policy"), automation_condition, op_tags, tags=normalize_tags(tags, strict=True), diff --git a/python_modules/dagster/dagster_tests/asset_defs_tests/test_decorators.py b/python_modules/dagster/dagster_tests/asset_defs_tests/test_decorators.py index 6917740ecfa8d..ad8a335e34eff 100644 --- a/python_modules/dagster/dagster_tests/asset_defs_tests/test_decorators.py +++ b/python_modules/dagster/dagster_tests/asset_defs_tests/test_decorators.py @@ -916,6 +916,8 @@ def my_graph(x, y): @ignore_warning("Parameter `resource_defs` .* is experimental") @ignore_warning("Parameter `tags` .* is experimental") @ignore_warning("Parameter `owners` .* is experimental") +@ignore_warning("Parameter `auto_materialize_policy` .* is deprecated") +@ignore_warning("Parameter `freshness_policy` .* is deprecated") @pytest.mark.parametrize( "automation_condition_arg", [ @@ -1126,6 +1128,8 @@ def bar(): @ignore_warning("Function `AutoMaterializePolicy.lazy` is deprecated") @ignore_warning("Parameter `auto_materialize_policy` is deprecated") @ignore_warning("Parameter `resource_defs`") +@ignore_warning("Parameter `auto_materialize_policy`") +@ignore_warning("Parameter `freshness_policy`") @pytest.mark.parametrize( "automation_condition_arg", [ @@ -1352,7 +1356,7 @@ def my_asset(context): @ignore_warning("Static method `AutomationCondition.on_cron` is experimental") @ignore_warning("Static method `AutomationCondition.eager` is experimental") @ignore_warning("Function `AutoMaterializePolicy.lazy` is deprecated") -@ignore_warning("Parameter `auto_materialize_policy` is deprecated") +@ignore_warning("Parameter `auto_materialize_policy`") def test_multi_asset_with_automation_conditions(): ac2 = AutomationCondition.on_cron("@daily") ac3 = AutomationCondition.eager() diff --git a/python_modules/dagster/dagster_tests/cli_tests/command_tests/test_telemetry.py b/python_modules/dagster/dagster_tests/cli_tests/command_tests/test_telemetry.py index cd6e751c97990..d20cca282a43d 100644 --- a/python_modules/dagster/dagster_tests/cli_tests/command_tests/test_telemetry.py +++ b/python_modules/dagster/dagster_tests/cli_tests/command_tests/test_telemetry.py @@ -26,7 +26,6 @@ resource, ) from dagster._cli.job import job_execute_command -from dagster._core.definitions.auto_materialize_policy import AutoMaterializePolicy from dagster._core.definitions.reconstruct import get_ephemeral_repository_name from dagster._core.definitions.resource_definition import dagster_maintained_resource from dagster._core.execution.context.input import InputContext @@ -300,28 +299,6 @@ def asset2(): ... assert stats["num_assets_with_freshness_policies_in_repo"] == "1" -# TODO: FOU-243 -@pytest.mark.skip("obsolete EAGER vs. LAZY distinction") -def test_get_status_from_remote_repo_auto_materialize_policy(instance): - @asset(auto_materialize_policy=AutoMaterializePolicy.lazy()) - def asset1(): ... - - @asset - def asset2(): ... - - @asset(auto_materialize_policy=AutoMaterializePolicy.eager()) - def asset3(): ... - - remote_repo = RemoteRepository( - RepositorySnap.from_def(Definitions(assets=[asset1, asset2, asset3]).get_repository_def()), - repository_handle=RepositoryHandle.for_test(), - instance=instance, - ) - stats = get_stats_from_remote_repo(remote_repo) - assert stats["num_assets_with_eager_auto_materialize_policies_in_repo"] == "1" - assert stats["num_assets_with_lazy_auto_materialize_policies_in_repo"] == "1" - - def test_get_stats_from_remote_repo_code_versions(instance): @asset(code_version="hello") def asset1(): ... diff --git a/python_modules/dagster/dagster_tests/general_tests/test_internal_init_implementations.py b/python_modules/dagster/dagster_tests/general_tests/test_internal_init_implementations.py index 71cfb957fe9c5..76c1a923c04dd 100644 --- a/python_modules/dagster/dagster_tests/general_tests/test_internal_init_implementations.py +++ b/python_modules/dagster/dagster_tests/general_tests/test_internal_init_implementations.py @@ -39,7 +39,10 @@ def test_dagster_internal_init_class_follow_rules(cls: Type): " dagster_internal_init methods cannot have default values" ) - assert all(p.kind == Parameter.KEYWORD_ONLY for p in dagster_internal_init_params.values()), ( + assert all( + p.kind == Parameter.KEYWORD_ONLY or (p.name == "kwargs" and p.kind == Parameter.VAR_KEYWORD) + for p in dagster_internal_init_params.values() + ), ( f"{cls.__name__}.dagster_internal_init has one or more positional arguments," " dagster_internal_init methods can only have keyword-only arguments" )