diff --git a/python_modules/dagster/dagster/_core/definitions/asset_spec.py b/python_modules/dagster/dagster/_core/definitions/asset_spec.py index d5ae2c6702df3..608df002d4442 100644 --- a/python_modules/dagster/dagster/_core/definitions/asset_spec.py +++ b/python_modules/dagster/dagster/_core/definitions/asset_spec.py @@ -156,8 +156,8 @@ def __new__( validate_asset_owner(owner, key) kind_tags = {tag_key for tag_key in (tags or {}).keys() if tag_key.startswith(KIND_PREFIX)} - if kind_tags is not None and len(kind_tags) > 2: - raise DagsterInvalidDefinitionError("Assets can have at most two kinds currently.") + if kind_tags is not None and len(kind_tags) > 3: + raise DagsterInvalidDefinitionError("Assets can have at most three kinds currently.") return super().__new__( cls, diff --git a/python_modules/dagster/dagster/_core/definitions/tags/__init__.py b/python_modules/dagster/dagster/_core/definitions/tags/__init__.py index ff3aaa6440a63..e44c6259b335d 100644 --- a/python_modules/dagster/dagster/_core/definitions/tags/__init__.py +++ b/python_modules/dagster/dagster/_core/definitions/tags/__init__.py @@ -1,8 +1,23 @@ -from typing import Optional +from typing import Any, Dict, Mapping, Optional +from dagster._annotations import deprecated from dagster._core.definitions.tags.tag_set import NamespacedTagSet as NamespacedTagSet +from dagster._core.storage.tags import KIND_PREFIX +def has_kind(tags: Mapping[str, Any], kind: str) -> bool: + return build_kind_tag_key(kind) in tags + + +def build_kind_tag_key(kind: str) -> str: + return f"{KIND_PREFIX}{kind}" + + +def build_kind_tag(kind: str) -> Dict[str, Any]: + return {build_kind_tag_key(kind): ""} + + +@deprecated(breaking_version="1.9") class StorageKindTagSet(NamespacedTagSet): """Tag entries which describe how an asset is stored. diff --git a/python_modules/dagster/dagster_tests/asset_defs_tests/test_assets.py b/python_modules/dagster/dagster_tests/asset_defs_tests/test_assets.py index ffe3b65fc6da1..5f08b3e1a8325 100644 --- a/python_modules/dagster/dagster_tests/asset_defs_tests/test_assets.py +++ b/python_modules/dagster/dagster_tests/asset_defs_tests/test_assets.py @@ -2297,7 +2297,7 @@ def assets(): ... assert assets.specs_by_key[AssetKey("asset1")].kinds == {"python"} with pytest.raises( - DagsterInvalidDefinitionError, match="Assets can have at most two kinds currently." + DagsterInvalidDefinitionError, match="Assets can have at most three kinds currently." ): @multi_asset( @@ -2308,6 +2308,7 @@ def assets(): ... "dagster/kind/python": "", "dagster/kind/snowflake": "", "dagster/kind/bigquery": "", + "dagster/kind/airflow": "", }, ) ] diff --git a/python_modules/libraries/dagster-dbt/dagster_dbt/asset_decorator.py b/python_modules/libraries/dagster-dbt/dagster_dbt/asset_decorator.py index 7f409d92052e3..ccf577a7eb6cf 100644 --- a/python_modules/libraries/dagster-dbt/dagster_dbt/asset_decorator.py +++ b/python_modules/libraries/dagster-dbt/dagster_dbt/asset_decorator.py @@ -347,7 +347,6 @@ def partitionshop_dbt_assets(context: AssetExecutionContext, dbt: DbtCliResource internal_asset_deps=internal_asset_deps, deps=deps, required_resource_keys=required_resource_keys, - compute_kind="dbt", partitions_def=partitions_def, can_subset=True, op_tags=resolved_op_tags, diff --git a/python_modules/libraries/dagster-dbt/dagster_dbt/asset_utils.py b/python_modules/libraries/dagster-dbt/dagster_dbt/asset_utils.py index 4e2f93ac55920..cf03ccbbb5136 100644 --- a/python_modules/libraries/dagster-dbt/dagster_dbt/asset_utils.py +++ b/python_modules/libraries/dagster-dbt/dagster_dbt/asset_utils.py @@ -50,7 +50,7 @@ CodeReferencesMetadataValue, LocalFileCodeReference, ) -from dagster._core.definitions.tags import StorageKindTagSet +from dagster._core.definitions.tags import build_kind_tag from dagster._utils.merger import merge_dicts from dagster_dbt.utils import ( @@ -816,7 +816,8 @@ def build_dbt_multi_asset_args( } ), tags={ - **(StorageKindTagSet(storage_kind=dbt_adapter_type) if dbt_adapter_type else {}), + **build_kind_tag("dbt"), + **(build_kind_tag(dbt_adapter_type) if dbt_adapter_type else {}), **dagster_dbt_translator.get_tags(dbt_resource_props), }, group_name=dagster_dbt_translator.get_group_name(dbt_resource_props), diff --git a/python_modules/libraries/dagster-dbt/dagster_dbt_tests/core/test_asset_decorator.py b/python_modules/libraries/dagster-dbt/dagster_dbt_tests/core/test_asset_decorator.py index 14f5f77f30158..680b9b825ffb4 100644 --- a/python_modules/libraries/dagster-dbt/dagster_dbt_tests/core/test_asset_decorator.py +++ b/python_modules/libraries/dagster-dbt/dagster_dbt_tests/core/test_asset_decorator.py @@ -28,10 +28,9 @@ asset, materialize, ) -from dagster._core.definitions.tags import StorageKindTagSet +from dagster._core.definitions.tags import build_kind_tag, has_kind from dagster._core.definitions.utils import DEFAULT_IO_MANAGER_KEY from dagster._core.execution.context.compute import AssetExecutionContext -from dagster._core.storage.tags import COMPUTE_KIND_TAG from dagster._core.types.dagster_type import DagsterType from dagster_dbt.asset_decorator import dbt_assets from dagster_dbt.asset_specs import build_dbt_asset_specs @@ -360,7 +359,6 @@ def my_dbt_assets(): ... assert my_dbt_assets.op.tags == { **op_tags, - COMPUTE_KIND_TAG: "dbt", "dagster_dbt/select": "fqn:*", } @@ -369,7 +367,6 @@ def my_dbt_assets_with_select(): ... assert my_dbt_assets_with_select.op.tags == { **op_tags, - COMPUTE_KIND_TAG: "dbt", "dagster_dbt/select": "raw_customers+", } @@ -378,7 +375,6 @@ def my_dbt_assets_with_exclude(): ... assert my_dbt_assets_with_exclude.op.tags == { **op_tags, - COMPUTE_KIND_TAG: "dbt", "dagster_dbt/select": "fqn:*", "dagster_dbt/exclude": "raw_customers+", } @@ -393,7 +389,6 @@ def my_dbt_assets_with_select_and_exclude(): ... assert my_dbt_assets_with_select_and_exclude.op.tags == { **op_tags, - COMPUTE_KIND_TAG: "dbt", "dagster_dbt/select": "raw_customers+", "dagster_dbt/exclude": "customers", } @@ -605,46 +600,6 @@ def my_dbt_assets(): ... assert expected_specs_by_key[asset_key].tags["customized"] == "tag" -def test_with_storage_kind_tag_override(test_jaffle_shop_manifest: Dict[str, Any]) -> None: - expected_specs_with_no_override_by_key = { - spec.key: spec for spec in build_dbt_asset_specs(manifest=test_jaffle_shop_manifest) - } - - @dbt_assets(manifest=test_jaffle_shop_manifest) - def my_dbt_assets_no_override(): ... - - for asset_key, tags in my_dbt_assets_no_override.tags_by_key.items(): - assert tags["dagster/storage_kind"] == "duckdb" - assert ( - expected_specs_with_no_override_by_key[asset_key].tags["dagster/storage_kind"] - == "duckdb" - ) - - class CustomDagsterDbtTranslator(DagsterDbtTranslator): - def get_tags(self, _: Mapping[str, Any]) -> Mapping[str, str]: - return {**StorageKindTagSet(storage_kind="my_custom_storage_kind")} - - expected_specs_by_key = { - spec.key: spec - for spec in build_dbt_asset_specs( - manifest=test_jaffle_shop_manifest, - dagster_dbt_translator=CustomDagsterDbtTranslator(), - ) - } - - @dbt_assets( - manifest=test_jaffle_shop_manifest, dagster_dbt_translator=CustomDagsterDbtTranslator() - ) - def my_dbt_assets(): ... - - for asset_key, tags in my_dbt_assets.tags_by_key.items(): - assert tags["dagster/storage_kind"] == "my_custom_storage_kind" - assert ( - expected_specs_by_key[asset_key].tags["dagster/storage_kind"] - == "my_custom_storage_kind" - ) - - def test_with_owner_replacements(test_jaffle_shop_manifest: Dict[str, Any]) -> None: expected_owners = ["custom@custom.com"] @@ -883,19 +838,20 @@ def my_dbt_assets(): ... assert expected_specs_by_key[AssetKey("customers")].tags == { "foo": "", "bar-baz": "", - "dagster/kind/dbt": "", - **StorageKindTagSet(storage_kind="duckdb"), + **build_kind_tag("duckdb"), + **build_kind_tag("dbt"), } assert my_dbt_assets.tags_by_key[AssetKey("customers")] == { "foo": "", "bar-baz": "", - **StorageKindTagSet(storage_kind="duckdb"), + **build_kind_tag("duckdb"), + **build_kind_tag("dbt"), } for asset_key in my_dbt_assets.keys - {AssetKey("customers")}: - assert my_dbt_assets.tags_by_key[asset_key] == {**StorageKindTagSet(storage_kind="duckdb")} + assert has_kind(my_dbt_assets.tags_by_key[asset_key], "duckdb") assert expected_specs_by_key[asset_key].tags == { - **StorageKindTagSet(storage_kind="duckdb"), - "dagster/kind/dbt": "", + **build_kind_tag("duckdb"), + **build_kind_tag("dbt"), } diff --git a/python_modules/libraries/dagster-embedded-elt/dagster_embedded_elt/dlt/asset_decorator.py b/python_modules/libraries/dagster-embedded-elt/dagster_embedded_elt/dlt/asset_decorator.py index db99255f61759..f6271f0b3a093 100644 --- a/python_modules/libraries/dagster-embedded-elt/dagster_embedded_elt/dlt/asset_decorator.py +++ b/python_modules/libraries/dagster-embedded-elt/dagster_embedded_elt/dlt/asset_decorator.py @@ -8,6 +8,7 @@ multi_asset, ) from dagster._annotations import deprecated_param +from dagster._core.definitions.tags import build_kind_tag from dlt.extract.source import DltSource from dlt.pipeline.pipeline import Pipeline @@ -55,7 +56,8 @@ def build_dlt_asset_specs( }, owners=dagster_dlt_translator.get_owners(dlt_source_resource), tags={ - "dagster/storage_kind": destination_type, + **build_kind_tag("dlt"), + **build_kind_tag(destination_type), **dagster_dlt_translator.get_tags(dlt_source_resource), }, ) @@ -144,7 +146,6 @@ def github_reactions_dagster_assets(context: AssetExecutionContext, dlt: DltDags return multi_asset( name=name, group_name=group_name, - compute_kind="dlt", can_subset=True, partitions_def=partitions_def, specs=build_dlt_asset_specs( diff --git a/python_modules/libraries/dagster-embedded-elt/dagster_embedded_elt/sling/asset_decorator.py b/python_modules/libraries/dagster-embedded-elt/dagster_embedded_elt/sling/asset_decorator.py index 765cdc897099d..3f239e008a955 100644 --- a/python_modules/libraries/dagster-embedded-elt/dagster_embedded_elt/sling/asset_decorator.py +++ b/python_modules/libraries/dagster-embedded-elt/dagster_embedded_elt/sling/asset_decorator.py @@ -9,6 +9,7 @@ _check as check, multi_asset, ) +from dagster._core.definitions.tags import build_kind_tag from dagster._utils.merger import deep_merge_dicts from dagster._utils.security import non_secure_md5_hash_str @@ -118,7 +119,6 @@ def my_assets(context, sling: SlingResource): return multi_asset( name=name, - compute_kind="sling", partitions_def=partitions_def, can_subset=True, op_tags=op_tags, @@ -133,7 +133,10 @@ def my_assets(context, sling: SlingResource): METADATA_KEY_TRANSLATOR: dagster_sling_translator, METADATA_KEY_REPLICATION_CONFIG: replication_config, }, - tags=dagster_sling_translator.get_tags(stream), + tags={ + **build_kind_tag("sling"), + **(dagster_sling_translator.get_tags(stream) or {}), + }, group_name=dagster_sling_translator.get_group_name(stream), freshness_policy=dagster_sling_translator.get_freshness_policy(stream), auto_materialize_policy=dagster_sling_translator.get_auto_materialize_policy( diff --git a/python_modules/libraries/dagster-embedded-elt/dagster_embedded_elt_tests/dlt_tests/test_asset_decorator.py b/python_modules/libraries/dagster-embedded-elt/dagster_embedded_elt_tests/dlt_tests/test_asset_decorator.py index 303ad0a15d6d2..ce336e9095739 100644 --- a/python_modules/libraries/dagster-embedded-elt/dagster_embedded_elt_tests/dlt_tests/test_asset_decorator.py +++ b/python_modules/libraries/dagster-embedded-elt/dagster_embedded_elt_tests/dlt_tests/test_asset_decorator.py @@ -15,6 +15,7 @@ from dagster._core.definitions.materialize import materialize from dagster._core.definitions.metadata.metadata_value import TableSchemaMetadataValue from dagster._core.definitions.metadata.table import TableColumn, TableSchema +from dagster._core.definitions.tags import build_kind_tag, has_kind from dagster_embedded_elt.dlt import DagsterDltResource, DagsterDltTranslator, dlt_assets from dlt import Pipeline from dlt.extract.resource import DltResource @@ -232,10 +233,7 @@ def example_pipeline_assets( for key in example_pipeline_assets.asset_and_check_keys: if isinstance(key, AssetKey): - assert ( - example_pipeline_assets.tags_by_key[key].get("dagster/storage_kind") - == destination_type - ) + assert has_kind(example_pipeline_assets.tags_by_key[key], destination_type) def test_example_pipeline_subselection(dlt_pipeline: Pipeline) -> None: @@ -498,7 +496,7 @@ def my_dlt_assets(dlt_pipeline_resource: DagsterDltResource): ... def test_with_tag_replacements(dlt_pipeline: Pipeline) -> None: - expected_tags = {"customized": "tag", "dagster/storage_kind": "duckdb"} + expected_tags = {"customized": "tag", **build_kind_tag("dlt"), **build_kind_tag("duckdb")} class CustomDagsterDltTranslator(DagsterDltTranslator): def get_tags(self, _) -> Optional[Mapping[str, str]]: diff --git a/python_modules/libraries/dagster-embedded-elt/dagster_embedded_elt_tests/sling_tests/test_asset_decorator.py b/python_modules/libraries/dagster-embedded-elt/dagster_embedded_elt_tests/sling_tests/test_asset_decorator.py index 7f6545515c180..6acf2d049546f 100644 --- a/python_modules/libraries/dagster-embedded-elt/dagster_embedded_elt_tests/sling_tests/test_asset_decorator.py +++ b/python_modules/libraries/dagster-embedded-elt/dagster_embedded_elt_tests/sling_tests/test_asset_decorator.py @@ -13,6 +13,7 @@ file_relative_path, ) from dagster._core.definitions.materialize import materialize +from dagster._core.definitions.tags import build_kind_tag from dagster_embedded_elt.sling import SlingReplicationParam, sling_assets from dagster_embedded_elt.sling.dagster_sling_translator import DagsterSlingTranslator from dagster_embedded_elt.sling.resources import SlingConnectionResource, SlingResource @@ -232,7 +233,10 @@ def get_tags(self, stream_definition): def my_sling_assets(): ... for asset_key in my_sling_assets.keys: - assert my_sling_assets.tags_by_key[asset_key] == {"custom_tag": "custom_value"} + assert my_sling_assets.tags_by_key[asset_key] == { + "custom_tag": "custom_value", + **build_kind_tag("sling"), + } def test_base_with_default_meta_translator(): diff --git a/python_modules/libraries/dagster-fivetran/dagster_fivetran/asset_defs.py b/python_modules/libraries/dagster-fivetran/dagster_fivetran/asset_defs.py index 67341d5d83153..e875ca13525db 100644 --- a/python_modules/libraries/dagster-fivetran/dagster_fivetran/asset_defs.py +++ b/python_modules/libraries/dagster-fivetran/dagster_fivetran/asset_defs.py @@ -31,6 +31,7 @@ from dagster._core.definitions.events import CoercibleToAssetKeyPrefix, Output from dagster._core.definitions.metadata import RawMetadataMapping from dagster._core.definitions.resource_definition import ResourceDefinition +from dagster._core.definitions.tags import build_kind_tag from dagster._core.errors import DagsterStepOutputNotFoundError from dagster._core.execution.context.init import build_init_resource_context @@ -74,7 +75,6 @@ def _build_fivetran_assets( @multi_asset( name=f"fivetran_sync_{connector_id}", - compute_kind="fivetran", resource_defs=resource_defs, group_name=group_name, op_tags=op_tags, @@ -85,7 +85,10 @@ def _build_fivetran_assets( **_metadata_by_table_name.get(table, {}), **({"dagster/io_manager_key": io_manager_key} if io_manager_key else {}), }, - tags=asset_tags, + tags={ + **build_kind_tag("fivetran"), + **(asset_tags or {}), + }, ) for table in tracked_asset_keys.keys() ], @@ -323,7 +326,7 @@ def _build_fivetran_assets_from_metadata( group_name=assets_defn_meta.group_name, poll_interval=poll_interval, poll_timeout=poll_timeout, - asset_tags={"dagster/storage_kind": storage_kind} if storage_kind else None, + asset_tags=build_kind_tag(storage_kind) if storage_kind else None, infer_missing_tables=False, op_tags=None, )[0] diff --git a/python_modules/libraries/dagster-fivetran/dagster_fivetran_tests/test_asset_defs.py b/python_modules/libraries/dagster-fivetran/dagster_fivetran_tests/test_asset_defs.py index a1f4d1759b0fd..ed826ee6742a8 100644 --- a/python_modules/libraries/dagster-fivetran/dagster_fivetran_tests/test_asset_defs.py +++ b/python_modules/libraries/dagster-fivetran/dagster_fivetran_tests/test_asset_defs.py @@ -2,7 +2,6 @@ import responses from dagster import AssetKey, DagsterStepOutputNotFoundError from dagster._core.definitions.materialize import materialize -from dagster._core.storage.tags import COMPUTE_KIND_TAG from dagster_fivetran import fivetran_resource from dagster_fivetran.asset_defs import build_fivetran_assets from dagster_fivetran.resources import ( @@ -78,7 +77,7 @@ def test_fivetran_asset_run(tables, infer_missing_tables, should_error, schema_p assert fivetran_assets[0].keys == {AssetKey(table.split(".")) for table in tables} assert len(fivetran_assets[0].op.output_defs) == len(tables) - assert fivetran_assets[0].op.tags == {**{COMPUTE_KIND_TAG: "fivetran"}, **(op_tags or {})} + assert fivetran_assets[0].op.tags == (op_tags or {}) with responses.RequestsMock() as rsps: rsps.add(rsps.PATCH, api_prefix, json=get_sample_update_response()) diff --git a/python_modules/libraries/dagster-fivetran/dagster_fivetran_tests/test_load_from_instance.py b/python_modules/libraries/dagster-fivetran/dagster_fivetran_tests/test_load_from_instance.py index 40cd51af071fd..c547cfc3dc4f0 100644 --- a/python_modules/libraries/dagster-fivetran/dagster_fivetran_tests/test_load_from_instance.py +++ b/python_modules/libraries/dagster-fivetran/dagster_fivetran_tests/test_load_from_instance.py @@ -15,6 +15,7 @@ ) from dagster._core.definitions.materialize import materialize from dagster._core.definitions.metadata.table import TableColumn, TableSchema +from dagster._core.definitions.tags import has_kind from dagster._core.execution.with_resources import with_resources from dagster._core.instance_for_test import environ from dagster_fivetran import FivetranResource @@ -215,7 +216,7 @@ def downstream_asset(xyz): assert metadata.get("dagster/relation_identifier") == ( "example_database." + ".".join(key.path[-2:]) ) - assert assets_def.tags_by_key[key]["dagster/storage_kind"] == "snowflake" + assert has_kind(assets_def.tags_by_key[key], "snowflake") assert ft_assets[0].keys == tables assert all( diff --git a/python_modules/libraries/dagster-sdf/dagster_sdf/asset_decorator.py b/python_modules/libraries/dagster-sdf/dagster_sdf/asset_decorator.py index 7f164c1fef636..e6f40f01c14aa 100644 --- a/python_modules/libraries/dagster-sdf/dagster_sdf/asset_decorator.py +++ b/python_modules/libraries/dagster-sdf/dagster_sdf/asset_decorator.py @@ -49,7 +49,6 @@ def sdf_assets( specs=specs, name=name, required_resource_keys=required_resource_keys, - compute_kind="sdf", partitions_def=partitions_def, can_subset=True, op_tags=op_tags, diff --git a/python_modules/libraries/dagster-sdf/dagster_sdf/sdf_information_schema.py b/python_modules/libraries/dagster-sdf/dagster_sdf/sdf_information_schema.py index 426ffe3aa335d..8f7e26ff83761 100644 --- a/python_modules/libraries/dagster-sdf/dagster_sdf/sdf_information_schema.py +++ b/python_modules/libraries/dagster-sdf/dagster_sdf/sdf_information_schema.py @@ -33,6 +33,7 @@ TableMetadataSet, TableSchema, ) +from dagster._core.definitions.tags import build_kind_tag from dagster._record import IHaveNew, record_custom from dagster_sdf.asset_utils import exists_in_selected, get_info_schema_dir, get_output_dir @@ -217,6 +218,9 @@ def build_sdf_multi_asset_args( get_output_dir(self.target_dir, self.environment), ), metadata=metadata, + tags={ + **build_kind_tag("sdf"), + }, skippable=True, ) )