Skip to content

Commit

Permalink
[1.8][external assets] deprecate SourceAsset (#23378)
Browse files Browse the repository at this point in the history
## Summary & Motivation

`AssetSpec` can now be used instead.

## How I Tested These Changes

(cherry picked from commit 5ed3b9d)
  • Loading branch information
sryza authored and jmsanders committed Aug 5, 2024
1 parent 4491a5a commit ffb0cee
Show file tree
Hide file tree
Showing 5 changed files with 34 additions and 17 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -189,22 +189,23 @@ def __call__(self, observe_fn: SourceAssetObserveFunction) -> SourceAsset:
)
resolved_resource_keys = decorator_resource_keys.union(arg_resource_keys)

return SourceAsset(
key=source_asset_key,
metadata=self.metadata,
io_manager_key=self.io_manager_key,
io_manager_def=self.io_manager_def,
description=self.description,
group_name=self.group_name,
_required_resource_keys=resolved_resource_keys,
resource_defs=self.resource_defs,
observe_fn=observe_fn,
op_tags=self.op_tags,
partitions_def=self.partitions_def,
auto_observe_interval_minutes=self.auto_observe_interval_minutes,
freshness_policy=self.freshness_policy,
tags=self.tags,
)
with disable_dagster_warnings():
return SourceAsset(
key=source_asset_key,
metadata=self.metadata,
io_manager_key=self.io_manager_key,
io_manager_def=self.io_manager_def,
description=self.description,
group_name=self.group_name,
_required_resource_keys=resolved_resource_keys,
resource_defs=self.resource_defs,
observe_fn=observe_fn,
op_tags=self.op_tags,
partitions_def=self.partitions_def,
auto_observe_interval_minutes=self.auto_observe_interval_minutes,
freshness_policy=self.freshness_policy,
tags=self.tags,
)


@experimental
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@
from typing_extensions import TypeAlias

import dagster._check as check
from dagster._annotations import PublicAttr, experimental_param, public
from dagster._annotations import PublicAttr, deprecated, experimental_param, public
from dagster._core.decorator_utils import get_function_params
from dagster._core.definitions.asset_spec import AssetExecutionType
from dagster._core.definitions.data_version import (
Expand Down Expand Up @@ -167,6 +167,7 @@ def fn(context: OpExecutionContext) -> Output[None]:
@experimental_param(param="io_manager_def")
@experimental_param(param="freshness_policy")
@experimental_param(param="tags")
@deprecated(breaking_version="2.0.0", additional_warn_text="Use AssetSpec instead.")
class SourceAsset(ResourceAddable):
"""A SourceAsset represents an asset that will be loaded by (but not updated by) Dagster.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -264,6 +264,7 @@ def success_asset(foo):
assert _asset_keys_for_node(result, "hello__asset_foo") == {AssetKey(["hello", "asset_foo"])}


@ignore_warning("Class `SourceAsset` is deprecated and will be removed in 2.0.0.")
def test_source_asset():
@asset
def asset1(source1):
Expand Down Expand Up @@ -308,6 +309,7 @@ def my_io_manager(_):
assert _asset_keys_for_node(result, "asset1") == {AssetKey("asset1")}


@ignore_warning("Class `SourceAsset` is deprecated and will be removed in 2.0.0.")
def test_missing_io_manager():
@asset
def asset1(source1):
Expand Down Expand Up @@ -1560,6 +1562,7 @@ def test_multi_all():
assert materialization_events[2].asset_key == AssetKey("foo")


@ignore_warning("Class `SourceAsset` is deprecated and will be removed in 2.0.0.")
def test_subset_with_source_asset():
class MyIOManager(IOManager):
def handle_output(self, context, obj):
Expand Down Expand Up @@ -1684,6 +1687,7 @@ def complicated_graph():
assert result.output_for_node("complicated_graph", "asset_3") == 4


@ignore_warning("Class `SourceAsset` is deprecated and will be removed in 2.0.0.")
@ignore_warning("Parameter `io_manager_def` .* is experimental")
def test_source_asset_io_manager_def():
class MyIOManager(IOManager):
Expand All @@ -1710,6 +1714,7 @@ def my_derived_asset(my_source_asset):
assert result.output_for_node("my_derived_asset") == 9


@ignore_warning("Class `SourceAsset` is deprecated and will be removed in 2.0.0.")
def test_source_asset_io_manager_not_provided():
class MyIOManager(IOManager):
def handle_output(self, context, obj):
Expand Down Expand Up @@ -1738,6 +1743,7 @@ def my_derived_asset(my_source_asset):
assert result.output_for_node("my_derived_asset") == 9


@ignore_warning("Class `SourceAsset` is deprecated and will be removed in 2.0.0.")
def test_source_asset_io_manager_key_provided():
class MyIOManager(IOManager):
def handle_output(self, context, obj):
Expand Down Expand Up @@ -1766,6 +1772,7 @@ def my_derived_asset(my_source_asset):
assert result.output_for_node("my_derived_asset") == 9


@ignore_warning("Class `SourceAsset` is deprecated and will be removed in 2.0.0.")
@ignore_warning("Parameter `resource_defs` .* is experimental")
@ignore_warning("Parameter `io_manager_def` .* is experimental")
def test_source_asset_requires_resource_defs():
Expand Down Expand Up @@ -1855,6 +1862,7 @@ def the_asset():
assert the_job.execute_in_process().success


@ignore_warning("Class `SourceAsset` is deprecated and will be removed in 2.0.0.")
@ignore_warning("Parameter `io_manager_def` .* is experimental")
def test_transitive_io_manager_dep_not_provided():
@io_manager(required_resource_keys={"foo"})
Expand Down Expand Up @@ -2168,6 +2176,7 @@ async def aio_gen_asset(context):
assert result.success


@ignore_warning("Class `SourceAsset` is deprecated and will be removed in 2.0.0.")
def test_selection_multi_component():
source_asset = SourceAsset(["apple", "banana"])

Expand Down Expand Up @@ -2773,6 +2782,7 @@ def y(b, c):
assert result.output_for_node("foo_3", "f") == 9


@ignore_warning("Class `SourceAsset` is deprecated and will be removed in 2.0.0.")
def test_subset_cycle_resolution_basic():
"""Ops:
foo produces: a, b
Expand Down Expand Up @@ -2840,6 +2850,7 @@ def foo_prime(context, a, b):
}


@ignore_warning("Class `SourceAsset` is deprecated and will be removed in 2.0.0.")
def test_subset_cycle_resolution_with_asset_check():
"""Ops:
foo produces: a, b
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -129,6 +129,7 @@ def second():


@ignore_warning("Parameter `io_manager_def` .* is experimental")
@ignore_warning("Class `SourceAsset` is deprecated and will be removed in 2.0.0.")
def test_materialize_source_assets():
class MyIOManager(IOManager):
def handle_output(self, context, obj):
Expand Down Expand Up @@ -192,6 +193,7 @@ def the_asset(): ...

@ignore_warning("Parameter `resource_defs` .* is experimental")
@ignore_warning("Parameter `io_manager_def` .* is experimental")
@ignore_warning("Class `SourceAsset` is deprecated and will be removed in 2.0.0.")
def test_materialize_source_asset_conflicts():
@io_manager(required_resource_keys={"foo"})
def the_manager():
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@
from dagster._core.test_utils import (
assert_namedtuple_lists_equal,
freeze_time,
ignore_warning,
raise_exception_on_warnings,
)
from dagster._time import create_datetime, parse_time_string
Expand Down Expand Up @@ -376,6 +377,7 @@ def load_input(self, context):
).success


@ignore_warning("Class `SourceAsset` is deprecated and will be removed in 2.0.0.")
def test_source_asset_partitions():
hourly_asset = SourceAsset(
AssetKey("hourly_asset"),
Expand Down

0 comments on commit ffb0cee

Please sign in to comment.