Skip to content

Commit

Permalink
Introduce notion of hidden definitions tags
Browse files Browse the repository at this point in the history
  • Loading branch information
benpankow committed Aug 21, 2024
1 parent 2616f6d commit 57b0401
Show file tree
Hide file tree
Showing 9 changed files with 59 additions and 13 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@
PARTITION_SET_TAG,
REPOSITORY_LABEL_TAG,
TagType,
get_tag_type,
get_run_tag_type,
)
from dagster._utils.yaml_utils import dump_run_config_yaml

Expand Down Expand Up @@ -159,7 +159,7 @@ def get_partition_tags(
results=[
GraphenePipelineTag(key=key, value=value)
for key, value in result.tags.items()
if get_tag_type(key) != TagType.HIDDEN
if get_run_tag_type(key) != TagType.HIDDEN
]
)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@
from dagster._core.instance import DagsterInstance
from dagster._core.storage.dagster_run import DagsterRunStatus, RunRecord, RunsFilter
from dagster._core.storage.event_log.base import AssetRecord
from dagster._core.storage.tags import BACKFILL_ID_TAG, TagType, get_tag_type
from dagster._core.storage.tags import BACKFILL_ID_TAG, TagType, get_run_tag_type
from dagster._record import record
from dagster._time import datetime_from_timestamp, get_current_timestamp

Expand Down Expand Up @@ -66,7 +66,7 @@ def get_run_tag_keys(graphene_info: "ResolveInfo") -> "GrapheneRunTagKeys":
keys=[
tag_key
for tag_key in graphene_info.context.instance.get_run_tag_keys()
if get_tag_type(tag_key) != TagType.HIDDEN
if get_run_tag_type(tag_key) != TagType.HIDDEN
]
)

Expand All @@ -87,7 +87,7 @@ def get_run_tags(
for key, values in instance.get_run_tags(
tag_keys=tag_keys, value_prefix=value_prefix, limit=limit
)
if get_tag_type(key) != TagType.HIDDEN
if get_run_tag_type(key) != TagType.HIDDEN
]
)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@
)
from dagster._core.snap.node import GraphDefSnap, OpDefSnap
from dagster._core.storage.batch_asset_record_loader import BatchAssetRecordLoader
from dagster._core.storage.tags import TagType, get_definition_tag_type
from dagster._core.utils import is_valid_email
from dagster._core.workspace.permissions import Permissions
from dagster._utils.caching_instance_queryer import CachingInstanceQueryer
Expand Down Expand Up @@ -1209,6 +1210,7 @@ def resolve_tags(self, _graphene_info: ResolveInfo) -> Sequence[GrapheneDefiniti
return [
GrapheneDefinitionTag(key, value)
for key, value in (self._external_asset_node.tags or {}).items()
if get_definition_tag_type(key) == TagType.USER_PROVIDED
]

def resolve_op(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@
ASSET_PARTITION_RANGE_END_TAG,
ASSET_PARTITION_RANGE_START_TAG,
TagType,
get_tag_type,
get_run_tag_type,
)
from dagster._core.workspace.permissions import Permissions

Expand Down Expand Up @@ -508,7 +508,7 @@ def resolve_tags(self, _graphene_info: ResolveInfo):
return [
GraphenePipelineTag(key=key, value=value)
for key, value in self._backfill_job.tags.items()
if get_tag_type(key) != TagType.HIDDEN
if get_run_tag_type(key) != TagType.HIDDEN
]

def resolve_runStatus(self, _graphene_info: ResolveInfo) -> GrapheneRunStatus:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@
SensorInstigatorData,
)
from dagster._core.storage.dagster_run import DagsterRun, RunsFilter
from dagster._core.storage.tags import REPOSITORY_LABEL_TAG, TagType, get_tag_type
from dagster._core.storage.tags import REPOSITORY_LABEL_TAG, TagType, get_run_tag_type
from dagster._core.workspace.permissions import Permissions
from dagster._utils.error import SerializableErrorInfo, serializable_error_info_from_exc_info
from dagster._utils.yaml_utils import dump_run_config_yaml
Expand Down Expand Up @@ -499,7 +499,7 @@ def resolve_tags(self, _graphene_info: ResolveInfo):
return [
GraphenePipelineTag(key=key, value=value)
for key, value in self._run_request.tags.items()
if get_tag_type(key) != TagType.HIDDEN
if get_run_tag_type(key) != TagType.HIDDEN
]

def resolve_runConfigYaml(self, _graphene_info: ResolveInfo):
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@
RunRecord,
RunsFilter,
)
from dagster._core.storage.tags import REPOSITORY_LABEL_TAG, TagType, get_tag_type
from dagster._core.storage.tags import REPOSITORY_LABEL_TAG, TagType, get_run_tag_type
from dagster._core.workspace.permissions import Permissions
from dagster._utils.yaml_utils import dump_run_config_yaml

Expand Down Expand Up @@ -518,7 +518,7 @@ def resolve_tags(self, _graphene_info: ResolveInfo):
return [
GraphenePipelineTag(key=key, value=value)
for key, value in self.dagster_run.tags.items()
if get_tag_type(key) != TagType.HIDDEN
if get_run_tag_type(key) != TagType.HIDDEN
]

def resolve_rootRunId(self, _graphene_info: ResolveInfo):
Expand Down Expand Up @@ -877,7 +877,7 @@ def resolve_tags(self, _graphene_info: ResolveInfo):
return [
GraphenePipelineTag(key=key, value=value)
for key, value in self._active_preset_data.tags.items()
if get_tag_type(key) != TagType.HIDDEN
if get_run_tag_type(key) != TagType.HIDDEN
]


Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1753,6 +1753,10 @@ def fresh_diamond_right(fresh_diamond_top):
@asset(
freshness_policy=FreshnessPolicy(maximum_lag_minutes=30),
auto_materialize_policy=AutoMaterializePolicy.lazy(),
tags={
"dagster/foo": "asdf",
".dagster/bar": "qwer",
},
)
def fresh_diamond_bottom(fresh_diamond_left, fresh_diamond_right):
return fresh_diamond_left + fresh_diamond_right
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -718,6 +718,22 @@
}
"""

GET_TAGS = """
query AssetNodeQuery($assetKey: AssetKeyInput!) {
assetNodeOrError(assetKey: $assetKey) {
...on AssetNode {
assetKey {
path
}
tags {
key
value
}
}
}
}
"""

GET_TARGETING_INSTIGATORS = """
query AssetNodeQuery($assetKey: AssetKeyInput!) {
assetNodeOrError(assetKey: $assetKey) {
Expand Down Expand Up @@ -2560,6 +2576,18 @@ def test_auto_materialize_policy(self, graphql_context: WorkspaceRequestContext)
assert len(fresh_diamond_bottom) == 1
assert fresh_diamond_bottom[0]["autoMaterializePolicy"]["policyType"] == "LAZY"

def test_tags(self, graphql_context: WorkspaceRequestContext):
result = execute_dagster_graphql(
graphql_context,
GET_TAGS,
variables={
"assetKey": {"path": ["fresh_diamond_bottom"]},
},
)

fresh_diamond_bottom = result.data["assetNodeOrError"]
assert fresh_diamond_bottom["tags"] == [{"key": "dagster/foo", "value": "asdf"}]

def test_has_asset_checks(self, graphql_context: WorkspaceRequestContext):
result = execute_dagster_graphql(graphql_context, HAS_ASSET_CHECKS)

Expand Down
14 changes: 13 additions & 1 deletion python_modules/dagster/dagster/_core/storage/tags.py
Original file line number Diff line number Diff line change
Expand Up @@ -101,7 +101,8 @@ class TagType(Enum):
HIDDEN = "HIDDEN"


def get_tag_type(tag):
def get_run_tag_type(tag: str) -> TagType:
"""Get the tag type for a run tag."""
if tag.startswith(SYSTEM_TAG_PREFIX):
return TagType.SYSTEM
elif tag.startswith(HIDDEN_TAG_PREFIX):
Expand All @@ -110,6 +111,17 @@ def get_tag_type(tag):
return TagType.USER_PROVIDED


def get_definition_tag_type(tag: str) -> TagType:
"""Get the tag type for a definition tag, such as an asset tag."""
# Though some definition tags may use the dagster/ prefix, in many cases they are
# user-provided and should be treated as such. The namespace here is used for standard
# tags rather than a system indicator.
if tag.startswith(HIDDEN_TAG_PREFIX):
return TagType.HIDDEN
else:
return TagType.USER_PROVIDED


def check_reserved_tags(tags):
check.opt_dict_param(tags, "tags", key_type=str, value_type=str)

Expand Down

0 comments on commit 57b0401

Please sign in to comment.