From f1492cbdede7c371e075178231b182dea301535d Mon Sep 17 00:00:00 2001 From: benpankow Date: Tue, 20 Aug 2024 13:45:05 -0700 Subject: [PATCH] Introduce notion of hidden definitions tags --- .../dbt_example/airflow_dags/customers.csv | 101 ++++++++++++++++++ .../implementation/fetch_partition_sets.py | 4 +- .../implementation/fetch_runs.py | 6 +- .../dagster_graphql/schema/asset_graph.py | 2 + .../dagster_graphql/schema/backfill.py | 4 +- .../dagster_graphql/schema/instigation.py | 4 +- .../schema/pipelines/pipeline.py | 6 +- .../dagster_graphql_tests/graphql/repo.py | 4 + .../graphql/test_assets.py | 28 +++++ .../dagster/dagster/_core/storage/tags.py | 14 ++- 10 files changed, 160 insertions(+), 13 deletions(-) create mode 100644 examples/experimental/dagster-airlift/examples/dbt-example/dbt_example/airflow_dags/customers.csv diff --git a/examples/experimental/dagster-airlift/examples/dbt-example/dbt_example/airflow_dags/customers.csv b/examples/experimental/dagster-airlift/examples/dbt-example/dbt_example/airflow_dags/customers.csv new file mode 100644 index 0000000000000..d04d951e7e466 --- /dev/null +++ b/examples/experimental/dagster-airlift/examples/dbt-example/dbt_example/airflow_dags/customers.csv @@ -0,0 +1,101 @@ +customer_id,first_name,last_name,first_order,most_recent_order,number_of_orders,customer_lifetime_value +1,Michael,P.,2018-01-01,2018-02-10,2.0,33.0 +2,Shawn,M.,2018-01-11,2018-01-11,1.0,23.0 +3,Kathleen,P.,2018-01-02,2018-03-11,3.0,65.0 +6,Sarah,R.,2018-02-19,2018-02-19,1.0,8.0 +7,Martin,M.,2018-01-14,2018-01-14,1.0,26.0 +8,Frank,R.,2018-01-29,2018-03-12,2.0,45.0 +9,Jennifer,F.,2018-03-17,2018-03-17,1.0,30.0 +11,Fred,S.,2018-03-23,2018-03-23,1.0,3.0 +12,Amy,D.,2018-03-03,2018-03-03,1.0,4.0 +13,Kathleen,M.,2018-03-07,2018-03-07,1.0,26.0 +16,Amanda,H.,2018-02-02,2018-02-02,1.0,12.0 +18,Johnny,K.,2018-02-27,2018-02-27,1.0,29.0 +19,Virginia,F.,2018-03-16,2018-03-16,1.0,3.0 +20,Anna,A.,2018-01-23,2018-01-23,1.0,15.0 +21,Willie,H.,2018-03-28,2018-03-28,1.0,22.0 +22,Sean,H.,2018-01-26,2018-03-01,3.0,52.0 +25,Victor,H.,2018-01-17,2018-03-20,2.0,24.0 +26,Aaron,R.,2018-02-11,2018-03-08,2.0,8.0 +27,Benjamin,B.,2018-02-21,2018-04-04,2.0,27.0 +28,Lisa,W.,2018-02-04,2018-02-04,1.0,3.0 +30,Christina,W.,2018-03-02,2018-03-14,2.0,57.0 +31,Jane,G.,2018-02-17,2018-02-17,1.0,18.0 +32,Thomas,O.,2018-01-28,2018-01-28,1.0,30.0 +33,Katherine,M.,2018-02-13,2018-02-13,1.0,14.0 +34,Jennifer,S.,2018-02-26,2018-02-26,1.0,3.0 +35,Sara,T.,2018-02-21,2018-03-21,2.0,34.0 +36,Harold,O.,2018-03-10,2018-03-10,1.0,28.0 +38,Dennis,J.,2018-02-06,2018-02-06,1.0,15.0 +39,Louise,W.,2018-01-18,2018-01-18,1.0,10.0 +40,Maria,A.,2018-01-17,2018-01-17,1.0,3.0 +41,Gloria,C.,2018-04-07,2018-04-07,1.0,10.0 +42,Diana,S.,2018-02-04,2018-03-12,2.0,27.0 +46,Norma,C.,2018-03-24,2018-03-27,2.0,64.0 +47,Marie,P.,2018-03-26,2018-03-31,2.0,36.0 +51,Howard,R.,2018-01-28,2018-02-23,3.0,99.0 +52,Laura,F.,2018-03-23,2018-03-23,1.0,27.0 +53,Anne,B.,2018-01-12,2018-03-11,2.0,39.0 +54,Rose,M.,2018-01-07,2018-03-24,5.0,57.0 +57,Paul,W.,2018-01-31,2018-03-05,2.0,26.0 +59,Adam,A.,2018-01-15,2018-01-15,1.0,1.0 +63,Edward,G.,2018-03-03,2018-04-03,2.0,23.0 +64,David,C.,2018-01-05,2018-01-20,2.0,30.0 +66,Adam,W.,2018-02-17,2018-04-03,3.0,39.0 +68,Jesse,E.,2018-03-26,2018-03-26,1.0,23.0 +69,Janet,P.,2018-02-02,2018-03-18,2.0,32.0 +70,Helen,F.,2018-03-06,2018-03-26,2.0,54.0 +71,Gerald,C.,2018-01-18,2018-02-24,3.0,44.0 +76,Barbara,W.,2018-03-23,2018-03-23,1.0,2.0 +79,Jack,R.,2018-02-28,2018-03-11,2.0,27.0 +80,Phillip,H.,2018-02-08,2018-02-08,1.0,29.0 +84,Christina,R.,2018-01-17,2018-04-02,2.0,36.0 +85,Theresa,M.,2018-02-10,2018-04-09,2.0,33.0 +86,Jason,C.,2018-01-24,2018-01-24,1.0,8.0 +88,Adam,T.,2018-01-09,2018-01-09,1.0,16.0 +89,Margaret,J.,2018-04-07,2018-04-07,1.0,14.0 +90,Paul,P.,2018-03-23,2018-04-06,2.0,43.0 +91,Todd,W.,2018-03-27,2018-03-27,1.0,29.0 +92,Willie,O.,2018-02-16,2018-02-16,1.0,17.0 +93,Frances,R.,2018-03-01,2018-03-01,1.0,2.0 +94,Gregory,H.,2018-01-04,2018-01-29,2.0,24.0 +99,Mary,G.,2018-01-14,2018-02-14,2.0,44.0 +50,Billy,L.,2018-01-05,2018-02-20,2.0,47.0 +4,Jimmy,C.,,,, +5,Katherine,R.,,,, +10,Henry,W.,,,, +14,Steve,F.,,,, +15,Teresa,H.,,,, +17,Kimberly,R.,,,, +23,Mildred,A.,,,, +24,David,G.,,,, +29,Benjamin,K.,,,, +37,Shirley,J.,,,, +43,Kelly,N.,,,, +44,Jane,R.,,,, +45,Scott,B.,,,, +48,Lillian,C.,,,, +49,Judy,N.,,,, +55,Nicholas,R.,,,, +56,Joshua,K.,,,, +58,Kathryn,K.,,,, +60,Norma,W.,,,, +61,Timothy,R.,,,, +62,Elizabeth,P.,,,, +65,Brenda,W.,,,, +67,Michael,H.,,,, +72,Kathryn,O.,,,, +73,Alan,B.,,,, +74,Harry,A.,,,, +75,Andrea,H.,,,, +77,Anne,W.,,,, +78,Harry,H.,,,, +81,Shirley,H.,,,, +82,Arthur,D.,,,, +83,Virginia,R.,,,, +87,Phillip,B.,,,, +95,Lisa,P.,,,, +96,Jacqueline,A.,,,, +97,Shirley,D.,,,, +98,Nicole,M.,,,, +100,Jean,M.,,,, diff --git a/python_modules/dagster-graphql/dagster_graphql/implementation/fetch_partition_sets.py b/python_modules/dagster-graphql/dagster_graphql/implementation/fetch_partition_sets.py index 8f60467179dee..dfe5cdfaad554 100644 --- a/python_modules/dagster-graphql/dagster_graphql/implementation/fetch_partition_sets.py +++ b/python_modules/dagster-graphql/dagster_graphql/implementation/fetch_partition_sets.py @@ -12,7 +12,7 @@ PARTITION_SET_TAG, REPOSITORY_LABEL_TAG, TagType, - get_tag_type, + get_run_tag_type, ) from dagster._utils.yaml_utils import dump_run_config_yaml @@ -153,7 +153,7 @@ def get_partition_tags( results=[ GraphenePipelineTag(key=key, value=value) for key, value in result.tags.items() - if get_tag_type(key) != TagType.HIDDEN + if get_run_tag_type(key) != TagType.HIDDEN ] ) diff --git a/python_modules/dagster-graphql/dagster_graphql/implementation/fetch_runs.py b/python_modules/dagster-graphql/dagster_graphql/implementation/fetch_runs.py index 3e553dc5c617d..184a3126f1aca 100644 --- a/python_modules/dagster-graphql/dagster_graphql/implementation/fetch_runs.py +++ b/python_modules/dagster-graphql/dagster_graphql/implementation/fetch_runs.py @@ -22,7 +22,7 @@ from dagster._core.instance import DagsterInstance from dagster._core.storage.dagster_run import DagsterRunStatus, RunRecord, RunsFilter from dagster._core.storage.event_log.base import AssetRecord -from dagster._core.storage.tags import BACKFILL_ID_TAG, TagType, get_tag_type +from dagster._core.storage.tags import BACKFILL_ID_TAG, TagType, get_run_tag_type from dagster._record import record from dagster._time import datetime_from_timestamp, get_current_timestamp @@ -66,7 +66,7 @@ def get_run_tag_keys(graphene_info: "ResolveInfo") -> "GrapheneRunTagKeys": keys=[ tag_key for tag_key in graphene_info.context.instance.get_run_tag_keys() - if get_tag_type(tag_key) != TagType.HIDDEN + if get_run_tag_type(tag_key) != TagType.HIDDEN ] ) @@ -87,7 +87,7 @@ def get_run_tags( for key, values in instance.get_run_tags( tag_keys=tag_keys, value_prefix=value_prefix, limit=limit ) - if get_tag_type(key) != TagType.HIDDEN + if get_run_tag_type(key) != TagType.HIDDEN ] ) diff --git a/python_modules/dagster-graphql/dagster_graphql/schema/asset_graph.py b/python_modules/dagster-graphql/dagster_graphql/schema/asset_graph.py index 42265b2c2afaf..3a960ca89a6d0 100644 --- a/python_modules/dagster-graphql/dagster_graphql/schema/asset_graph.py +++ b/python_modules/dagster-graphql/dagster_graphql/schema/asset_graph.py @@ -33,6 +33,7 @@ ) from dagster._core.snap.node import GraphDefSnap, OpDefSnap from dagster._core.storage.batch_asset_record_loader import BatchAssetRecordLoader +from dagster._core.storage.tags import TagType, get_definition_tag_type from dagster._core.utils import is_valid_email from dagster._core.workspace.permissions import Permissions from dagster._utils.caching_instance_queryer import CachingInstanceQueryer @@ -1209,6 +1210,7 @@ def resolve_tags(self, _graphene_info: ResolveInfo) -> Sequence[GrapheneDefiniti return [ GrapheneDefinitionTag(key, value) for key, value in (self._external_asset_node.tags or {}).items() + if get_definition_tag_type(key) == TagType.USER_PROVIDED ] def resolve_op( diff --git a/python_modules/dagster-graphql/dagster_graphql/schema/backfill.py b/python_modules/dagster-graphql/dagster_graphql/schema/backfill.py index 18a6b13245a49..4f524bffab8ce 100644 --- a/python_modules/dagster-graphql/dagster_graphql/schema/backfill.py +++ b/python_modules/dagster-graphql/dagster_graphql/schema/backfill.py @@ -24,7 +24,7 @@ ASSET_PARTITION_RANGE_END_TAG, ASSET_PARTITION_RANGE_START_TAG, TagType, - get_tag_type, + get_run_tag_type, ) from dagster._core.workspace.permissions import Permissions @@ -508,7 +508,7 @@ def resolve_tags(self, _graphene_info: ResolveInfo): return [ GraphenePipelineTag(key=key, value=value) for key, value in self._backfill_job.tags.items() - if get_tag_type(key) != TagType.HIDDEN + if get_run_tag_type(key) != TagType.HIDDEN ] def resolve_runStatus(self, _graphene_info: ResolveInfo) -> GrapheneRunStatus: diff --git a/python_modules/dagster-graphql/dagster_graphql/schema/instigation.py b/python_modules/dagster-graphql/dagster_graphql/schema/instigation.py index 30aeabd36c461..0ae8214053078 100644 --- a/python_modules/dagster-graphql/dagster_graphql/schema/instigation.py +++ b/python_modules/dagster-graphql/dagster_graphql/schema/instigation.py @@ -22,7 +22,7 @@ SensorInstigatorData, ) from dagster._core.storage.dagster_run import DagsterRun, RunsFilter -from dagster._core.storage.tags import REPOSITORY_LABEL_TAG, TagType, get_tag_type +from dagster._core.storage.tags import REPOSITORY_LABEL_TAG, TagType, get_run_tag_type from dagster._core.workspace.permissions import Permissions from dagster._utils.error import SerializableErrorInfo, serializable_error_info_from_exc_info from dagster._utils.yaml_utils import dump_run_config_yaml @@ -499,7 +499,7 @@ def resolve_tags(self, _graphene_info: ResolveInfo): return [ GraphenePipelineTag(key=key, value=value) for key, value in self._run_request.tags.items() - if get_tag_type(key) != TagType.HIDDEN + if get_run_tag_type(key) != TagType.HIDDEN ] def resolve_runConfigYaml(self, _graphene_info: ResolveInfo): diff --git a/python_modules/dagster-graphql/dagster_graphql/schema/pipelines/pipeline.py b/python_modules/dagster-graphql/dagster_graphql/schema/pipelines/pipeline.py index 6293be26a19ac..0ae0f436674ac 100644 --- a/python_modules/dagster-graphql/dagster_graphql/schema/pipelines/pipeline.py +++ b/python_modules/dagster-graphql/dagster_graphql/schema/pipelines/pipeline.py @@ -13,7 +13,7 @@ RunRecord, RunsFilter, ) -from dagster._core.storage.tags import REPOSITORY_LABEL_TAG, TagType, get_tag_type +from dagster._core.storage.tags import REPOSITORY_LABEL_TAG, TagType, get_run_tag_type from dagster._core.workspace.permissions import Permissions from dagster._utils.yaml_utils import dump_run_config_yaml @@ -502,7 +502,7 @@ def resolve_tags(self, _graphene_info: ResolveInfo): return [ GraphenePipelineTag(key=key, value=value) for key, value in self.dagster_run.tags.items() - if get_tag_type(key) != TagType.HIDDEN + if get_run_tag_type(key) != TagType.HIDDEN ] def resolve_rootRunId(self, _graphene_info: ResolveInfo): @@ -861,7 +861,7 @@ def resolve_tags(self, _graphene_info: ResolveInfo): return [ GraphenePipelineTag(key=key, value=value) for key, value in self._active_preset_data.tags.items() - if get_tag_type(key) != TagType.HIDDEN + if get_run_tag_type(key) != TagType.HIDDEN ] diff --git a/python_modules/dagster-graphql/dagster_graphql_tests/graphql/repo.py b/python_modules/dagster-graphql/dagster_graphql_tests/graphql/repo.py index bf20826138e60..ebb6efc04110f 100644 --- a/python_modules/dagster-graphql/dagster_graphql_tests/graphql/repo.py +++ b/python_modules/dagster-graphql/dagster_graphql_tests/graphql/repo.py @@ -1753,6 +1753,10 @@ def fresh_diamond_right(fresh_diamond_top): @asset( freshness_policy=FreshnessPolicy(maximum_lag_minutes=30), auto_materialize_policy=AutoMaterializePolicy.lazy(), + tags={ + "dagster/foo": "asdf", + ".dagster/bar": "qwer", + }, ) def fresh_diamond_bottom(fresh_diamond_left, fresh_diamond_right): return fresh_diamond_left + fresh_diamond_right diff --git a/python_modules/dagster-graphql/dagster_graphql_tests/graphql/test_assets.py b/python_modules/dagster-graphql/dagster_graphql_tests/graphql/test_assets.py index 71ee0f9748137..9af8126659085 100644 --- a/python_modules/dagster-graphql/dagster_graphql_tests/graphql/test_assets.py +++ b/python_modules/dagster-graphql/dagster_graphql_tests/graphql/test_assets.py @@ -718,6 +718,22 @@ } """ +GET_TAGS = """ + query AssetNodeQuery($assetKey: AssetKeyInput!) { + assetNodeOrError(assetKey: $assetKey) { + ...on AssetNode { + assetKey { + path + } + tags { + key + value + } + } + } + } +""" + GET_TARGETING_INSTIGATORS = """ query AssetNodeQuery($assetKey: AssetKeyInput!) { assetNodeOrError(assetKey: $assetKey) { @@ -2560,6 +2576,18 @@ def test_auto_materialize_policy(self, graphql_context: WorkspaceRequestContext) assert len(fresh_diamond_bottom) == 1 assert fresh_diamond_bottom[0]["autoMaterializePolicy"]["policyType"] == "LAZY" + def test_tags(self, graphql_context: WorkspaceRequestContext): + result = execute_dagster_graphql( + graphql_context, + GET_TAGS, + variables={ + "assetKey": {"path": ["fresh_diamond_bottom"]}, + }, + ) + + fresh_diamond_bottom = result.data["assetNodeOrError"] + assert fresh_diamond_bottom["tags"] == [{"key": "dagster/foo", "value": "asdf"}] + def test_has_asset_checks(self, graphql_context: WorkspaceRequestContext): result = execute_dagster_graphql(graphql_context, HAS_ASSET_CHECKS) diff --git a/python_modules/dagster/dagster/_core/storage/tags.py b/python_modules/dagster/dagster/_core/storage/tags.py index 727aa0b029a78..dce2b3b2c4710 100644 --- a/python_modules/dagster/dagster/_core/storage/tags.py +++ b/python_modules/dagster/dagster/_core/storage/tags.py @@ -101,7 +101,8 @@ class TagType(Enum): HIDDEN = "HIDDEN" -def get_tag_type(tag): +def get_run_tag_type(tag: str) -> TagType: + """Get the tag type for a run tag.""" if tag.startswith(SYSTEM_TAG_PREFIX): return TagType.SYSTEM elif tag.startswith(HIDDEN_TAG_PREFIX): @@ -110,6 +111,17 @@ def get_tag_type(tag): return TagType.USER_PROVIDED +def get_definition_tag_type(tag: str) -> TagType: + """Get the tag type for a definition tag, such as an asset tag.""" + # Though some definition tags may use the dagster/ prefix, in many cases they are + # user-provided and should be treated as such. The namespace here is used for standard + # tags rather than a system indicator. + if tag.startswith(HIDDEN_TAG_PREFIX): + return TagType.HIDDEN + else: + return TagType.USER_PROVIDED + + def check_reserved_tags(tags): check.opt_dict_param(tags, "tags", key_type=str, value_type=str)