From 50ffeee0695366c654c8bf985eb92357c9122cfa Mon Sep 17 00:00:00 2001 From: benpankow Date: Tue, 20 Aug 2024 13:45:05 -0700 Subject: [PATCH] Introduce notion of hidden definitions tags --- .../implementation/fetch_partition_sets.py | 4 +-- .../implementation/fetch_runs.py | 6 ++-- .../dagster_graphql/schema/asset_graph.py | 2 ++ .../dagster_graphql/schema/backfill.py | 4 +-- .../dagster_graphql/schema/instigation.py | 4 +-- .../schema/pipelines/pipeline.py | 6 ++-- .../dagster_graphql_tests/graphql/repo.py | 4 +++ .../graphql/test_assets.py | 28 +++++++++++++++++++ .../dagster/dagster/_core/storage/tags.py | 14 +++++++++- 9 files changed, 59 insertions(+), 13 deletions(-) diff --git a/python_modules/dagster-graphql/dagster_graphql/implementation/fetch_partition_sets.py b/python_modules/dagster-graphql/dagster_graphql/implementation/fetch_partition_sets.py index 8f60467179dee..dfe5cdfaad554 100644 --- a/python_modules/dagster-graphql/dagster_graphql/implementation/fetch_partition_sets.py +++ b/python_modules/dagster-graphql/dagster_graphql/implementation/fetch_partition_sets.py @@ -12,7 +12,7 @@ PARTITION_SET_TAG, REPOSITORY_LABEL_TAG, TagType, - get_tag_type, + get_run_tag_type, ) from dagster._utils.yaml_utils import dump_run_config_yaml @@ -153,7 +153,7 @@ def get_partition_tags( results=[ GraphenePipelineTag(key=key, value=value) for key, value in result.tags.items() - if get_tag_type(key) != TagType.HIDDEN + if get_run_tag_type(key) != TagType.HIDDEN ] ) diff --git a/python_modules/dagster-graphql/dagster_graphql/implementation/fetch_runs.py b/python_modules/dagster-graphql/dagster_graphql/implementation/fetch_runs.py index 3e553dc5c617d..184a3126f1aca 100644 --- a/python_modules/dagster-graphql/dagster_graphql/implementation/fetch_runs.py +++ b/python_modules/dagster-graphql/dagster_graphql/implementation/fetch_runs.py @@ -22,7 +22,7 @@ from dagster._core.instance import DagsterInstance from dagster._core.storage.dagster_run import DagsterRunStatus, RunRecord, RunsFilter from dagster._core.storage.event_log.base import AssetRecord -from dagster._core.storage.tags import BACKFILL_ID_TAG, TagType, get_tag_type +from dagster._core.storage.tags import BACKFILL_ID_TAG, TagType, get_run_tag_type from dagster._record import record from dagster._time import datetime_from_timestamp, get_current_timestamp @@ -66,7 +66,7 @@ def get_run_tag_keys(graphene_info: "ResolveInfo") -> "GrapheneRunTagKeys": keys=[ tag_key for tag_key in graphene_info.context.instance.get_run_tag_keys() - if get_tag_type(tag_key) != TagType.HIDDEN + if get_run_tag_type(tag_key) != TagType.HIDDEN ] ) @@ -87,7 +87,7 @@ def get_run_tags( for key, values in instance.get_run_tags( tag_keys=tag_keys, value_prefix=value_prefix, limit=limit ) - if get_tag_type(key) != TagType.HIDDEN + if get_run_tag_type(key) != TagType.HIDDEN ] ) diff --git a/python_modules/dagster-graphql/dagster_graphql/schema/asset_graph.py b/python_modules/dagster-graphql/dagster_graphql/schema/asset_graph.py index 42265b2c2afaf..3a960ca89a6d0 100644 --- a/python_modules/dagster-graphql/dagster_graphql/schema/asset_graph.py +++ b/python_modules/dagster-graphql/dagster_graphql/schema/asset_graph.py @@ -33,6 +33,7 @@ ) from dagster._core.snap.node import GraphDefSnap, OpDefSnap from dagster._core.storage.batch_asset_record_loader import BatchAssetRecordLoader +from dagster._core.storage.tags import TagType, get_definition_tag_type from dagster._core.utils import is_valid_email from dagster._core.workspace.permissions import Permissions from dagster._utils.caching_instance_queryer import CachingInstanceQueryer @@ -1209,6 +1210,7 @@ def resolve_tags(self, _graphene_info: ResolveInfo) -> Sequence[GrapheneDefiniti return [ GrapheneDefinitionTag(key, value) for key, value in (self._external_asset_node.tags or {}).items() + if get_definition_tag_type(key) == TagType.USER_PROVIDED ] def resolve_op( diff --git a/python_modules/dagster-graphql/dagster_graphql/schema/backfill.py b/python_modules/dagster-graphql/dagster_graphql/schema/backfill.py index 18a6b13245a49..4f524bffab8ce 100644 --- a/python_modules/dagster-graphql/dagster_graphql/schema/backfill.py +++ b/python_modules/dagster-graphql/dagster_graphql/schema/backfill.py @@ -24,7 +24,7 @@ ASSET_PARTITION_RANGE_END_TAG, ASSET_PARTITION_RANGE_START_TAG, TagType, - get_tag_type, + get_run_tag_type, ) from dagster._core.workspace.permissions import Permissions @@ -508,7 +508,7 @@ def resolve_tags(self, _graphene_info: ResolveInfo): return [ GraphenePipelineTag(key=key, value=value) for key, value in self._backfill_job.tags.items() - if get_tag_type(key) != TagType.HIDDEN + if get_run_tag_type(key) != TagType.HIDDEN ] def resolve_runStatus(self, _graphene_info: ResolveInfo) -> GrapheneRunStatus: diff --git a/python_modules/dagster-graphql/dagster_graphql/schema/instigation.py b/python_modules/dagster-graphql/dagster_graphql/schema/instigation.py index 30aeabd36c461..0ae8214053078 100644 --- a/python_modules/dagster-graphql/dagster_graphql/schema/instigation.py +++ b/python_modules/dagster-graphql/dagster_graphql/schema/instigation.py @@ -22,7 +22,7 @@ SensorInstigatorData, ) from dagster._core.storage.dagster_run import DagsterRun, RunsFilter -from dagster._core.storage.tags import REPOSITORY_LABEL_TAG, TagType, get_tag_type +from dagster._core.storage.tags import REPOSITORY_LABEL_TAG, TagType, get_run_tag_type from dagster._core.workspace.permissions import Permissions from dagster._utils.error import SerializableErrorInfo, serializable_error_info_from_exc_info from dagster._utils.yaml_utils import dump_run_config_yaml @@ -499,7 +499,7 @@ def resolve_tags(self, _graphene_info: ResolveInfo): return [ GraphenePipelineTag(key=key, value=value) for key, value in self._run_request.tags.items() - if get_tag_type(key) != TagType.HIDDEN + if get_run_tag_type(key) != TagType.HIDDEN ] def resolve_runConfigYaml(self, _graphene_info: ResolveInfo): diff --git a/python_modules/dagster-graphql/dagster_graphql/schema/pipelines/pipeline.py b/python_modules/dagster-graphql/dagster_graphql/schema/pipelines/pipeline.py index 6293be26a19ac..0ae0f436674ac 100644 --- a/python_modules/dagster-graphql/dagster_graphql/schema/pipelines/pipeline.py +++ b/python_modules/dagster-graphql/dagster_graphql/schema/pipelines/pipeline.py @@ -13,7 +13,7 @@ RunRecord, RunsFilter, ) -from dagster._core.storage.tags import REPOSITORY_LABEL_TAG, TagType, get_tag_type +from dagster._core.storage.tags import REPOSITORY_LABEL_TAG, TagType, get_run_tag_type from dagster._core.workspace.permissions import Permissions from dagster._utils.yaml_utils import dump_run_config_yaml @@ -502,7 +502,7 @@ def resolve_tags(self, _graphene_info: ResolveInfo): return [ GraphenePipelineTag(key=key, value=value) for key, value in self.dagster_run.tags.items() - if get_tag_type(key) != TagType.HIDDEN + if get_run_tag_type(key) != TagType.HIDDEN ] def resolve_rootRunId(self, _graphene_info: ResolveInfo): @@ -861,7 +861,7 @@ def resolve_tags(self, _graphene_info: ResolveInfo): return [ GraphenePipelineTag(key=key, value=value) for key, value in self._active_preset_data.tags.items() - if get_tag_type(key) != TagType.HIDDEN + if get_run_tag_type(key) != TagType.HIDDEN ] diff --git a/python_modules/dagster-graphql/dagster_graphql_tests/graphql/repo.py b/python_modules/dagster-graphql/dagster_graphql_tests/graphql/repo.py index bf20826138e60..ebb6efc04110f 100644 --- a/python_modules/dagster-graphql/dagster_graphql_tests/graphql/repo.py +++ b/python_modules/dagster-graphql/dagster_graphql_tests/graphql/repo.py @@ -1753,6 +1753,10 @@ def fresh_diamond_right(fresh_diamond_top): @asset( freshness_policy=FreshnessPolicy(maximum_lag_minutes=30), auto_materialize_policy=AutoMaterializePolicy.lazy(), + tags={ + "dagster/foo": "asdf", + ".dagster/bar": "qwer", + }, ) def fresh_diamond_bottom(fresh_diamond_left, fresh_diamond_right): return fresh_diamond_left + fresh_diamond_right diff --git a/python_modules/dagster-graphql/dagster_graphql_tests/graphql/test_assets.py b/python_modules/dagster-graphql/dagster_graphql_tests/graphql/test_assets.py index 71ee0f9748137..9af8126659085 100644 --- a/python_modules/dagster-graphql/dagster_graphql_tests/graphql/test_assets.py +++ b/python_modules/dagster-graphql/dagster_graphql_tests/graphql/test_assets.py @@ -718,6 +718,22 @@ } """ +GET_TAGS = """ + query AssetNodeQuery($assetKey: AssetKeyInput!) { + assetNodeOrError(assetKey: $assetKey) { + ...on AssetNode { + assetKey { + path + } + tags { + key + value + } + } + } + } +""" + GET_TARGETING_INSTIGATORS = """ query AssetNodeQuery($assetKey: AssetKeyInput!) { assetNodeOrError(assetKey: $assetKey) { @@ -2560,6 +2576,18 @@ def test_auto_materialize_policy(self, graphql_context: WorkspaceRequestContext) assert len(fresh_diamond_bottom) == 1 assert fresh_diamond_bottom[0]["autoMaterializePolicy"]["policyType"] == "LAZY" + def test_tags(self, graphql_context: WorkspaceRequestContext): + result = execute_dagster_graphql( + graphql_context, + GET_TAGS, + variables={ + "assetKey": {"path": ["fresh_diamond_bottom"]}, + }, + ) + + fresh_diamond_bottom = result.data["assetNodeOrError"] + assert fresh_diamond_bottom["tags"] == [{"key": "dagster/foo", "value": "asdf"}] + def test_has_asset_checks(self, graphql_context: WorkspaceRequestContext): result = execute_dagster_graphql(graphql_context, HAS_ASSET_CHECKS) diff --git a/python_modules/dagster/dagster/_core/storage/tags.py b/python_modules/dagster/dagster/_core/storage/tags.py index 727aa0b029a78..dce2b3b2c4710 100644 --- a/python_modules/dagster/dagster/_core/storage/tags.py +++ b/python_modules/dagster/dagster/_core/storage/tags.py @@ -101,7 +101,8 @@ class TagType(Enum): HIDDEN = "HIDDEN" -def get_tag_type(tag): +def get_run_tag_type(tag: str) -> TagType: + """Get the tag type for a run tag.""" if tag.startswith(SYSTEM_TAG_PREFIX): return TagType.SYSTEM elif tag.startswith(HIDDEN_TAG_PREFIX): @@ -110,6 +111,17 @@ def get_tag_type(tag): return TagType.USER_PROVIDED +def get_definition_tag_type(tag: str) -> TagType: + """Get the tag type for a definition tag, such as an asset tag.""" + # Though some definition tags may use the dagster/ prefix, in many cases they are + # user-provided and should be treated as such. The namespace here is used for standard + # tags rather than a system indicator. + if tag.startswith(HIDDEN_TAG_PREFIX): + return TagType.HIDDEN + else: + return TagType.USER_PROVIDED + + def check_reserved_tags(tags): check.opt_dict_param(tags, "tags", key_type=str, value_type=str)