Skip to content

Commit

Permalink
move to ext asset
Browse files Browse the repository at this point in the history
  • Loading branch information
benpankow committed Aug 21, 2024
1 parent fd9f97a commit ffce823
Show file tree
Hide file tree
Showing 3 changed files with 12 additions and 7 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@
)
from dagster._core.snap.node import GraphDefSnap, OpDefSnap
from dagster._core.storage.batch_asset_record_loader import BatchAssetRecordLoader
from dagster._core.storage.tags import KIND_PREFIX, TagType, get_definition_tag_type
from dagster._core.utils import is_valid_email
from dagster._core.workspace.permissions import Permissions
from dagster._utils.caching_instance_queryer import CachingInstanceQueryer
Expand Down
10 changes: 5 additions & 5 deletions python_modules/dagster/dagster/_core/definitions/asset_graph.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,6 @@
from dagster._core.definitions.source_asset import SourceAsset
from dagster._core.definitions.utils import DEFAULT_GROUP_NAME
from dagster._core.selector.subset_selector import generate_asset_dep_graph
from dagster._core.storage.tags import KIND_PREFIX
from dagster._utils.warnings import disable_dagster_warnings


Expand Down Expand Up @@ -78,10 +77,11 @@ def metadata(self) -> ArbitraryMetadataMapping:

@property
def tags(self) -> Mapping[str, str]:
return {
**self._spec.tags,
**{f"{KIND_PREFIX}{kind}": "true" for kind in self._spec.kinds or []},
}
return self._spec.tags

@property
def kinds(self) -> AbstractSet[str]:
return self._spec.kinds or set()

@property
def owners(self) -> Sequence[str]:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -96,7 +96,7 @@
from dagster._core.snap import JobSnapshot
from dagster._core.snap.mode import ResourceDefSnap, build_resource_def_snap
from dagster._core.storage.io_manager import IOManagerDefinition
from dagster._core.storage.tags import COMPUTE_KIND_TAG
from dagster._core.storage.tags import COMPUTE_KIND_TAG, KIND_PREFIX
from dagster._core.utils import is_valid_email
from dagster._record import IHaveNew, record, record_custom
from dagster._serdes import whitelist_for_serdes
Expand Down Expand Up @@ -1448,6 +1448,10 @@ def external_asset_nodes_from_defs(
):
partition_mappings[pk] = partition_mapping

tags_with_kinds = {
**asset_node.tags,
**{f"{KIND_PREFIX}{kind}": "" for kind in asset_node.kinds or []},
}
external_asset_nodes.append(
ExternalAssetNode(
asset_key=key,
Expand Down Expand Up @@ -1477,7 +1481,7 @@ def external_asset_nodes_from_defs(
),
output_name=output_name,
metadata=asset_node.metadata,
tags=asset_node.tags,
tags=tags_with_kinds,
group_name=asset_node.group_name,
freshness_policy=asset_node.freshness_policy,
is_source=asset_node.is_external,
Expand Down

0 comments on commit ffce823

Please sign in to comment.