From eee4bd686bd70832d3478dd97899dc3583be67fd Mon Sep 17 00:00:00 2001 From: benpankow Date: Wed, 21 Aug 2024 14:06:23 -0700 Subject: [PATCH] incorporate storage kind --- .../dagster_graphql/schema/asset_graph.py | 3 +++ .../dagster_graphql_tests/graphql/repo.py | 14 +++++++++++ .../graphql/test_assets.py | 22 +++++++++++++++++ .../dagster/_core/definitions/asset_spec.py | 6 +++++ .../asset_defs_tests/test_assets.py | 24 +++++++++++++++++-- 5 files changed, 67 insertions(+), 2 deletions(-) diff --git a/python_modules/dagster-graphql/dagster_graphql/schema/asset_graph.py b/python_modules/dagster-graphql/dagster_graphql/schema/asset_graph.py index f807921b9b542..d980b29c2142a 100644 --- a/python_modules/dagster-graphql/dagster_graphql/schema/asset_graph.py +++ b/python_modules/dagster-graphql/dagster_graphql/schema/asset_graph.py @@ -1215,6 +1215,9 @@ def resolve_tags(self, _graphene_info: ResolveInfo) -> Sequence[GrapheneDefiniti def resolve_kinds(self, _graphene_info: ResolveInfo) -> Sequence[str]: if self._external_asset_node.compute_kind: + storage_kind = (self._external_asset_node.tags or {}).get("dagster/storage_kind") + if storage_kind: + return [self._external_asset_node.compute_kind, storage_kind] return [self._external_asset_node.compute_kind] return [ diff --git a/python_modules/dagster-graphql/dagster_graphql_tests/graphql/repo.py b/python_modules/dagster-graphql/dagster_graphql_tests/graphql/repo.py index af957c19805ab..ddce377791756 100644 --- a/python_modules/dagster-graphql/dagster_graphql_tests/graphql/repo.py +++ b/python_modules/dagster-graphql/dagster_graphql_tests/graphql/repo.py @@ -1772,6 +1772,19 @@ def multi_asset_with_kinds(): return 1 +@multi_asset( + specs=[ + AssetSpec(key="third_kinds_key", tags={"dagster/storage_kind": "snowflake"}), + AssetSpec( + key="fourth_kinds_key", + ), + ], + compute_kind="python", +) +def asset_with_compute_storage_kinds(): + return 1 + + fresh_diamond_assets_job = define_asset_job( "fresh_diamond_assets_job", AssetSelection.assets(fresh_diamond_bottom).upstream() ) @@ -2086,6 +2099,7 @@ def define_assets(): grouped_asset_4, ungrouped_asset_5, multi_asset_with_kinds, + asset_with_compute_storage_kinds, ] diff --git a/python_modules/dagster-graphql/dagster_graphql_tests/graphql/test_assets.py b/python_modules/dagster-graphql/dagster_graphql_tests/graphql/test_assets.py index abcff410b98e3..44bed6acd1136 100644 --- a/python_modules/dagster-graphql/dagster_graphql_tests/graphql/test_assets.py +++ b/python_modules/dagster-graphql/dagster_graphql_tests/graphql/test_assets.py @@ -2628,6 +2628,28 @@ def test_kinds(self, graphql_context: WorkspaceRequestContext): second_kinds_key = result.data["assetNodeOrError"] assert set(second_kinds_key["kinds"]) == {"python"} + result = execute_dagster_graphql( + graphql_context, + GET_KINDS, + variables={ + "assetKey": {"path": ["third_kinds_key"]}, + }, + ) + + third_kinds_key = result.data["assetNodeOrError"] + assert set(third_kinds_key["kinds"]) == {"python", "snowflake"} + + result = execute_dagster_graphql( + graphql_context, + GET_KINDS, + variables={ + "assetKey": {"path": ["fourth_kinds_key"]}, + }, + ) + + fourth_kinds_key = result.data["assetNodeOrError"] + assert set(fourth_kinds_key["kinds"]) == {"python"} + def test_has_asset_checks(self, graphql_context: WorkspaceRequestContext): result = execute_dagster_graphql(graphql_context, HAS_ASSET_CHECKS) diff --git a/python_modules/dagster/dagster/_core/definitions/asset_spec.py b/python_modules/dagster/dagster/_core/definitions/asset_spec.py index 6e79888134638..dc628260e5c66 100644 --- a/python_modules/dagster/dagster/_core/definitions/asset_spec.py +++ b/python_modules/dagster/dagster/_core/definitions/asset_spec.py @@ -157,6 +157,12 @@ def __new__( if kinds is not None and len(kinds) > 2: raise DagsterInvalidDefinitionError("Assets can have at most two kinds currently.") + storage_kind = (tags or {}).get("dagster/storage_kind") + if kinds is not None and storage_kind is not None and storage_kind not in kinds: + raise DagsterInvalidDefinitionError( + f"If specifying dagster/storage_kind={storage_kind} and kinds, {storage_kind} must be in the list of kinds" + ) + return super().__new__( cls, key=key, diff --git a/python_modules/dagster/dagster_tests/asset_defs_tests/test_assets.py b/python_modules/dagster/dagster_tests/asset_defs_tests/test_assets.py index 917622f940764..8d3f14b66797b 100644 --- a/python_modules/dagster/dagster_tests/asset_defs_tests/test_assets.py +++ b/python_modules/dagster/dagster_tests/asset_defs_tests/test_assets.py @@ -2301,7 +2301,7 @@ def assets(): ... ): @multi_asset(specs=[AssetSpec("asset1", kinds={"python", "snowflake", "bigquery"})]) - def assets(): ... + def assets2(): ... with pytest.raises( DagsterInvalidDefinitionError, @@ -2309,7 +2309,27 @@ def assets(): ... ): @multi_asset(compute_kind="my_compute_kind", specs=[AssetSpec("asset1", kinds={"python"})]) - def assets(): ... + def assets3(): ... + + @multi_asset( + specs=[AssetSpec("asset1", kinds={"snowflake"}, tags={"dagster/storage_kind": "snowflake"})] + ) + def assets4(): ... + + @multi_asset(specs=[AssetSpec("asset1", tags={"dagster/storage_kind": "snowflake"})]) + def assets5(): ... + + with pytest.raises( + DagsterInvalidDefinitionError, + match="If specifying dagster/storage_kind=bigquery and kinds, bigquery must be in the list of kinds", + ): + + @multi_asset( + specs=[ + AssetSpec("asset1", kinds={"snowflake"}, tags={"dagster/storage_kind": "bigquery"}) + ] + ) + def assets4(): ... def test_asset_out_with_tags():