Skip to content

Commit

Permalink
[1.9] Hide references to AutoMaterializePolicy / FreshenssPolicy from…
Browse files Browse the repository at this point in the history
… docs / type signatures (#25509)

## Summary & Motivation

As title

## How I Tested These Changes

## Changelog

The `auto_materialize_policy` and `freshness_policy` arguments have been hidden from the type signature of `AssetSpec`, `AssetOut`, `@asset`, and `@graph_asset`. Existing code using these arguments will continue to function, but they will not show up in typeahead or docs.

The `auto_observe_interval_minutes` and `freshness_policy` arguments of `@observable_source_asset` have similarly been hidden.
  • Loading branch information
OwenKephart authored Oct 24, 2024
1 parent 6ee426a commit bb2fe45
Show file tree
Hide file tree
Showing 8 changed files with 84 additions and 87 deletions.
12 changes: 3 additions & 9 deletions docs/sphinx/sections/api/apidocs/assets.rst
Original file line number Diff line number Diff line change
Expand Up @@ -94,20 +94,14 @@ Refer to the `Asset observation <https://docs.dagster.io/concepts/assets/asset-o

.. autoclass:: AssetObservation

Auto-materialize and freshness policies
Declarative Automation
---------------------------------------

Refer to the `Auto-materialize policies <https://docs.dagster.io/concepts/assets/asset-auto-execution>`_ documentation for more information.

.. autoclass:: AutoMaterializePolicy

.. autoclass:: AutoMaterializeRule

.. autoclass:: AutomationConditionSensorDefinition
Refer to the `Declarative Automation <https://docs.dagster.io/concepts/automation/declarative-automation>`_ documentation for more information.

.. autoclass:: AutomationCondition

.. autoclass:: FreshnessPolicy
.. autoclass:: AutomationConditionSensorDefinition

Asset values
------------
Expand Down
28 changes: 22 additions & 6 deletions python_modules/dagster/dagster/_core/definitions/asset_out.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,12 @@
from typing import Any, Mapping, NamedTuple, Optional, Sequence, Type, Union

import dagster._check as check
from dagster._annotations import PublicAttr, experimental_param
from dagster._annotations import (
PublicAttr,
experimental_param,
hidden_param,
only_allow_hidden_params_in_kwargs,
)
from dagster._core.definitions.asset_dep import AssetDep
from dagster._core.definitions.asset_spec import AssetSpec
from dagster._core.definitions.auto_materialize_policy import AutoMaterializePolicy
Expand All @@ -25,6 +30,16 @@

@experimental_param(param="owners")
@experimental_param(param="tags")
@hidden_param(
param="freshness_policy",
breaking_version="1.10.0",
additional_warn_text="use freshness checks instead",
)
@hidden_param(
param="auto_materialize_policy",
breaking_version="1.10.0",
additional_warn_text="use `automation_condition` instead",
)
class AssetOut(
NamedTuple(
"_AssetOut",
Expand Down Expand Up @@ -92,14 +107,13 @@ def __new__(
metadata: Optional[Mapping[str, Any]] = None,
group_name: Optional[str] = None,
code_version: Optional[str] = None,
freshness_policy: Optional[FreshnessPolicy] = None,
automation_condition: Optional[AutomationCondition] = None,
backfill_policy: Optional[BackfillPolicy] = None,
owners: Optional[Sequence[str]] = None,
tags: Optional[Mapping[str, str]] = None,
# TODO: FOU-243
auto_materialize_policy: Optional[AutoMaterializePolicy] = None,
**kwargs,
):
only_allow_hidden_params_in_kwargs(AssetOut, kwargs)
if isinstance(key_prefix, str):
key_prefix = [key_prefix]

Expand All @@ -121,10 +135,12 @@ def __new__(
group_name=check.opt_str_param(group_name, "group_name"),
code_version=check.opt_str_param(code_version, "code_version"),
freshness_policy=check.opt_inst_param(
freshness_policy, "freshness_policy", FreshnessPolicy
kwargs.get("freshness_policy"), "freshness_policy", FreshnessPolicy
),
automation_condition=check.opt_inst_param(
resolve_automation_condition(automation_condition, auto_materialize_policy),
resolve_automation_condition(
automation_condition, kwargs.get("auto_materialize_policy")
),
"automation_condition",
AutomationCondition,
),
Expand Down
39 changes: 23 additions & 16 deletions python_modules/dagster/dagster/_core/definitions/asset_spec.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,13 @@
)

import dagster._check as check
from dagster._annotations import PublicAttr, deprecated_param, experimental_param, public
from dagster._annotations import (
PublicAttr,
experimental_param,
hidden_param,
only_allow_hidden_params_in_kwargs,
public,
)
from dagster._core.definitions.auto_materialize_policy import AutoMaterializePolicy
from dagster._core.definitions.declarative_automation.automation_condition import (
AutomationCondition,
Expand Down Expand Up @@ -88,11 +94,16 @@ def validate_kind_tags(kinds: Optional[AbstractSet[str]]) -> None:
@experimental_param(param="owners")
@experimental_param(param="tags")
@experimental_param(param="kinds")
@deprecated_param(
@hidden_param(
param="freshness_policy",
breaking_version="1.10.0",
additional_warn_text="use freshness checks instead.",
)
@hidden_param(
param="auto_materialize_policy",
breaking_version="1.10.0",
additional_warn_text="use `automation_condition` instead",
)
class AssetSpec(
NamedTuple(
"_AssetSpec",
Expand Down Expand Up @@ -133,10 +144,6 @@ class AssetSpec(
not provided, the name "default" is used.
code_version (Optional[str]): The version of the code for this specific asset,
overriding the code version of the materialization function
freshness_policy (Optional[FreshnessPolicy]): (Deprecated) A policy which indicates how up
to date this asset is intended to be.
auto_materialize_policy (Optional[AutoMaterializePolicy]): AutoMaterializePolicy to apply to
the specified asset.
backfill_policy (Optional[BackfillPolicy]): BackfillPolicy to apply to the specified asset.
owners (Optional[Sequence[str]]): A list of strings representing owners of the asset. Each
string can be a user's email address, or a team name prefixed with `team:`,
Expand All @@ -159,17 +166,17 @@ def __new__(
skippable: bool = False,
group_name: Optional[str] = None,
code_version: Optional[str] = None,
freshness_policy: Optional[FreshnessPolicy] = None,
automation_condition: Optional[AutomationCondition] = None,
owners: Optional[Sequence[str]] = None,
tags: Optional[Mapping[str, str]] = None,
kinds: Optional[Set[str]] = None,
# TODO: FOU-243
auto_materialize_policy: Optional[AutoMaterializePolicy] = None,
partitions_def: Optional[PartitionsDefinition] = None,
**kwargs,
):
from dagster._core.definitions.asset_dep import coerce_to_deps_and_check_duplicates

only_allow_hidden_params_in_kwargs(AssetSpec, kwargs)

key = AssetKey.from_coercible(key)
asset_deps = coerce_to_deps_and_check_duplicates(deps, key)

Expand Down Expand Up @@ -199,12 +206,14 @@ def __new__(
group_name=check.opt_str_param(group_name, "group_name"),
code_version=check.opt_str_param(code_version, "code_version"),
freshness_policy=check.opt_inst_param(
freshness_policy,
kwargs.get("freshness_policy"),
"freshness_policy",
FreshnessPolicy,
),
automation_condition=check.opt_inst_param(
resolve_automation_condition(automation_condition, auto_materialize_policy),
resolve_automation_condition(
automation_condition, kwargs.get("auto_materialize_policy")
),
"automation_condition",
AutomationCondition,
),
Expand All @@ -225,15 +234,14 @@ def dagster_internal_init(
skippable: bool,
group_name: Optional[str],
code_version: Optional[str],
freshness_policy: Optional[FreshnessPolicy],
automation_condition: Optional[AutomationCondition],
owners: Optional[Sequence[str]],
tags: Optional[Mapping[str, str]],
kinds: Optional[Set[str]],
auto_materialize_policy: Optional[AutoMaterializePolicy],
partitions_def: Optional[PartitionsDefinition],
**kwargs,
) -> "AssetSpec":
check.invariant(auto_materialize_policy is None)
check.invariant(kwargs.get("auto_materialize_policy") is None)
return AssetSpec(
key=key,
deps=deps,
Expand All @@ -242,7 +250,7 @@ def dagster_internal_init(
skippable=skippable,
group_name=group_name,
code_version=code_version,
freshness_policy=freshness_policy,
freshness_policy=kwargs.get("freshness_policy"),
automation_condition=automation_condition,
owners=owners,
tags=tags,
Expand All @@ -260,7 +268,6 @@ def partition_mappings(self) -> Mapping[AssetKey, PartitionMapping]:

@property
def auto_materialize_policy(self) -> Optional[AutoMaterializePolicy]:
# TODO: FOU-243
return (
self.automation_condition.as_auto_materialize_policy()
if self.automation_condition
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@

import dagster._check as check
from dagster._annotations import (
deprecated_param,
experimental_param,
hidden_param,
only_allow_hidden_params_in_kwargs,
Expand Down Expand Up @@ -96,9 +95,6 @@ def asset(
op_tags: Optional[Mapping[str, Any]] = ...,
group_name: Optional[str] = ...,
output_required: bool = ...,
freshness_policy: Optional[FreshnessPolicy] = ...,
# TODO: FOU-243
auto_materialize_policy: Optional[AutoMaterializePolicy] = ...,
automation_condition: Optional[AutomationCondition] = ...,
backfill_policy: Optional[BackfillPolicy] = ...,
retry_policy: Optional[RetryPolicy] = ...,
Expand Down Expand Up @@ -143,12 +139,12 @@ def _validate_hidden_non_argument_dep_param(
breaking_version="2.0.0",
additional_warn_text="use `deps` instead.",
)
@deprecated_param(
@hidden_param(
param="auto_materialize_policy",
breaking_version="1.10.0",
additional_warn_text="use `automation_condition` instead.",
)
@deprecated_param(
@hidden_param(
param="freshness_policy",
breaking_version="1.10.0",
additional_warn_text="use freshness checks instead.",
Expand All @@ -174,7 +170,6 @@ def asset(
op_tags: Optional[Mapping[str, Any]] = None,
group_name: Optional[str] = None,
output_required: bool = True,
freshness_policy: Optional[FreshnessPolicy] = None,
automation_condition: Optional[AutomationCondition] = None,
backfill_policy: Optional[BackfillPolicy] = None,
retry_policy: Optional[RetryPolicy] = None,
Expand All @@ -183,8 +178,6 @@ def asset(
check_specs: Optional[Sequence[AssetCheckSpec]] = None,
owners: Optional[Sequence[str]] = None,
kinds: Optional[AbstractSet[str]] = None,
# TODO: FOU-243
auto_materialize_policy: Optional[AutoMaterializePolicy] = None,
**kwargs,
) -> Union[AssetsDefinition, Callable[[Callable[..., Any]], AssetsDefinition]]:
"""Create a definition for how to compute an asset.
Expand Down Expand Up @@ -247,8 +240,6 @@ def asset(
output_required (bool): Whether the decorated function will always materialize an asset.
Defaults to True. If False, the function can return None, which will not be materialized to
storage and will halt execution of downstream assets.
freshness_policy (FreshnessPolicy): (Deprecated) A constraint telling Dagster how often this
asset is intended to be updated with respect to its root data.
automation_condition (AutomationCondition): (Experimental) A condition describing when
Dagster should materialize this asset.
backfill_policy (BackfillPolicy): (Experimental) Configure Dagster to backfill this asset according to its
Expand Down Expand Up @@ -315,9 +306,9 @@ def my_asset(my_upstream_asset: int) -> int:
op_tags=op_tags,
group_name=group_name,
output_required=output_required,
freshness_policy=freshness_policy,
freshness_policy=kwargs.get("freshness_policy"),
automation_condition=resolve_automation_condition(
automation_condition, auto_materialize_policy
automation_condition, kwargs.get("auto_materialize_policy")
),
backfill_policy=backfill_policy,
retry_policy=retry_policy,
Expand Down Expand Up @@ -744,6 +735,16 @@ def graph_asset(
@experimental_param(param="tags")
@experimental_param(param="owners")
@experimental_param(param="kinds")
@hidden_param(
param="freshness_policy",
breaking_version="1.10.0",
additional_warn_text="use freshness checks instead",
)
@hidden_param(
param="auto_materialize_policy",
breaking_version="1.10.0",
additional_warn_text="use `automation_condition` instead",
)
def graph_asset(
compose_fn: Optional[Callable] = None,
*,
Expand All @@ -757,16 +758,14 @@ def graph_asset(
metadata: Optional[RawMetadataMapping] = None,
tags: Optional[Mapping[str, str]] = None,
owners: Optional[Sequence[str]] = None,
freshness_policy: Optional[FreshnessPolicy] = None,
# TODO: FOU-243
auto_materialize_policy: Optional[AutoMaterializePolicy] = None,
automation_condition: Optional[AutomationCondition] = None,
backfill_policy: Optional[BackfillPolicy] = None,
resource_defs: Optional[Mapping[str, ResourceDefinition]] = None,
check_specs: Optional[Sequence[AssetCheckSpec]] = None,
code_version: Optional[str] = None,
key: Optional[CoercibleToAssetKey] = None,
kinds: Optional[AbstractSet[str]] = None,
**kwargs,
) -> Union[AssetsDefinition, Callable[[Callable[..., Any]], AssetsDefinition]]:
"""Creates a software-defined asset that's computed using a graph of ops.
Expand Down Expand Up @@ -812,8 +811,6 @@ def graph_asset(
e.g. `team:finops`.
kinds (Optional[Set[str]]): A list of strings representing the kinds of the asset. These
will be made visible in the Dagster UI.
freshness_policy (Optional[FreshnessPolicy]): A constraint telling Dagster how often this asset is
intended to be updated with respect to its root data.
automation_condition (Optional[AutomationCondition]): The AutomationCondition to use
for this asset.
backfill_policy (Optional[BackfillPolicy]): The BackfillPolicy to use for this asset.
Expand All @@ -837,6 +834,8 @@ def store_files(files) -> None:
def slack_files_table():
return store_files(fetch_files_from_slack())
"""
only_allow_hidden_params_in_kwargs(graph_asset, kwargs)

if compose_fn is None:
return lambda fn: graph_asset(
fn, # type: ignore
Expand All @@ -850,9 +849,9 @@ def slack_files_table():
metadata=metadata,
tags=tags,
owners=owners,
freshness_policy=freshness_policy,
freshness_policy=kwargs.get("freshness_policy"),
automation_condition=resolve_automation_condition(
automation_condition, auto_materialize_policy
automation_condition, kwargs.get("auto_materialize_policy")
),
backfill_policy=backfill_policy,
resource_defs=resource_defs,
Expand All @@ -874,9 +873,9 @@ def slack_files_table():
metadata=metadata,
tags=tags,
owners=owners,
freshness_policy=freshness_policy,
freshness_policy=kwargs.get("freshness_policy"),
automation_condition=resolve_automation_condition(
automation_condition, auto_materialize_policy
automation_condition, kwargs.get("auto_materialize_policy")
),
backfill_policy=backfill_policy,
resource_defs=resource_defs,
Expand Down
Loading

0 comments on commit bb2fe45

Please sign in to comment.