diff --git a/python_modules/dagster-graphql/dagster_graphql/client/client.py b/python_modules/dagster-graphql/dagster_graphql/client/client.py index 588bc25e8ffb8..0714fd21c8106 100644 --- a/python_modules/dagster-graphql/dagster_graphql/client/client.py +++ b/python_modules/dagster-graphql/dagster_graphql/client/client.py @@ -6,7 +6,7 @@ from dagster import DagsterRunStatus from dagster._annotations import deprecated, public from dagster._core.definitions.run_config import RunConfig, convert_config_input -from dagster._core.definitions.utils import normalize_tags +from dagster._utils.tags import normalize_tags from gql import Client, gql from gql.transport import Transport from gql.transport.exceptions import TransportServerError diff --git a/python_modules/dagster/dagster/_cli/job.py b/python_modules/dagster/dagster/_cli/job.py index 3493794592292..8ddce7bdbad9f 100644 --- a/python_modules/dagster/dagster/_cli/job.py +++ b/python_modules/dagster/dagster/_cli/job.py @@ -30,7 +30,6 @@ from dagster._core.definitions import JobDefinition from dagster._core.definitions.reconstruct import ReconstructableJob from dagster._core.definitions.selector import JobSubsetSelector -from dagster._core.definitions.utils import normalize_tags from dagster._core.errors import DagsterBackfillFailedError from dagster._core.execution.api import execute_job from dagster._core.execution.backfill import BulkActionStatus, PartitionBackfill @@ -60,6 +59,7 @@ from dagster._utils.indenting_printer import IndentingPrinter from dagster._utils.interrupts import capture_interrupts from dagster._utils.merger import merge_dicts +from dagster._utils.tags import normalize_tags from dagster._utils.yaml_utils import dump_run_config_yaml, load_yaml_from_glob_list T = TypeVar("T") diff --git a/python_modules/dagster/dagster/_core/definitions/asset_out.py b/python_modules/dagster/dagster/_core/definitions/asset_out.py index d3dd6bf43a32e..7fbc1f318bde2 100644 --- a/python_modules/dagster/dagster/_core/definitions/asset_out.py +++ b/python_modules/dagster/dagster/_core/definitions/asset_out.py @@ -17,12 +17,9 @@ from dagster._core.definitions.freshness_policy import FreshnessPolicy from dagster._core.definitions.input import NoValueSentinel from dagster._core.definitions.output import Out -from dagster._core.definitions.utils import ( - DEFAULT_IO_MANAGER_KEY, - resolve_automation_condition, - validate_tags_strict, -) +from dagster._core.definitions.utils import DEFAULT_IO_MANAGER_KEY, resolve_automation_condition from dagster._core.types.dagster_type import DagsterType, resolve_dagster_type +from dagster._utils.tags import validate_tags_strict from dagster._utils.warnings import disable_dagster_warnings diff --git a/python_modules/dagster/dagster/_core/definitions/asset_spec.py b/python_modules/dagster/dagster/_core/definitions/asset_spec.py index f7efbe95da85d..a0a9ccff1746f 100644 --- a/python_modules/dagster/dagster/_core/definitions/asset_spec.py +++ b/python_modules/dagster/dagster/_core/definitions/asset_spec.py @@ -16,12 +16,12 @@ resolve_automation_condition, validate_asset_owner, validate_group_name, - validate_tags_strict, ) from dagster._core.errors import DagsterInvalidDefinitionError from dagster._core.storage.tags import KIND_PREFIX from dagster._serdes.serdes import whitelist_for_serdes from dagster._utils.internal_init import IHasInternalInit +from dagster._utils.tags import validate_tags_strict if TYPE_CHECKING: from dagster._core.definitions.asset_dep import AssetDep, CoercibleToAssetDep diff --git a/python_modules/dagster/dagster/_core/definitions/assets.py b/python_modules/dagster/dagster/_core/definitions/assets.py index 0b06741543c93..ec3530673254d 100644 --- a/python_modules/dagster/dagster/_core/definitions/assets.py +++ b/python_modules/dagster/dagster/_core/definitions/assets.py @@ -71,12 +71,12 @@ DEFAULT_IO_MANAGER_KEY, normalize_group_name, validate_asset_owner, - validate_tags_strict, ) from dagster._core.errors import DagsterInvalidDefinitionError, DagsterInvariantViolationError from dagster._utils import IHasInternalInit from dagster._utils.merger import merge_dicts from dagster._utils.security import non_secure_md5_hash_str +from dagster._utils.tags import validate_tags_strict from dagster._utils.warnings import ExperimentalWarning, disable_dagster_warnings if TYPE_CHECKING: diff --git a/python_modules/dagster/dagster/_core/definitions/automation_condition_sensor_definition.py b/python_modules/dagster/dagster/_core/definitions/automation_condition_sensor_definition.py index 9d556dd69eacf..68102bbcd5d6b 100644 --- a/python_modules/dagster/dagster/_core/definitions/automation_condition_sensor_definition.py +++ b/python_modules/dagster/dagster/_core/definitions/automation_condition_sensor_definition.py @@ -14,7 +14,8 @@ SensorEvaluationContext, SensorType, ) -from dagster._core.definitions.utils import check_valid_name, normalize_tags +from dagster._core.definitions.utils import check_valid_name +from dagster._utils.tags import normalize_tags def _evaluate(sensor_def: "AutomationConditionSensorDefinition", context: SensorEvaluationContext): diff --git a/python_modules/dagster/dagster/_core/definitions/composition.py b/python_modules/dagster/dagster/_core/definitions/composition.py index 6e6dfd1a5709d..0a1d84c80307a 100644 --- a/python_modules/dagster/dagster/_core/definitions/composition.py +++ b/python_modules/dagster/dagster/_core/definitions/composition.py @@ -44,13 +44,14 @@ from dagster._core.definitions.output import OutputDefinition, OutputMapping from dagster._core.definitions.policy import RetryPolicy from dagster._core.definitions.resource_definition import ResourceDefinition -from dagster._core.definitions.utils import check_valid_name, normalize_tags +from dagster._core.definitions.utils import check_valid_name from dagster._core.errors import ( DagsterInvalidDefinitionError, DagsterInvalidInvocationError, DagsterInvariantViolationError, ) from dagster._utils import is_named_tuple_instance +from dagster._utils.tags import normalize_tags from dagster._utils.warnings import disable_dagster_warnings if TYPE_CHECKING: diff --git a/python_modules/dagster/dagster/_core/definitions/decorators/asset_decorator.py b/python_modules/dagster/dagster/_core/definitions/decorators/asset_decorator.py index 218f43659f39e..b8d059449731f 100644 --- a/python_modules/dagster/dagster/_core/definitions/decorators/asset_decorator.py +++ b/python_modules/dagster/dagster/_core/definitions/decorators/asset_decorator.py @@ -55,11 +55,11 @@ DEFAULT_OUTPUT, NoValueSentinel, resolve_automation_condition, - validate_tags_strict, ) from dagster._core.errors import DagsterInvalidDefinitionError, DagsterInvariantViolationError from dagster._core.storage.tags import KIND_PREFIX from dagster._core.types.dagster_type import DagsterType +from dagster._utils.tags import validate_tags_strict from dagster._utils.warnings import disable_dagster_warnings diff --git a/python_modules/dagster/dagster/_core/definitions/decorators/job_decorator.py b/python_modules/dagster/dagster/_core/definitions/decorators/job_decorator.py index 7b7768d055bc8..5a76b6e7185ac 100644 --- a/python_modules/dagster/dagster/_core/definitions/decorators/job_decorator.py +++ b/python_modules/dagster/dagster/_core/definitions/decorators/job_decorator.py @@ -11,7 +11,7 @@ from dagster._core.definitions.metadata import RawMetadataValue from dagster._core.definitions.policy import RetryPolicy from dagster._core.definitions.resource_definition import ResourceDefinition -from dagster._core.definitions.utils import normalize_tags +from dagster._utils.tags import normalize_tags if TYPE_CHECKING: from dagster._core.definitions.executor_definition import ExecutorDefinition diff --git a/python_modules/dagster/dagster/_core/definitions/decorators/schedule_decorator.py b/python_modules/dagster/dagster/_core/definitions/decorators/schedule_decorator.py index 15884ea33b76f..c8b34ef33e584 100644 --- a/python_modules/dagster/dagster/_core/definitions/decorators/schedule_decorator.py +++ b/python_modules/dagster/dagster/_core/definitions/decorators/schedule_decorator.py @@ -19,13 +19,13 @@ ) from dagster._core.definitions.sensor_definition import get_context_param_name from dagster._core.definitions.target import ExecutableDefinition -from dagster._core.definitions.utils import normalize_tags from dagster._core.errors import ( DagsterInvalidDefinitionError, ScheduleExecutionError, user_code_error_boundary, ) from dagster._utils import ensure_gen +from dagster._utils.tags import normalize_tags if TYPE_CHECKING: from dagster._core.definitions.asset_selection import CoercibleToAssetSelection diff --git a/python_modules/dagster/dagster/_core/definitions/decorators/source_asset_decorator.py b/python_modules/dagster/dagster/_core/definitions/decorators/source_asset_decorator.py index bb1cedebbb5bb..60394ff658263 100644 --- a/python_modules/dagster/dagster/_core/definitions/decorators/source_asset_decorator.py +++ b/python_modules/dagster/dagster/_core/definitions/decorators/source_asset_decorator.py @@ -20,7 +20,7 @@ from dagster._core.definitions.resource_annotation import get_resource_args from dagster._core.definitions.resource_definition import ResourceDefinition from dagster._core.definitions.source_asset import SourceAsset, SourceAssetObserveFunction -from dagster._core.definitions.utils import validate_tags_strict +from dagster._utils.tags import validate_tags_strict from dagster._utils.warnings import disable_dagster_warnings diff --git a/python_modules/dagster/dagster/_core/definitions/dependency.py b/python_modules/dagster/dagster/_core/definitions/dependency.py index 2ae2858bd62dd..aec697cef6441 100644 --- a/python_modules/dagster/dagster/_core/definitions/dependency.py +++ b/python_modules/dagster/dagster/_core/definitions/dependency.py @@ -35,11 +35,12 @@ ) from dagster._core.definitions.output import OutputDefinition from dagster._core.definitions.policy import RetryPolicy -from dagster._core.definitions.utils import DEFAULT_OUTPUT, normalize_tags, struct_to_string +from dagster._core.definitions.utils import DEFAULT_OUTPUT, struct_to_string from dagster._core.errors import DagsterInvalidDefinitionError from dagster._record import record from dagster._serdes.serdes import whitelist_for_serdes from dagster._utils import hash_collection +from dagster._utils.tags import normalize_tags if TYPE_CHECKING: from dagster._core.definitions.asset_layer import AssetLayer diff --git a/python_modules/dagster/dagster/_core/definitions/events.py b/python_modules/dagster/dagster/_core/definitions/events.py index e7544384f5c72..131cedf096fc9 100644 --- a/python_modules/dagster/dagster/_core/definitions/events.py +++ b/python_modules/dagster/dagster/_core/definitions/events.py @@ -750,7 +750,7 @@ def __new__(cls, hook_name: str, is_skipped: Optional[bool] = None): def validate_asset_event_tags(tags: Optional[Mapping[str, str]]) -> Optional[Mapping[str, str]]: - from dagster._core.definitions.utils import validate_tag_strict + from dagster._utils.tags import validate_tag_strict if tags is None: return None diff --git a/python_modules/dagster/dagster/_core/definitions/job_definition.py b/python_modules/dagster/dagster/_core/definitions/job_definition.py index df5fc2768473e..e1efda8156c61 100644 --- a/python_modules/dagster/dagster/_core/definitions/job_definition.py +++ b/python_modules/dagster/dagster/_core/definitions/job_definition.py @@ -63,7 +63,7 @@ ensure_requirements_satisfied, ) from dagster._core.definitions.run_request import RunRequest -from dagster._core.definitions.utils import DEFAULT_IO_MANAGER_KEY, check_valid_name, normalize_tags +from dagster._core.definitions.utils import DEFAULT_IO_MANAGER_KEY, check_valid_name from dagster._core.errors import ( DagsterInvalidConfigError, DagsterInvalidDefinitionError, @@ -82,6 +82,7 @@ from dagster._utils import IHasInternalInit from dagster._utils.cached_method import cached_method from dagster._utils.merger import merge_dicts +from dagster._utils.tags import normalize_tags if TYPE_CHECKING: from dagster._config.snap import ConfigSchemaSnapshot diff --git a/python_modules/dagster/dagster/_core/definitions/node_definition.py b/python_modules/dagster/dagster/_core/definitions/node_definition.py index 9441dfe2ff81e..65fb36c5a5688 100644 --- a/python_modules/dagster/dagster/_core/definitions/node_definition.py +++ b/python_modules/dagster/dagster/_core/definitions/node_definition.py @@ -14,8 +14,9 @@ from dagster._core.definitions.configurable import NamedConfigurableDefinition from dagster._core.definitions.hook_definition import HookDefinition from dagster._core.definitions.policy import RetryPolicy -from dagster._core.definitions.utils import check_valid_name, normalize_tags +from dagster._core.definitions.utils import check_valid_name from dagster._core.errors import DagsterInvariantViolationError +from dagster._utils.tags import normalize_tags if TYPE_CHECKING: from dagster._core.definitions.asset_layer import AssetLayer diff --git a/python_modules/dagster/dagster/_core/definitions/partition.py b/python_modules/dagster/dagster/_core/definitions/partition.py index ca89e9950374c..504d3c8e6543b 100644 --- a/python_modules/dagster/dagster/_core/definitions/partition.py +++ b/python_modules/dagster/dagster/_core/definitions/partition.py @@ -32,7 +32,6 @@ ) from dagster._core.definitions.partition_key_range import PartitionKeyRange from dagster._core.definitions.run_config import RunConfig, convert_config_input -from dagster._core.definitions.utils import normalize_tags from dagster._core.errors import ( DagsterInvalidDefinitionError, DagsterInvalidDeserializationVersionError, @@ -44,6 +43,7 @@ from dagster._serdes import whitelist_for_serdes from dagster._utils import xor from dagster._utils.cached_method import cached_method +from dagster._utils.tags import normalize_tags from dagster._utils.warnings import normalize_renamed_param DEFAULT_DATE_FORMAT = "%Y-%m-%d" diff --git a/python_modules/dagster/dagster/_core/definitions/run_request.py b/python_modules/dagster/dagster/_core/definitions/run_request.py index 50cf3e623d36b..dd99e422bc333 100644 --- a/python_modules/dagster/dagster/_core/definitions/run_request.py +++ b/python_modules/dagster/dagster/_core/definitions/run_request.py @@ -30,7 +30,6 @@ ) from dagster._core.definitions.events import AssetKey, AssetMaterialization, AssetObservation from dagster._core.definitions.partition_key_range import PartitionKeyRange -from dagster._core.definitions.utils import normalize_tags from dagster._core.instance import DynamicPartitionsStore from dagster._core.storage.dagster_run import DagsterRun, DagsterRunStatus from dagster._core.storage.tags import ( @@ -42,6 +41,7 @@ from dagster._serdes.serdes import whitelist_for_serdes from dagster._utils.cached_method import cached_method from dagster._utils.error import SerializableErrorInfo +from dagster._utils.tags import normalize_tags if TYPE_CHECKING: from dagster._core.definitions.job_definition import JobDefinition diff --git a/python_modules/dagster/dagster/_core/definitions/schedule_definition.py b/python_modules/dagster/dagster/_core/definitions/schedule_definition.py index 40f9da97a9ca3..d23777537091f 100644 --- a/python_modules/dagster/dagster/_core/definitions/schedule_definition.py +++ b/python_modules/dagster/dagster/_core/definitions/schedule_definition.py @@ -40,7 +40,7 @@ ExecutableDefinition, ) from dagster._core.definitions.unresolved_asset_job_definition import UnresolvedAssetJobDefinition -from dagster._core.definitions.utils import check_valid_name, normalize_tags +from dagster._core.definitions.utils import check_valid_name from dagster._core.errors import ( DagsterInvalidDefinitionError, DagsterInvalidInvocationError, @@ -56,6 +56,7 @@ from dagster._utils import IHasInternalInit, ensure_gen from dagster._utils.merger import merge_dicts from dagster._utils.schedules import has_out_of_range_cron_interval, is_valid_cron_schedule +from dagster._utils.tags import normalize_tags if TYPE_CHECKING: from dagster import ResourceDefinition diff --git a/python_modules/dagster/dagster/_core/definitions/sensor_definition.py b/python_modules/dagster/dagster/_core/definitions/sensor_definition.py index 06a9caf8eed55..dd7c12a279cd4 100644 --- a/python_modules/dagster/dagster/_core/definitions/sensor_definition.py +++ b/python_modules/dagster/dagster/_core/definitions/sensor_definition.py @@ -62,7 +62,7 @@ AutomationTarget, ExecutableDefinition, ) -from dagster._core.definitions.utils import check_valid_name, normalize_tags +from dagster._core.definitions.utils import check_valid_name from dagster._core.errors import ( DagsterInvalidDefinitionError, DagsterInvalidInvocationError, @@ -76,6 +76,7 @@ from dagster._time import get_current_datetime from dagster._utils import IHasInternalInit, normalize_to_repository from dagster._utils.merger import merge_dicts +from dagster._utils.tags import normalize_tags from dagster._utils.warnings import deprecation_warning, normalize_renamed_param if TYPE_CHECKING: diff --git a/python_modules/dagster/dagster/_core/definitions/source_asset.py b/python_modules/dagster/dagster/_core/definitions/source_asset.py index 807d8ee3a8b05..a51784d59889f 100644 --- a/python_modules/dagster/dagster/_core/definitions/source_asset.py +++ b/python_modules/dagster/dagster/_core/definitions/source_asset.py @@ -45,7 +45,6 @@ DEFAULT_GROUP_NAME, DEFAULT_IO_MANAGER_KEY, normalize_group_name, - validate_tags_strict, ) from dagster._core.errors import ( DagsterInvalidDefinitionError, @@ -54,6 +53,7 @@ DagsterInvariantViolationError, ) from dagster._utils.internal_init import IHasInternalInit +from dagster._utils.tags import validate_tags_strict if TYPE_CHECKING: from dagster._core.definitions.decorators.op_decorator import DecoratedOpFunction diff --git a/python_modules/dagster/dagster/_core/definitions/unresolved_asset_job_definition.py b/python_modules/dagster/dagster/_core/definitions/unresolved_asset_job_definition.py index f97f48528b2cb..f3d03cacfee1c 100644 --- a/python_modules/dagster/dagster/_core/definitions/unresolved_asset_job_definition.py +++ b/python_modules/dagster/dagster/_core/definitions/unresolved_asset_job_definition.py @@ -17,9 +17,9 @@ from dagster._core.definitions.policy import RetryPolicy from dagster._core.definitions.resource_definition import ResourceDefinition from dagster._core.definitions.run_request import RunRequest -from dagster._core.definitions.utils import normalize_tags from dagster._core.errors import DagsterInvalidDefinitionError from dagster._core.instance import DynamicPartitionsStore +from dagster._utils.tags import normalize_tags if TYPE_CHECKING: from dagster._core.definitions import JobDefinition diff --git a/python_modules/dagster/dagster/_core/definitions/utils.py b/python_modules/dagster/dagster/_core/definitions/utils.py index df58ddb8ebb4f..7a177be59b511 100644 --- a/python_modules/dagster/dagster/_core/definitions/utils.py +++ b/python_modules/dagster/dagster/_core/definitions/utils.py @@ -1,7 +1,6 @@ import keyword import os import re -import warnings from glob import glob from typing import ( TYPE_CHECKING, @@ -20,9 +19,7 @@ import yaml import dagster._check as check -import dagster._seven as seven from dagster._core.errors import DagsterInvalidDefinitionError, DagsterInvariantViolationError -from dagster._core.storage.tags import check_reserved_tags from dagster._core.utils import is_valid_email from dagster._utils.warnings import deprecation_warning from dagster._utils.yaml_utils import merge_yaml_strings, merge_yamls @@ -158,116 +155,6 @@ def struct_to_string(name: str, **kwargs: object) -> str: return f"{name}({props_str})" -def normalize_tags( - tags: Optional[Mapping[str, Any]], - allow_reserved_tags: bool = True, - warning_stacklevel: int = 4, -) -> Mapping[str, str]: - """Normalizes JSON-object tags into string tags and warns on deprecated tags. - - New tags properties should _not_ use this function, because it doesn't hard error on tags that - are no longer supported. - """ - valid_tags: Dict[str, str] = {} - invalid_tag_keys = [] - for key, value in check.opt_mapping_param(tags, "tags", key_type=str).items(): - if not isinstance(value, str): - valid = False - err_reason = f'Could not JSON encode value "{value}"' - str_val = None - try: - str_val = seven.json.dumps(value) - err_reason = f'JSON encoding "{str_val}" of value "{value}" is not equivalent to original value' - - valid = seven.json.loads(str_val) == value - except Exception: - pass - - if not valid: - raise DagsterInvalidDefinitionError( - f'Invalid value for tag "{key}", {err_reason}. Tag values must be strings ' - "or meet the constraint that json.loads(json.dumps(value)) == value." - ) - - valid_tags[key] = str_val # type: ignore # (possible none) - else: - valid_tags[key] = value - - if not is_valid_definition_tag_key(key): - invalid_tag_keys.append(key) - - if invalid_tag_keys: - invalid_tag_keys_sample = invalid_tag_keys[: min(5, len(invalid_tag_keys))] - warnings.warn( - f"Non-compliant tag keys like {invalid_tag_keys_sample} are deprecated. {VALID_DEFINITION_TAG_KEY_EXPLANATION}", - category=DeprecationWarning, - stacklevel=warning_stacklevel, - ) - - if not allow_reserved_tags: - check_reserved_tags(valid_tags) - - return valid_tags - - -# Inspired by allowed Kubernetes labels: -# https://kubernetes.io/docs/concepts/overview/working-with-objects/labels/#syntax-and-character-set - -# We allow in some cases for users to specify multi-level namespaces for tags, -# right now we only allow this for the `dagster/kind` namespace, which is how asset kinds are -# encoded under the hood. -VALID_NESTED_NAMESPACES_TAG_KEYS = r"dagster/kind/" -VALID_DEFINITION_TAG_KEY_REGEX_STR = ( - r"^([A-Za-z0-9_.-]{1,63}/|" + VALID_NESTED_NAMESPACES_TAG_KEYS + r")?[A-Za-z0-9_.-]{1,63}$" -) -VALID_DEFINITION_TAG_KEY_REGEX = re.compile(VALID_DEFINITION_TAG_KEY_REGEX_STR) -VALID_DEFINITION_TAG_KEY_EXPLANATION = ( - "Allowed characters: alpha-numeric, '_', '-', '.'. " - "Tag keys can also contain a namespace section, separated by a '/'. Each section " - "must have <= 63 characters." -) - -VALID_DEFINITION_TAG_VALUE_REGEX_STR = r"^[A-Za-z0-9_.-]{0,63}$" -VALID_DEFINITION_TAG_VALUE_REGEX = re.compile(VALID_DEFINITION_TAG_VALUE_REGEX_STR) - - -def is_valid_definition_tag_key(key: str) -> bool: - return bool(VALID_DEFINITION_TAG_KEY_REGEX.match(key)) - - -def is_valid_definition_tag_value(key: str) -> bool: - return bool(VALID_DEFINITION_TAG_VALUE_REGEX.match(key)) - - -def validate_tags_strict(tags: Optional[Mapping[str, str]]) -> Optional[Mapping[str, str]]: - if tags is None: - return tags - - for key, value in tags.items(): - validate_tag_strict(key, value) - - return tags - - -def validate_tag_strict(key: str, value: str) -> None: - if not isinstance(key, str): - raise DagsterInvalidDefinitionError("Tag keys must be strings") - - if not isinstance(value, str): - raise DagsterInvalidDefinitionError("Tag values must be strings") - - if not is_valid_definition_tag_key(key): - raise DagsterInvalidDefinitionError( - f"Invalid tag key: {key}. {VALID_DEFINITION_TAG_KEY_EXPLANATION}" - ) - - if not is_valid_definition_tag_value(value): - raise DagsterInvalidDefinitionError( - f"Invalid tag value: {value}, for key: {key}. Allowed characters: alpha-numeric, '_', '-', '.'. " - "Must have <= 63 characters." - ) - - def validate_asset_owner(owner: str, key: "AssetKey") -> None: if not is_valid_email(owner) and not (owner.startswith("team:") and len(owner) > 5): raise DagsterInvalidDefinitionError( diff --git a/python_modules/dagster/dagster/_core/instance/__init__.py b/python_modules/dagster/dagster/_core/instance/__init__.py index b1a9e098cf25f..d4aed91de8e4a 100644 --- a/python_modules/dagster/dagster/_core/instance/__init__.py +++ b/python_modules/dagster/dagster/_core/instance/__init__.py @@ -1480,9 +1480,9 @@ def create_run( asset_graph: Optional["BaseAssetGraph"], ) -> DagsterRun: from dagster._core.definitions.asset_check_spec import AssetCheckKey - from dagster._core.definitions.utils import normalize_tags from dagster._core.remote_representation.origin import RemoteJobOrigin from dagster._core.snap import ExecutionPlanSnapshot, JobSnapshot + from dagster._utils.tags import normalize_tags check.str_param(job_name, "job_name") check.opt_str_param( diff --git a/python_modules/dagster/dagster/_utils/tags.py b/python_modules/dagster/dagster/_utils/tags.py index 040fd8dd00ae5..701f65dfbc94d 100644 --- a/python_modules/dagster/dagster/_utils/tags.py +++ b/python_modules/dagster/dagster/_utils/tags.py @@ -1,7 +1,12 @@ +import re +import warnings from collections import defaultdict from typing import TYPE_CHECKING, Any, Dict, Mapping, Optional, Sequence, Tuple, Union +import dagster._seven as seven from dagster import _check as check +from dagster._core.errors import DagsterInvalidDefinitionError +from dagster._core.storage.tags import check_reserved_tags if TYPE_CHECKING: from dagster._core.execution.plan.step import ExecutionStep @@ -92,3 +97,130 @@ def get_boolean_tag_value(tag_value: Optional[str], default_value: bool = False) return default_value return tag_value.lower() not in {"false", "none", "0", ""} + + +# ######################## +# ##### NORMALIZATION +# ######################## + +# There are two variants of tag normalization: +# +# 1. Legacy. Uses `normalize_tags`. Accepts a wide range of string keys and JSON-serializable values. +# 2. Strict. Uses `validate_tags_strict`. Accepts a restricted character set for string keys and +# only accepts strings values. +# +# Legacy "tags" normalization supports an older vision of tags where potentially large values could +# be stored in tags to configure runs. It also supports (but issues a warning for) non-standard +# characters in tag keys like "&". We want to move away from this, but we still need to support it +# for backcompat. +# +# Strict "tags" normalization supports the new vision of tags where they are short string labels +# used for filtering and grouping in the UI. New tags arguments should generally use this +# normalization. + + +def normalize_tags( + tags: Optional[Mapping[str, Any]], + allow_reserved_tags: bool = True, + warning_stacklevel: int = 4, +) -> Mapping[str, str]: + """Normalizes JSON-object tags into string tags and warns on deprecated tags. + + New tags properties should _not_ use this function, because it doesn't hard error on tags that + are no longer supported. + """ + valid_tags: Dict[str, str] = {} + invalid_tag_keys = [] + for key, value in check.opt_mapping_param(tags, "tags", key_type=str).items(): + if not isinstance(value, str): + valid = False + err_reason = f'Could not JSON encode value "{value}"' + str_val = None + try: + str_val = seven.json.dumps(value) + err_reason = f'JSON encoding "{str_val}" of value "{value}" is not equivalent to original value' + + valid = seven.json.loads(str_val) == value + except Exception: + pass + + if not valid: + raise DagsterInvalidDefinitionError( + f'Invalid value for tag "{key}", {err_reason}. Tag values must be strings ' + "or meet the constraint that json.loads(json.dumps(value)) == value." + ) + + valid_tags[key] = str_val # type: ignore # (possible none) + else: + valid_tags[key] = value + + if not is_valid_definition_tag_key(key): + invalid_tag_keys.append(key) + + if invalid_tag_keys: + invalid_tag_keys_sample = invalid_tag_keys[: min(5, len(invalid_tag_keys))] + warnings.warn( + f"Non-compliant tag keys like {invalid_tag_keys_sample} are deprecated. {VALID_DEFINITION_TAG_KEY_EXPLANATION}", + category=DeprecationWarning, + stacklevel=warning_stacklevel, + ) + + if not allow_reserved_tags: + check_reserved_tags(valid_tags) + + return valid_tags + + +def validate_tag_strict(key: str, value: str) -> None: + if not isinstance(key, str): + raise DagsterInvalidDefinitionError("Tag keys must be strings") + elif not isinstance(value, str): + raise DagsterInvalidDefinitionError("Tag values must be strings") + elif not is_valid_definition_tag_key(key): + raise DagsterInvalidDefinitionError( + f"Invalid tag key: {key}. {VALID_DEFINITION_TAG_KEY_EXPLANATION}" + ) + elif not is_valid_definition_tag_value(value): + raise DagsterInvalidDefinitionError( + f"Invalid tag value: {value}, for key: {key}. Allowed characters: alpha-numeric, '_', '-', '.'. " + "Must have <= 63 characters." + ) + + +def validate_tags_strict(tags: Optional[Mapping[str, str]]) -> Optional[Mapping[str, str]]: + if tags is None: + return tags + + for key, value in tags.items(): + validate_tag_strict(key, value) + + return tags + + +# Inspired by allowed Kubernetes labels: +# https://kubernetes.io/docs/concepts/overview/working-with-objects/labels/#syntax-and-character-set + +# We allow in some cases for users to specify multi-level namespaces for tags, +# right now we only allow this for the `dagster/kind` namespace, which is how asset kinds are +# encoded under the hood. +VALID_NESTED_NAMESPACES_TAG_KEYS = r"dagster/kind/" +VALID_DEFINITION_TAG_KEY_REGEX_STR = ( + r"^([A-Za-z0-9_.-]{1,63}/|" + VALID_NESTED_NAMESPACES_TAG_KEYS + r")?[A-Za-z0-9_.-]{1,63}$" +) +VALID_DEFINITION_TAG_KEY_REGEX = re.compile(VALID_DEFINITION_TAG_KEY_REGEX_STR) +VALID_DEFINITION_TAG_KEY_EXPLANATION = ( + "Allowed characters: alpha-numeric, '_', '-', '.'. " + "Tag keys can also contain a namespace section, separated by a '/'. Each section " + "must have <= 63 characters." +) + +VALID_DEFINITION_TAG_VALUE_REGEX_STR = r"^[A-Za-z0-9_.-]{0,63}$" +VALID_DEFINITION_TAG_VALUE_REGEX = re.compile(VALID_DEFINITION_TAG_VALUE_REGEX_STR) + + +def is_valid_definition_tag_key(key: str) -> bool: + return bool(VALID_DEFINITION_TAG_KEY_REGEX.match(key)) + + +def is_valid_definition_tag_value(key: str) -> bool: + return bool(VALID_DEFINITION_TAG_VALUE_REGEX.match(key)) diff --git a/python_modules/dagster/dagster_tests/definitions_tests/decorators_tests/test_job.py b/python_modules/dagster/dagster_tests/definitions_tests/decorators_tests/test_job.py index 68169396e00ad..82274b64bf39f 100644 --- a/python_modules/dagster/dagster_tests/definitions_tests/decorators_tests/test_job.py +++ b/python_modules/dagster/dagster_tests/definitions_tests/decorators_tests/test_job.py @@ -2,9 +2,9 @@ import warnings from dagster import ConfigMapping, DagsterInstance, Field, JobDefinition, job, logger, op, resource -from dagster._core.definitions.utils import normalize_tags from dagster._core.storage.tags import MAX_RETRIES_TAG, RETRY_ON_ASSET_OR_OP_FAILURE_TAG from dagster._core.utils import coerce_valid_log_level +from dagster._utils.tags import normalize_tags def test_basic_job(): diff --git a/python_modules/dagster/dagster_tests/definitions_tests/test_utils.py b/python_modules/dagster/dagster_tests/definitions_tests/test_utils.py index 5c0f47227dbe0..e9acb58b03e1a 100644 --- a/python_modules/dagster/dagster_tests/definitions_tests/test_utils.py +++ b/python_modules/dagster/dagster_tests/definitions_tests/test_utils.py @@ -1,12 +1,7 @@ import pytest -from dagster._core.definitions.utils import ( - MAX_TITLE_LENGTH, - check_valid_title, - is_valid_definition_tag_key, - is_valid_definition_tag_value, - is_valid_title, -) +from dagster._core.definitions.utils import MAX_TITLE_LENGTH, check_valid_title, is_valid_title from dagster._core.errors import DagsterInvariantViolationError +from dagster._utils.tags import is_valid_definition_tag_key, is_valid_definition_tag_value def test_is_valid_definition_tag_key_kinds() -> None: diff --git a/python_modules/libraries/dagster-airflow/dagster_airflow/dagster_job_factory.py b/python_modules/libraries/dagster-airflow/dagster_airflow/dagster_job_factory.py index ea74bff34289b..eb5378635d71b 100644 --- a/python_modules/libraries/dagster-airflow/dagster_airflow/dagster_job_factory.py +++ b/python_modules/libraries/dagster-airflow/dagster_airflow/dagster_job_factory.py @@ -8,8 +8,8 @@ ResourceDefinition, _check as check, ) -from dagster._core.definitions.utils import normalize_tags from dagster._core.instance import IS_AIRFLOW_INGEST_PIPELINE_STR +from dagster._utils.tags import normalize_tags from dagster_airflow.airflow_dag_converter import get_graph_definition_args from dagster_airflow.resources import ( diff --git a/python_modules/libraries/dagster-dbt/dagster_dbt/dagster_dbt_translator.py b/python_modules/libraries/dagster-dbt/dagster_dbt/dagster_dbt_translator.py index cc6f49c4b509d..592246635f2a5 100644 --- a/python_modules/libraries/dagster-dbt/dagster_dbt/dagster_dbt_translator.py +++ b/python_modules/libraries/dagster-dbt/dagster_dbt/dagster_dbt_translator.py @@ -10,7 +10,7 @@ _check as check, ) from dagster._annotations import experimental, public -from dagster._core.definitions.utils import is_valid_definition_tag_key +from dagster._utils.tags import is_valid_definition_tag_key from dagster_dbt.asset_utils import ( default_asset_key_fn, diff --git a/python_modules/libraries/dagstermill/dagstermill/asset_factory.py b/python_modules/libraries/dagstermill/dagstermill/asset_factory.py index a86882847fb61..186d58c77b8b5 100644 --- a/python_modules/libraries/dagstermill/dagstermill/asset_factory.py +++ b/python_modules/libraries/dagstermill/dagstermill/asset_factory.py @@ -19,9 +19,9 @@ from dagster._config.pythonic_config import Config, infer_schema_from_config_class from dagster._config.pythonic_config.type_check_utils import safe_is_subclass from dagster._core.definitions.events import CoercibleToAssetKey, CoercibleToAssetKeyPrefix -from dagster._core.definitions.utils import normalize_tags from dagster._core.execution.context.compute import OpExecutionContext from dagster._core.storage.tags import COMPUTE_KIND_TAG +from dagster._utils.tags import normalize_tags from dagstermill.factory import _clean_path_for_windows, execute_notebook diff --git a/python_modules/libraries/dagstermill/dagstermill/factory.py b/python_modules/libraries/dagstermill/dagstermill/factory.py index 82fcd37987d66..2adff9ee99b51 100644 --- a/python_modules/libraries/dagstermill/dagstermill/factory.py +++ b/python_modules/libraries/dagstermill/dagstermill/factory.py @@ -21,7 +21,6 @@ from dagster._core.definitions.events import AssetMaterialization, Failure, RetryRequested from dagster._core.definitions.metadata import MetadataValue from dagster._core.definitions.reconstruct import ReconstructableJob -from dagster._core.definitions.utils import normalize_tags from dagster._core.execution.context.compute import OpExecutionContext from dagster._core.execution.context.input import build_input_context from dagster._core.execution.context.system import StepExecutionContext @@ -31,6 +30,7 @@ from dagster._seven import get_system_temp_directory from dagster._utils import mkdir_p, safe_tempfile_path from dagster._utils.error import serializable_error_info_from_exc_info +from dagster._utils.tags import normalize_tags from papermill.engines import papermill_engines from papermill.iorw import load_notebook_node, write_ipynb