Skip to content

Commit

Permalink
Update integrations to use kind tags in place of storage kind (#24324)
Browse files Browse the repository at this point in the history
## Summary

Updates the Fivetran, dbt, sdf, and embedded-elt integrations to use
`dagster/kind/xyz` tags in place of both the `compute_kind` argument and the
`dagster/storage_kind` tag. In particular, this brings back kinds for both
storage and compute in the UI.

## Test Plan

Updated unit tests.
  • Loading branch information
benpankow authored Sep 11, 2024
1 parent 54a8201 commit 92d4cc8
Show file tree
Hide file tree
Showing 15 changed files with 60 additions and 76 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -156,8 +156,8 @@ def __new__(
validate_asset_owner(owner, key)

kind_tags = {tag_key for tag_key in (tags or {}).keys() if tag_key.startswith(KIND_PREFIX)}
if kind_tags is not None and len(kind_tags) > 2:
raise DagsterInvalidDefinitionError("Assets can have at most two kinds currently.")
if kind_tags is not None and len(kind_tags) > 3:
raise DagsterInvalidDefinitionError("Assets can have at most three kinds currently.")

return super().__new__(
cls,
Expand Down
Original file line number Diff line number Diff line change
@@ -1,8 +1,23 @@
from typing import Optional
from typing import Any, Dict, Mapping, Optional

from dagster._annotations import deprecated
from dagster._core.definitions.tags.tag_set import NamespacedTagSet as NamespacedTagSet
from dagster._core.storage.tags import KIND_PREFIX


def has_kind(tags: Mapping[str, Any], kind: str) -> bool:
    """Report whether ``tags`` contains the kind tag key for ``kind``.

    Only the presence of the key is checked; the value stored under it is
    not inspected.

    Args:
        tags: Tag mapping to search.
        kind: Kind name, without the kind prefix (e.g. ``"dbt"``).

    Returns:
        True if the key produced by ``build_kind_tag_key(kind)`` is in ``tags``.
    """
    kind_tag_key = build_kind_tag_key(kind)
    return kind_tag_key in tags


def build_kind_tag_key(kind: str) -> str:
    """Return the tag key that marks an asset as having the given kind.

    Args:
        kind: Kind name to encode (e.g. ``"snowflake"``).

    Returns:
        ``KIND_PREFIX`` immediately followed by ``kind``.
    """
    return f"{KIND_PREFIX}{kind}"


def build_kind_tag(kind: str) -> Dict[str, Any]:
    """Build a one-entry tag dict declaring ``kind``.

    The result maps the key from ``build_kind_tag_key(kind)`` to an empty
    string, so it can be unpacked with ``**`` into a larger tags dict.

    Args:
        kind: Kind name to encode (e.g. ``"fivetran"``).

    Returns:
        A new dict with exactly one entry: the kind tag key mapped to ``""``.
    """
    kind_tag_key = build_kind_tag_key(kind)
    return {kind_tag_key: ""}


@deprecated(breaking_version="1.9")
class StorageKindTagSet(NamespacedTagSet):
"""Tag entries which describe how an asset is stored.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2297,7 +2297,7 @@ def assets(): ...
assert assets.specs_by_key[AssetKey("asset1")].kinds == {"python"}

with pytest.raises(
DagsterInvalidDefinitionError, match="Assets can have at most two kinds currently."
DagsterInvalidDefinitionError, match="Assets can have at most three kinds currently."
):

@multi_asset(
Expand All @@ -2308,6 +2308,7 @@ def assets(): ...
"dagster/kind/python": "",
"dagster/kind/snowflake": "",
"dagster/kind/bigquery": "",
"dagster/kind/airflow": "",
},
)
]
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -347,7 +347,6 @@ def partitionshop_dbt_assets(context: AssetExecutionContext, dbt: DbtCliResource
internal_asset_deps=internal_asset_deps,
deps=deps,
required_resource_keys=required_resource_keys,
compute_kind="dbt",
partitions_def=partitions_def,
can_subset=True,
op_tags=resolved_op_tags,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@
CodeReferencesMetadataValue,
LocalFileCodeReference,
)
from dagster._core.definitions.tags import StorageKindTagSet
from dagster._core.definitions.tags import build_kind_tag
from dagster._utils.merger import merge_dicts

from dagster_dbt.utils import (
Expand Down Expand Up @@ -816,7 +816,8 @@ def build_dbt_multi_asset_args(
}
),
tags={
**(StorageKindTagSet(storage_kind=dbt_adapter_type) if dbt_adapter_type else {}),
**build_kind_tag("dbt"),
**(build_kind_tag(dbt_adapter_type) if dbt_adapter_type else {}),
**dagster_dbt_translator.get_tags(dbt_resource_props),
},
group_name=dagster_dbt_translator.get_group_name(dbt_resource_props),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,10 +28,9 @@
asset,
materialize,
)
from dagster._core.definitions.tags import StorageKindTagSet
from dagster._core.definitions.tags import build_kind_tag, has_kind
from dagster._core.definitions.utils import DEFAULT_IO_MANAGER_KEY
from dagster._core.execution.context.compute import AssetExecutionContext
from dagster._core.storage.tags import COMPUTE_KIND_TAG
from dagster._core.types.dagster_type import DagsterType
from dagster_dbt.asset_decorator import dbt_assets
from dagster_dbt.asset_specs import build_dbt_asset_specs
Expand Down Expand Up @@ -360,7 +359,6 @@ def my_dbt_assets(): ...

assert my_dbt_assets.op.tags == {
**op_tags,
COMPUTE_KIND_TAG: "dbt",
"dagster_dbt/select": "fqn:*",
}

Expand All @@ -369,7 +367,6 @@ def my_dbt_assets_with_select(): ...

assert my_dbt_assets_with_select.op.tags == {
**op_tags,
COMPUTE_KIND_TAG: "dbt",
"dagster_dbt/select": "raw_customers+",
}

Expand All @@ -378,7 +375,6 @@ def my_dbt_assets_with_exclude(): ...

assert my_dbt_assets_with_exclude.op.tags == {
**op_tags,
COMPUTE_KIND_TAG: "dbt",
"dagster_dbt/select": "fqn:*",
"dagster_dbt/exclude": "raw_customers+",
}
Expand All @@ -393,7 +389,6 @@ def my_dbt_assets_with_select_and_exclude(): ...

assert my_dbt_assets_with_select_and_exclude.op.tags == {
**op_tags,
COMPUTE_KIND_TAG: "dbt",
"dagster_dbt/select": "raw_customers+",
"dagster_dbt/exclude": "customers",
}
Expand Down Expand Up @@ -605,46 +600,6 @@ def my_dbt_assets(): ...
assert expected_specs_by_key[asset_key].tags["customized"] == "tag"


# Verifies the "dagster/storage_kind" tag on dbt assets built from the jaffle shop
# manifest: it is "duckdb" when no translator is supplied, and it takes the value
# returned by a custom translator's get_tags() when one is supplied. Each check is
# made against both the @dbt_assets definition and build_dbt_asset_specs().
def test_with_storage_kind_tag_override(test_jaffle_shop_manifest: Dict[str, Any]) -> None:
# Baseline specs built without a custom translator, indexed by asset key.
expected_specs_with_no_override_by_key = {
spec.key: spec for spec in build_dbt_asset_specs(manifest=test_jaffle_shop_manifest)
}

@dbt_assets(manifest=test_jaffle_shop_manifest)
def my_dbt_assets_no_override(): ...

# Without an override, both the assets definition and the specs report "duckdb".
for asset_key, tags in my_dbt_assets_no_override.tags_by_key.items():
assert tags["dagster/storage_kind"] == "duckdb"
assert (
expected_specs_with_no_override_by_key[asset_key].tags["dagster/storage_kind"]
== "duckdb"
)

# Translator whose get_tags() returns a storage kind tag set with a custom value.
class CustomDagsterDbtTranslator(DagsterDbtTranslator):
def get_tags(self, _: Mapping[str, Any]) -> Mapping[str, str]:
return {**StorageKindTagSet(storage_kind="my_custom_storage_kind")}

# Specs built with the custom translator, indexed by asset key.
expected_specs_by_key = {
spec.key: spec
for spec in build_dbt_asset_specs(
manifest=test_jaffle_shop_manifest,
dagster_dbt_translator=CustomDagsterDbtTranslator(),
)
}

@dbt_assets(
manifest=test_jaffle_shop_manifest, dagster_dbt_translator=CustomDagsterDbtTranslator()
)
def my_dbt_assets(): ...

# With the override, the translator-supplied value is the one observed on every asset.
for asset_key, tags in my_dbt_assets.tags_by_key.items():
assert tags["dagster/storage_kind"] == "my_custom_storage_kind"
assert (
expected_specs_by_key[asset_key].tags["dagster/storage_kind"]
== "my_custom_storage_kind"
)


def test_with_owner_replacements(test_jaffle_shop_manifest: Dict[str, Any]) -> None:
expected_owners = ["[email protected]"]

Expand Down Expand Up @@ -883,19 +838,20 @@ def my_dbt_assets(): ...
assert expected_specs_by_key[AssetKey("customers")].tags == {
"foo": "",
"bar-baz": "",
"dagster/kind/dbt": "",
**StorageKindTagSet(storage_kind="duckdb"),
**build_kind_tag("duckdb"),
**build_kind_tag("dbt"),
}
assert my_dbt_assets.tags_by_key[AssetKey("customers")] == {
"foo": "",
"bar-baz": "",
**StorageKindTagSet(storage_kind="duckdb"),
**build_kind_tag("duckdb"),
**build_kind_tag("dbt"),
}
for asset_key in my_dbt_assets.keys - {AssetKey("customers")}:
assert my_dbt_assets.tags_by_key[asset_key] == {**StorageKindTagSet(storage_kind="duckdb")}
assert has_kind(my_dbt_assets.tags_by_key[asset_key], "duckdb")
assert expected_specs_by_key[asset_key].tags == {
**StorageKindTagSet(storage_kind="duckdb"),
"dagster/kind/dbt": "",
**build_kind_tag("duckdb"),
**build_kind_tag("dbt"),
}


Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
multi_asset,
)
from dagster._annotations import deprecated_param
from dagster._core.definitions.tags import build_kind_tag
from dlt.extract.source import DltSource
from dlt.pipeline.pipeline import Pipeline

Expand Down Expand Up @@ -55,7 +56,8 @@ def build_dlt_asset_specs(
},
owners=dagster_dlt_translator.get_owners(dlt_source_resource),
tags={
"dagster/storage_kind": destination_type,
**build_kind_tag("dlt"),
**build_kind_tag(destination_type),
**dagster_dlt_translator.get_tags(dlt_source_resource),
},
)
Expand Down Expand Up @@ -144,7 +146,6 @@ def github_reactions_dagster_assets(context: AssetExecutionContext, dlt: DltDags
return multi_asset(
name=name,
group_name=group_name,
compute_kind="dlt",
can_subset=True,
partitions_def=partitions_def,
specs=build_dlt_asset_specs(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
_check as check,
multi_asset,
)
from dagster._core.definitions.tags import build_kind_tag
from dagster._utils.merger import deep_merge_dicts
from dagster._utils.security import non_secure_md5_hash_str

Expand Down Expand Up @@ -118,7 +119,6 @@ def my_assets(context, sling: SlingResource):

return multi_asset(
name=name,
compute_kind="sling",
partitions_def=partitions_def,
can_subset=True,
op_tags=op_tags,
Expand All @@ -133,7 +133,10 @@ def my_assets(context, sling: SlingResource):
METADATA_KEY_TRANSLATOR: dagster_sling_translator,
METADATA_KEY_REPLICATION_CONFIG: replication_config,
},
tags=dagster_sling_translator.get_tags(stream),
tags={
**build_kind_tag("sling"),
**(dagster_sling_translator.get_tags(stream) or {}),
},
group_name=dagster_sling_translator.get_group_name(stream),
freshness_policy=dagster_sling_translator.get_freshness_policy(stream),
auto_materialize_policy=dagster_sling_translator.get_auto_materialize_policy(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
from dagster._core.definitions.materialize import materialize
from dagster._core.definitions.metadata.metadata_value import TableSchemaMetadataValue
from dagster._core.definitions.metadata.table import TableColumn, TableSchema
from dagster._core.definitions.tags import build_kind_tag, has_kind
from dagster_embedded_elt.dlt import DagsterDltResource, DagsterDltTranslator, dlt_assets
from dlt import Pipeline
from dlt.extract.resource import DltResource
Expand Down Expand Up @@ -232,10 +233,7 @@ def example_pipeline_assets(

for key in example_pipeline_assets.asset_and_check_keys:
if isinstance(key, AssetKey):
assert (
example_pipeline_assets.tags_by_key[key].get("dagster/storage_kind")
== destination_type
)
assert has_kind(example_pipeline_assets.tags_by_key[key], destination_type)


def test_example_pipeline_subselection(dlt_pipeline: Pipeline) -> None:
Expand Down Expand Up @@ -498,7 +496,7 @@ def my_dlt_assets(dlt_pipeline_resource: DagsterDltResource): ...


def test_with_tag_replacements(dlt_pipeline: Pipeline) -> None:
expected_tags = {"customized": "tag", "dagster/storage_kind": "duckdb"}
expected_tags = {"customized": "tag", **build_kind_tag("dlt"), **build_kind_tag("duckdb")}

class CustomDagsterDltTranslator(DagsterDltTranslator):
def get_tags(self, _) -> Optional[Mapping[str, str]]:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
file_relative_path,
)
from dagster._core.definitions.materialize import materialize
from dagster._core.definitions.tags import build_kind_tag
from dagster_embedded_elt.sling import SlingReplicationParam, sling_assets
from dagster_embedded_elt.sling.dagster_sling_translator import DagsterSlingTranslator
from dagster_embedded_elt.sling.resources import SlingConnectionResource, SlingResource
Expand Down Expand Up @@ -232,7 +233,10 @@ def get_tags(self, stream_definition):
def my_sling_assets(): ...

for asset_key in my_sling_assets.keys:
assert my_sling_assets.tags_by_key[asset_key] == {"custom_tag": "custom_value"}
assert my_sling_assets.tags_by_key[asset_key] == {
"custom_tag": "custom_value",
**build_kind_tag("sling"),
}


def test_base_with_default_meta_translator():
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
from dagster._core.definitions.events import CoercibleToAssetKeyPrefix, Output
from dagster._core.definitions.metadata import RawMetadataMapping
from dagster._core.definitions.resource_definition import ResourceDefinition
from dagster._core.definitions.tags import build_kind_tag
from dagster._core.errors import DagsterStepOutputNotFoundError
from dagster._core.execution.context.init import build_init_resource_context

Expand Down Expand Up @@ -74,7 +75,6 @@ def _build_fivetran_assets(

@multi_asset(
name=f"fivetran_sync_{connector_id}",
compute_kind="fivetran",
resource_defs=resource_defs,
group_name=group_name,
op_tags=op_tags,
Expand All @@ -85,7 +85,10 @@ def _build_fivetran_assets(
**_metadata_by_table_name.get(table, {}),
**({"dagster/io_manager_key": io_manager_key} if io_manager_key else {}),
},
tags=asset_tags,
tags={
**build_kind_tag("fivetran"),
**(asset_tags or {}),
},
)
for table in tracked_asset_keys.keys()
],
Expand Down Expand Up @@ -323,7 +326,7 @@ def _build_fivetran_assets_from_metadata(
group_name=assets_defn_meta.group_name,
poll_interval=poll_interval,
poll_timeout=poll_timeout,
asset_tags={"dagster/storage_kind": storage_kind} if storage_kind else None,
asset_tags=build_kind_tag(storage_kind) if storage_kind else None,
infer_missing_tables=False,
op_tags=None,
)[0]
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@
import responses
from dagster import AssetKey, DagsterStepOutputNotFoundError
from dagster._core.definitions.materialize import materialize
from dagster._core.storage.tags import COMPUTE_KIND_TAG
from dagster_fivetran import fivetran_resource
from dagster_fivetran.asset_defs import build_fivetran_assets
from dagster_fivetran.resources import (
Expand Down Expand Up @@ -78,7 +77,7 @@ def test_fivetran_asset_run(tables, infer_missing_tables, should_error, schema_p
assert fivetran_assets[0].keys == {AssetKey(table.split(".")) for table in tables}
assert len(fivetran_assets[0].op.output_defs) == len(tables)

assert fivetran_assets[0].op.tags == {**{COMPUTE_KIND_TAG: "fivetran"}, **(op_tags or {})}
assert fivetran_assets[0].op.tags == (op_tags or {})

with responses.RequestsMock() as rsps:
rsps.add(rsps.PATCH, api_prefix, json=get_sample_update_response())
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
)
from dagster._core.definitions.materialize import materialize
from dagster._core.definitions.metadata.table import TableColumn, TableSchema
from dagster._core.definitions.tags import has_kind
from dagster._core.execution.with_resources import with_resources
from dagster._core.instance_for_test import environ
from dagster_fivetran import FivetranResource
Expand Down Expand Up @@ -215,7 +216,7 @@ def downstream_asset(xyz):
assert metadata.get("dagster/relation_identifier") == (
"example_database." + ".".join(key.path[-2:])
)
assert assets_def.tags_by_key[key]["dagster/storage_kind"] == "snowflake"
assert has_kind(assets_def.tags_by_key[key], "snowflake")

assert ft_assets[0].keys == tables
assert all(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,6 @@ def sdf_assets(
specs=specs,
name=name,
required_resource_keys=required_resource_keys,
compute_kind="sdf",
partitions_def=partitions_def,
can_subset=True,
op_tags=op_tags,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@
TableMetadataSet,
TableSchema,
)
from dagster._core.definitions.tags import build_kind_tag
from dagster._record import IHaveNew, record_custom

from dagster_sdf.asset_utils import exists_in_selected, get_info_schema_dir, get_output_dir
Expand Down Expand Up @@ -217,6 +218,9 @@ def build_sdf_multi_asset_args(
get_output_dir(self.target_dir, self.environment),
),
metadata=metadata,
tags={
**build_kind_tag("sdf"),
},
skippable=True,
)
)
Expand Down

0 comments on commit 92d4cc8

Please sign in to comment.