Skip to content

Commit

Permalink
Introduce notion of hidden definitions tags
Browse files Browse the repository at this point in the history
  • Loading branch information
benpankow committed Aug 20, 2024
1 parent d581365 commit f1492cb
Show file tree
Hide file tree
Showing 10 changed files with 160 additions and 13 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,101 @@
customer_id,first_name,last_name,first_order,most_recent_order,number_of_orders,customer_lifetime_value
1,Michael,P.,2018-01-01,2018-02-10,2.0,33.0
2,Shawn,M.,2018-01-11,2018-01-11,1.0,23.0
3,Kathleen,P.,2018-01-02,2018-03-11,3.0,65.0
6,Sarah,R.,2018-02-19,2018-02-19,1.0,8.0
7,Martin,M.,2018-01-14,2018-01-14,1.0,26.0
8,Frank,R.,2018-01-29,2018-03-12,2.0,45.0
9,Jennifer,F.,2018-03-17,2018-03-17,1.0,30.0
11,Fred,S.,2018-03-23,2018-03-23,1.0,3.0
12,Amy,D.,2018-03-03,2018-03-03,1.0,4.0
13,Kathleen,M.,2018-03-07,2018-03-07,1.0,26.0
16,Amanda,H.,2018-02-02,2018-02-02,1.0,12.0
18,Johnny,K.,2018-02-27,2018-02-27,1.0,29.0
19,Virginia,F.,2018-03-16,2018-03-16,1.0,3.0
20,Anna,A.,2018-01-23,2018-01-23,1.0,15.0
21,Willie,H.,2018-03-28,2018-03-28,1.0,22.0
22,Sean,H.,2018-01-26,2018-03-01,3.0,52.0
25,Victor,H.,2018-01-17,2018-03-20,2.0,24.0
26,Aaron,R.,2018-02-11,2018-03-08,2.0,8.0
27,Benjamin,B.,2018-02-21,2018-04-04,2.0,27.0
28,Lisa,W.,2018-02-04,2018-02-04,1.0,3.0
30,Christina,W.,2018-03-02,2018-03-14,2.0,57.0
31,Jane,G.,2018-02-17,2018-02-17,1.0,18.0
32,Thomas,O.,2018-01-28,2018-01-28,1.0,30.0
33,Katherine,M.,2018-02-13,2018-02-13,1.0,14.0
34,Jennifer,S.,2018-02-26,2018-02-26,1.0,3.0
35,Sara,T.,2018-02-21,2018-03-21,2.0,34.0
36,Harold,O.,2018-03-10,2018-03-10,1.0,28.0
38,Dennis,J.,2018-02-06,2018-02-06,1.0,15.0
39,Louise,W.,2018-01-18,2018-01-18,1.0,10.0
40,Maria,A.,2018-01-17,2018-01-17,1.0,3.0
41,Gloria,C.,2018-04-07,2018-04-07,1.0,10.0
42,Diana,S.,2018-02-04,2018-03-12,2.0,27.0
46,Norma,C.,2018-03-24,2018-03-27,2.0,64.0
47,Marie,P.,2018-03-26,2018-03-31,2.0,36.0
51,Howard,R.,2018-01-28,2018-02-23,3.0,99.0
52,Laura,F.,2018-03-23,2018-03-23,1.0,27.0
53,Anne,B.,2018-01-12,2018-03-11,2.0,39.0
54,Rose,M.,2018-01-07,2018-03-24,5.0,57.0
57,Paul,W.,2018-01-31,2018-03-05,2.0,26.0
59,Adam,A.,2018-01-15,2018-01-15,1.0,1.0
63,Edward,G.,2018-03-03,2018-04-03,2.0,23.0
64,David,C.,2018-01-05,2018-01-20,2.0,30.0
66,Adam,W.,2018-02-17,2018-04-03,3.0,39.0
68,Jesse,E.,2018-03-26,2018-03-26,1.0,23.0
69,Janet,P.,2018-02-02,2018-03-18,2.0,32.0
70,Helen,F.,2018-03-06,2018-03-26,2.0,54.0
71,Gerald,C.,2018-01-18,2018-02-24,3.0,44.0
76,Barbara,W.,2018-03-23,2018-03-23,1.0,2.0
79,Jack,R.,2018-02-28,2018-03-11,2.0,27.0
80,Phillip,H.,2018-02-08,2018-02-08,1.0,29.0
84,Christina,R.,2018-01-17,2018-04-02,2.0,36.0
85,Theresa,M.,2018-02-10,2018-04-09,2.0,33.0
86,Jason,C.,2018-01-24,2018-01-24,1.0,8.0
88,Adam,T.,2018-01-09,2018-01-09,1.0,16.0
89,Margaret,J.,2018-04-07,2018-04-07,1.0,14.0
90,Paul,P.,2018-03-23,2018-04-06,2.0,43.0
91,Todd,W.,2018-03-27,2018-03-27,1.0,29.0
92,Willie,O.,2018-02-16,2018-02-16,1.0,17.0
93,Frances,R.,2018-03-01,2018-03-01,1.0,2.0
94,Gregory,H.,2018-01-04,2018-01-29,2.0,24.0
99,Mary,G.,2018-01-14,2018-02-14,2.0,44.0
50,Billy,L.,2018-01-05,2018-02-20,2.0,47.0
4,Jimmy,C.,,,,
5,Katherine,R.,,,,
10,Henry,W.,,,,
14,Steve,F.,,,,
15,Teresa,H.,,,,
17,Kimberly,R.,,,,
23,Mildred,A.,,,,
24,David,G.,,,,
29,Benjamin,K.,,,,
37,Shirley,J.,,,,
43,Kelly,N.,,,,
44,Jane,R.,,,,
45,Scott,B.,,,,
48,Lillian,C.,,,,
49,Judy,N.,,,,
55,Nicholas,R.,,,,
56,Joshua,K.,,,,
58,Kathryn,K.,,,,
60,Norma,W.,,,,
61,Timothy,R.,,,,
62,Elizabeth,P.,,,,
65,Brenda,W.,,,,
67,Michael,H.,,,,
72,Kathryn,O.,,,,
73,Alan,B.,,,,
74,Harry,A.,,,,
75,Andrea,H.,,,,
77,Anne,W.,,,,
78,Harry,H.,,,,
81,Shirley,H.,,,,
82,Arthur,D.,,,,
83,Virginia,R.,,,,
87,Phillip,B.,,,,
95,Lisa,P.,,,,
96,Jacqueline,A.,,,,
97,Shirley,D.,,,,
98,Nicole,M.,,,,
100,Jean,M.,,,,
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@
PARTITION_SET_TAG,
REPOSITORY_LABEL_TAG,
TagType,
get_tag_type,
get_run_tag_type,
)
from dagster._utils.yaml_utils import dump_run_config_yaml

Expand Down Expand Up @@ -153,7 +153,7 @@ def get_partition_tags(
results=[
GraphenePipelineTag(key=key, value=value)
for key, value in result.tags.items()
if get_tag_type(key) != TagType.HIDDEN
if get_run_tag_type(key) != TagType.HIDDEN
]
)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@
from dagster._core.instance import DagsterInstance
from dagster._core.storage.dagster_run import DagsterRunStatus, RunRecord, RunsFilter
from dagster._core.storage.event_log.base import AssetRecord
from dagster._core.storage.tags import BACKFILL_ID_TAG, TagType, get_tag_type
from dagster._core.storage.tags import BACKFILL_ID_TAG, TagType, get_run_tag_type
from dagster._record import record
from dagster._time import datetime_from_timestamp, get_current_timestamp

Expand Down Expand Up @@ -66,7 +66,7 @@ def get_run_tag_keys(graphene_info: "ResolveInfo") -> "GrapheneRunTagKeys":
keys=[
tag_key
for tag_key in graphene_info.context.instance.get_run_tag_keys()
if get_tag_type(tag_key) != TagType.HIDDEN
if get_run_tag_type(tag_key) != TagType.HIDDEN
]
)

Expand All @@ -87,7 +87,7 @@ def get_run_tags(
for key, values in instance.get_run_tags(
tag_keys=tag_keys, value_prefix=value_prefix, limit=limit
)
if get_tag_type(key) != TagType.HIDDEN
if get_run_tag_type(key) != TagType.HIDDEN
]
)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@
)
from dagster._core.snap.node import GraphDefSnap, OpDefSnap
from dagster._core.storage.batch_asset_record_loader import BatchAssetRecordLoader
from dagster._core.storage.tags import TagType, get_definition_tag_type
from dagster._core.utils import is_valid_email
from dagster._core.workspace.permissions import Permissions
from dagster._utils.caching_instance_queryer import CachingInstanceQueryer
Expand Down Expand Up @@ -1209,6 +1210,7 @@ def resolve_tags(self, _graphene_info: ResolveInfo) -> Sequence[GrapheneDefiniti
return [
GrapheneDefinitionTag(key, value)
for key, value in (self._external_asset_node.tags or {}).items()
if get_definition_tag_type(key) == TagType.USER_PROVIDED
]

def resolve_op(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@
ASSET_PARTITION_RANGE_END_TAG,
ASSET_PARTITION_RANGE_START_TAG,
TagType,
get_tag_type,
get_run_tag_type,
)
from dagster._core.workspace.permissions import Permissions

Expand Down Expand Up @@ -508,7 +508,7 @@ def resolve_tags(self, _graphene_info: ResolveInfo):
return [
GraphenePipelineTag(key=key, value=value)
for key, value in self._backfill_job.tags.items()
if get_tag_type(key) != TagType.HIDDEN
if get_run_tag_type(key) != TagType.HIDDEN
]

def resolve_runStatus(self, _graphene_info: ResolveInfo) -> GrapheneRunStatus:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@
SensorInstigatorData,
)
from dagster._core.storage.dagster_run import DagsterRun, RunsFilter
from dagster._core.storage.tags import REPOSITORY_LABEL_TAG, TagType, get_tag_type
from dagster._core.storage.tags import REPOSITORY_LABEL_TAG, TagType, get_run_tag_type
from dagster._core.workspace.permissions import Permissions
from dagster._utils.error import SerializableErrorInfo, serializable_error_info_from_exc_info
from dagster._utils.yaml_utils import dump_run_config_yaml
Expand Down Expand Up @@ -499,7 +499,7 @@ def resolve_tags(self, _graphene_info: ResolveInfo):
return [
GraphenePipelineTag(key=key, value=value)
for key, value in self._run_request.tags.items()
if get_tag_type(key) != TagType.HIDDEN
if get_run_tag_type(key) != TagType.HIDDEN
]

def resolve_runConfigYaml(self, _graphene_info: ResolveInfo):
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@
RunRecord,
RunsFilter,
)
from dagster._core.storage.tags import REPOSITORY_LABEL_TAG, TagType, get_tag_type
from dagster._core.storage.tags import REPOSITORY_LABEL_TAG, TagType, get_run_tag_type
from dagster._core.workspace.permissions import Permissions
from dagster._utils.yaml_utils import dump_run_config_yaml

Expand Down Expand Up @@ -502,7 +502,7 @@ def resolve_tags(self, _graphene_info: ResolveInfo):
return [
GraphenePipelineTag(key=key, value=value)
for key, value in self.dagster_run.tags.items()
if get_tag_type(key) != TagType.HIDDEN
if get_run_tag_type(key) != TagType.HIDDEN
]

def resolve_rootRunId(self, _graphene_info: ResolveInfo):
Expand Down Expand Up @@ -861,7 +861,7 @@ def resolve_tags(self, _graphene_info: ResolveInfo):
return [
GraphenePipelineTag(key=key, value=value)
for key, value in self._active_preset_data.tags.items()
if get_tag_type(key) != TagType.HIDDEN
if get_run_tag_type(key) != TagType.HIDDEN
]


Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1753,6 +1753,10 @@ def fresh_diamond_right(fresh_diamond_top):
@asset(
freshness_policy=FreshnessPolicy(maximum_lag_minutes=30),
auto_materialize_policy=AutoMaterializePolicy.lazy(),
tags={
"dagster/foo": "asdf",
".dagster/bar": "qwer",
},
)
def fresh_diamond_bottom(fresh_diamond_left, fresh_diamond_right):
return fresh_diamond_left + fresh_diamond_right
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -718,6 +718,22 @@
}
"""

GET_TAGS = """
query AssetNodeQuery($assetKey: AssetKeyInput!) {
assetNodeOrError(assetKey: $assetKey) {
...on AssetNode {
assetKey {
path
}
tags {
key
value
}
}
}
}
"""

GET_TARGETING_INSTIGATORS = """
query AssetNodeQuery($assetKey: AssetKeyInput!) {
assetNodeOrError(assetKey: $assetKey) {
Expand Down Expand Up @@ -2560,6 +2576,18 @@ def test_auto_materialize_policy(self, graphql_context: WorkspaceRequestContext)
assert len(fresh_diamond_bottom) == 1
assert fresh_diamond_bottom[0]["autoMaterializePolicy"]["policyType"] == "LAZY"

def test_tags(self, graphql_context: WorkspaceRequestContext):
result = execute_dagster_graphql(
graphql_context,
GET_TAGS,
variables={
"assetKey": {"path": ["fresh_diamond_bottom"]},
},
)

fresh_diamond_bottom = result.data["assetNodeOrError"]
assert fresh_diamond_bottom["tags"] == [{"key": "dagster/foo", "value": "asdf"}]

def test_has_asset_checks(self, graphql_context: WorkspaceRequestContext):
result = execute_dagster_graphql(graphql_context, HAS_ASSET_CHECKS)

Expand Down
14 changes: 13 additions & 1 deletion python_modules/dagster/dagster/_core/storage/tags.py
Original file line number Diff line number Diff line change
Expand Up @@ -101,7 +101,8 @@ class TagType(Enum):
HIDDEN = "HIDDEN"


def get_tag_type(tag):
def get_run_tag_type(tag: str) -> TagType:
"""Get the tag type for a run tag."""
if tag.startswith(SYSTEM_TAG_PREFIX):
return TagType.SYSTEM
elif tag.startswith(HIDDEN_TAG_PREFIX):
Expand All @@ -110,6 +111,17 @@ def get_tag_type(tag):
return TagType.USER_PROVIDED


def get_definition_tag_type(tag: str) -> TagType:
"""Get the tag type for a definition tag, such as an asset tag."""
# Though some definition tags may use the dagster/ prefix, in many cases they are
# user-provided and should be treated as such. The namespace here is used for standard
# tags rather than a system indicator.
if tag.startswith(HIDDEN_TAG_PREFIX):
return TagType.HIDDEN
else:
return TagType.USER_PROVIDED


def check_reserved_tags(tags):
check.opt_dict_param(tags, "tags", key_type=str, value_type=str)

Expand Down

0 comments on commit f1492cb

Please sign in to comment.