Skip to content

Commit

Permalink
[definition-tags] Eliminate the NormalizedTags construct (dagster-io#24789)
Browse files Browse the repository at this point in the history

## Summary & Motivation

Remove the `NormalizedTags` construct from the code. I understand the
theory behind this construct (that tags need only be normalized once),
but it was used inconsistently in the code and isn't a pattern we use
elsewhere. The net effect was to add noise to anything one wants to do
with tags.

## How I Tested These Changes

Existing test suite.

## Changelog

NOCHANGELOG
  • Loading branch information
smackesey authored Oct 8, 2024
1 parent 27cc76b commit efc45cf
Show file tree
Hide file tree
Showing 21 changed files with 41 additions and 63 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -151,7 +151,7 @@ def _core_submit_execution(
"Either a mode and run_config or a preset must be specified in order to "
f"submit the pipeline {pipeline_name} for execution",
)
tags = normalize_tags(tags).tags
tags = normalize_tags(tags)

pipeline_or_job = "Job" if is_using_job_op_graph_apis else "Pipeline"

Expand Down
2 changes: 1 addition & 1 deletion python_modules/dagster/dagster/_cli/job.py
Original file line number Diff line number Diff line change
Expand Up @@ -515,7 +515,7 @@ def _check_execute_external_job_args(

return (
run_config,
normalize_tags(tags).tags,
normalize_tags(tags),
op_selection,
)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -111,7 +111,7 @@ def __init__(
"Setting a `default_condition` for a non-user-code AutomationConditionSensorDefinition is not supported.",
)

self._run_tags = normalize_tags(run_tags).tags
self._run_tags = normalize_tags(run_tags)

super().__init__(
name=check_valid_name(name),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@
from dagster._core.definitions.output import OutputDefinition, OutputMapping
from dagster._core.definitions.policy import RetryPolicy
from dagster._core.definitions.resource_definition import ResourceDefinition
from dagster._core.definitions.utils import NormalizedTags, check_valid_name, normalize_tags
from dagster._core.definitions.utils import check_valid_name, normalize_tags
from dagster._core.errors import (
DagsterInvalidDefinitionError,
DagsterInvalidInvocationError,
Expand Down Expand Up @@ -581,7 +581,7 @@ def alias(self, name: str) -> "PendingNodeInvocation[T_NodeDefinition]":

@public
def tag(self, tags: Optional[Mapping[str, str]]) -> "PendingNodeInvocation[T_NodeDefinition]":
tags = normalize_tags(tags).tags
tags = normalize_tags(tags)
return PendingNodeInvocation(
node_def=self.node_def,
given_alias=self.given_alias,
Expand Down Expand Up @@ -622,7 +622,7 @@ def to_job(
description: Optional[str] = None,
resource_defs: Optional[Mapping[str, ResourceDefinition]] = None,
config: Optional[Union[ConfigMapping, Mapping[str, Any], "PartitionedConfig"]] = None,
tags: Union[NormalizedTags, Optional[Mapping[str, Any]]] = None,
tags: Optional[Mapping[str, Any]] = None,
logger_defs: Optional[Mapping[str, LoggerDefinition]] = None,
executor_def: Optional["ExecutorDefinition"] = None,
hooks: Optional[AbstractSet[HookDefinition]] = None,
Expand All @@ -648,7 +648,7 @@ def to_job(
description=description,
resource_defs=resource_defs,
config=config,
tags=NormalizedTags(self.tags or {}).with_normalized_tags(tags),
tags=normalize_tags({**(self.tags or {}), **(tags or {})}),
logger_defs=logger_defs,
executor_def=executor_def,
hooks=job_hooks,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -147,7 +147,7 @@ def __init__(
"graph_definition",
GraphDefinition,
)
self._additional_tags = normalize_tags(tags).tags
self._additional_tags = normalize_tags(tags)
self._hook_defs = check.opt_set_param(hook_defs, "hook_defs", of_type=HookDefinition)
self._retry_policy = check.opt_inst_param(retry_policy, "retry_policy", RetryPolicy)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,6 @@
from dagster._core.definitions.output import OutputDefinition, OutputMapping
from dagster._core.definitions.policy import RetryPolicy
from dagster._core.definitions.resource_requirement import ResourceRequirement
from dagster._core.definitions.utils import NormalizedTags
from dagster._core.errors import DagsterInvalidDefinitionError, DagsterInvariantViolationError
from dagster._core.selector.subset_selector import AssetSelectionData
from dagster._core.types.dagster_type import (
Expand Down Expand Up @@ -219,7 +218,7 @@ def __init__(
input_mappings: Optional[Sequence[InputMapping]] = None,
output_mappings: Optional[Sequence[OutputMapping]] = None,
config: Optional[ConfigMapping] = None,
tags: Union[NormalizedTags, Optional[Mapping[str, str]]] = None,
tags: Optional[Mapping[str, str]] = None,
node_input_source_assets: Optional[Mapping[str, Mapping[str, "SourceAsset"]]] = None,
input_assets: Optional[
Mapping[str, Mapping[str, Union["AssetsDefinition", "SourceAsset"]]]
Expand Down Expand Up @@ -613,7 +612,7 @@ def to_job(
config: Optional[
Union["RunConfig", ConfigMapping, Mapping[str, object], "PartitionedConfig"]
] = None,
tags: Union[NormalizedTags, Optional[Mapping[str, str]]] = None,
tags: Optional[Mapping[str, str]] = None,
metadata: Optional[Mapping[str, RawMetadataValue]] = None,
logger_defs: Optional[Mapping[str, LoggerDefinition]] = None,
executor_def: Optional["ExecutorDefinition"] = None,
Expand Down
21 changes: 8 additions & 13 deletions python_modules/dagster/dagster/_core/definitions/job_definition.py
Original file line number Diff line number Diff line change
Expand Up @@ -63,12 +63,7 @@
ensure_requirements_satisfied,
)
from dagster._core.definitions.run_request import RunRequest
from dagster._core.definitions.utils import (
DEFAULT_IO_MANAGER_KEY,
NormalizedTags,
check_valid_name,
normalize_tags,
)
from dagster._core.definitions.utils import DEFAULT_IO_MANAGER_KEY, check_valid_name, normalize_tags
from dagster._core.errors import (
DagsterInvalidConfigError,
DagsterInvalidDefinitionError,
Expand Down Expand Up @@ -134,8 +129,8 @@ def __init__(
] = None,
description: Optional[str] = None,
partitions_def: Optional[PartitionsDefinition] = None,
tags: Union[NormalizedTags, Optional[Mapping[str, Any]]] = None,
run_tags: Union[NormalizedTags, Optional[Mapping[str, Any]]] = None,
tags: Optional[Mapping[str, Any]] = None,
run_tags: Optional[Mapping[str, Any]] = None,
metadata: Optional[Mapping[str, RawMetadataValue]] = None,
hook_defs: Optional[AbstractSet[HookDefinition]] = None,
op_retry_policy: Optional[RetryPolicy] = None,
Expand Down Expand Up @@ -178,8 +173,8 @@ def __init__(
# same graph may be in multiple jobs, keep separate layer
self._description = check.opt_str_param(description, "description")

self._tags = tags.tags if isinstance(tags, NormalizedTags) else normalize_tags(tags).tags
self._run_tags = run_tags.tags if isinstance(run_tags, NormalizedTags) else run_tags
self._tags = normalize_tags(tags)
self._run_tags = run_tags # don't normalize to preserve None

self._metadata = normalize_metadata(
check.opt_mapping_param(metadata, "metadata", key_type=str)
Expand Down Expand Up @@ -279,8 +274,8 @@ def dagster_internal_init(
],
description: Optional[str],
partitions_def: Optional[PartitionsDefinition],
tags: Union[NormalizedTags, Optional[Mapping[str, Any]]],
run_tags: Union[NormalizedTags, Optional[Mapping[str, Any]]],
tags: Optional[Mapping[str, Any]],
run_tags: Optional[Mapping[str, Any]],
metadata: Optional[Mapping[str, RawMetadataValue]],
hook_defs: Optional[AbstractSet[HookDefinition]],
op_retry_policy: Optional[RetryPolicy],
Expand Down Expand Up @@ -333,7 +328,7 @@ def run_tags(self) -> Mapping[str, str]:
if self._run_tags is None:
return self.tags
else:
return normalize_tags({**self._graph_def.tags, **self._run_tags}).tags
return normalize_tags({**self._graph_def.tags, **self._run_tags})

# This property exists for backcompat purposes. If it is False, then we omit run_tags when
# generating a job snapshot. This lets host processes distinguish between None and {} `run_tags`
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,14 +8,13 @@
Optional,
Sequence,
Tuple,
Union,
)

import dagster._check as check
from dagster._core.definitions.configurable import NamedConfigurableDefinition
from dagster._core.definitions.hook_definition import HookDefinition
from dagster._core.definitions.policy import RetryPolicy
from dagster._core.definitions.utils import NormalizedTags, check_valid_name, normalize_tags
from dagster._core.definitions.utils import check_valid_name, normalize_tags
from dagster._core.errors import DagsterInvariantViolationError

if TYPE_CHECKING:
Expand Down Expand Up @@ -46,12 +45,12 @@ def __init__(
input_defs: Sequence["InputDefinition"],
output_defs: Sequence["OutputDefinition"],
description: Optional[str] = None,
tags: Union[NormalizedTags, Optional[Mapping[str, str]]] = None,
tags: Optional[Mapping[str, str]] = None,
positional_inputs: Optional[Sequence[str]] = None,
):
self._name = check_valid_name(name)
self._description = check.opt_str_param(description, "description")
self._tags = normalize_tags(tags).tags
self._tags = normalize_tags(tags)
self._input_defs = input_defs
self._input_dict = {input_def.name: input_def for input_def in input_defs}
check.invariant(len(self._input_defs) == len(self._input_dict), "Duplicate input def names")
Expand Down Expand Up @@ -202,7 +201,7 @@ def get_pending_invocation(
return PendingNodeInvocation(
node_def=self,
given_alias=given_alias,
tags=normalize_tags(tags).tags if tags else None,
tags=normalize_tags(tags) if tags else None,
hook_defs=hook_defs,
retry_policy=retry_policy,
)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -731,7 +731,7 @@ def get_tags_for_partition_key(
user_tags = self._tags_for_partition_key_fn(partition_key)
else:
user_tags = {}
user_tags = normalize_tags(user_tags, allow_reserved_tags=False).tags
user_tags = normalize_tags(user_tags, allow_reserved_tags=False)

system_tags = {
**self.partitions_def.get_tags_for_partition_key(partition_key),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@
)
from dagster._core.definitions.events import AssetKey, AssetMaterialization, AssetObservation
from dagster._core.definitions.partition_key_range import PartitionKeyRange
from dagster._core.definitions.utils import NormalizedTags, normalize_tags
from dagster._core.definitions.utils import normalize_tags
from dagster._core.instance import DynamicPartitionsStore
from dagster._core.storage.dagster_run import DagsterRun, DagsterRunStatus
from dagster._core.storage.tags import (
Expand Down Expand Up @@ -122,7 +122,7 @@ def __new__(
cls,
run_key: Optional[str] = None,
run_config: Optional[Union["RunConfig", Mapping[str, Any]]] = None,
tags: Union[NormalizedTags, Optional[Mapping[str, Any]]] = None,
tags: Optional[Mapping[str, Any]] = None,
job_name: Optional[str] = None,
asset_selection: Optional[Sequence[AssetKey]] = None,
stale_assets_only: bool = False,
Expand All @@ -139,7 +139,7 @@ def __new__(
cls,
run_key=None,
run_config={},
tags=normalize_tags(tags).tags,
tags=normalize_tags(tags),
job_name=None,
asset_selection=None,
stale_assets_only=False,
Expand All @@ -154,7 +154,7 @@ def __new__(
cls,
run_key=run_key,
run_config=convert_config_input(run_config) or {},
tags=normalize_tags(tags).tags,
tags=normalize_tags(tags),
job_name=job_name,
asset_selection=asset_selection,
stale_assets_only=stale_assets_only,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -673,7 +673,7 @@ def __init__(
self._execution_fn = execution_fn
else:
self._execution_fn = check.opt_callable_param(execution_fn, "execution_fn")
self._tags = normalize_tags(tags, allow_reserved_tags=False, warning_stacklevel=5).tags
self._tags = normalize_tags(tags, allow_reserved_tags=False, warning_stacklevel=5)
self._tags_fn = None
self._run_config_fn = None
else:
Expand All @@ -699,7 +699,7 @@ def _default_run_config_fn(context: ScheduleEvaluationContext) -> CoercibleToRun
"Attempted to provide both tags_fn and tags as arguments"
" to ScheduleDefinition. Must provide only one of the two."
)
self._tags = normalize_tags(tags, allow_reserved_tags=False, warning_stacklevel=5).tags
self._tags = normalize_tags(tags, allow_reserved_tags=False, warning_stacklevel=5)
if tags_fn:
self._tags_fn = check.opt_callable_param(
tags_fn, "tags_fn", default=lambda _context: cast(Mapping[str, str], {})
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -737,7 +737,7 @@ def __init__(
required_resource_keys, "required_resource_keys", of_type=str
)
self._required_resource_keys = self._raw_required_resource_keys or resource_arg_names
self._tags = normalize_tags(tags).tags
self._tags = normalize_tags(tags)
self._metadata = normalize_metadata(
check.opt_mapping_param(metadata, "metadata", key_type=str) # type: ignore # (pyright bug)
)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -388,9 +388,9 @@ def asset1():
selection=resolved_selection,
config=config,
description=description,
tags=normalize_tags(tags).tags,
tags=normalize_tags(tags),
# Need to preserve None value
run_tags=normalize_tags(run_tags).tags if run_tags is not None else None,
run_tags=normalize_tags(run_tags) if run_tags is not None else None,
metadata=metadata,
partitions_def=partitions_def,
executor_def=executor_def,
Expand Down
18 changes: 3 additions & 15 deletions python_modules/dagster/dagster/_core/definitions/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,12 +10,10 @@
Iterable,
List,
Mapping,
NamedTuple,
Optional,
Sequence,
Tuple,
TypeVar,
Union,
cast,
)

Expand Down Expand Up @@ -160,27 +158,17 @@ def struct_to_string(name: str, **kwargs: object) -> str:
return f"{name}({props_str})"


class NormalizedTags(NamedTuple):
tags: Mapping[str, str]

def with_normalized_tags(self, normalized_tags: "NormalizedTags") -> "NormalizedTags":
return NormalizedTags({**self.tags, **normalized_tags.tags})


def normalize_tags(
tags: Union[NormalizedTags, Optional[Mapping[str, Any]]],
tags: Optional[Mapping[str, Any]],
allow_reserved_tags: bool = True,
warn_on_deprecated_tags: bool = True,
warning_stacklevel: int = 4,
) -> NormalizedTags:
) -> Mapping[str, str]:
"""Normalizes JSON-object tags into string tags and warns on deprecated tags.
New tags properties should _not_ use this function, because it doesn't hard error on tags that
are no longer supported.
"""
if isinstance(tags, NormalizedTags):
return tags

valid_tags: Dict[str, str] = {}
invalid_tag_keys = []
for key, value in check.opt_mapping_param(tags, "tags", key_type=str).items():
Expand Down Expand Up @@ -221,7 +209,7 @@ def normalize_tags(
if not allow_reserved_tags:
check_reserved_tags(valid_tags)

return NormalizedTags(valid_tags)
return valid_tags


# Inspired by allowed Kubernetes labels:
Expand Down
4 changes: 1 addition & 3 deletions python_modules/dagster/dagster/_core/execution/plan/step.py
Original file line number Diff line number Diff line change
Expand Up @@ -161,9 +161,7 @@ def __new__(
so.name: so
for so in check.sequence_param(step_outputs, "step_outputs", of_type=StepOutput)
},
tags=normalize_tags(
check.opt_mapping_param(tags, "tags", key_type=str), warn_on_deprecated_tags=False
).tags,
tags=normalize_tags(tags, warn_on_deprecated_tags=False),
logging_tags=merge_dicts(
{
"step_key": handle.to_key(),
Expand Down
2 changes: 1 addition & 1 deletion python_modules/dagster/dagster/_core/instance/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -1493,7 +1493,7 @@ def create_run(
check.opt_inst_param(status, "status", DagsterRunStatus)
check.opt_mapping_param(tags, "tags", key_type=str)

validated_tags = normalize_tags(tags, warn_on_deprecated_tags=False).tags
validated_tags = normalize_tags(tags, warn_on_deprecated_tags=False)

check.opt_str_param(root_run_id, "root_run_id")
check.opt_str_param(parent_run_id, "parent_run_id")
Expand Down
2 changes: 1 addition & 1 deletion python_modules/dagster/dagster/_daemon/sensor.py
Original file line number Diff line number Diff line change
Expand Up @@ -1310,7 +1310,7 @@ def _create_sensor_run(

job_tags = normalize_tags(
external_job.run_tags or {}, allow_reserved_tags=False, warn_on_deprecated_tags=False
).tags
)
tags = merge_dicts(
merge_dicts(job_tags, run_request.tags),
# this gets applied in the sensor definition too, but we apply it here for backcompat
Expand Down
2 changes: 1 addition & 1 deletion python_modules/dagster/dagster/_scheduler/scheduler.py
Original file line number Diff line number Diff line change
Expand Up @@ -945,7 +945,7 @@ def _create_scheduler_run(
tags = merge_dicts(
normalize_tags(
external_job.run_tags, allow_reserved_tags=False, warn_on_deprecated_tags=False
).tags
)
or {},
schedule_tags,
)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -129,7 +129,6 @@ def basic():
def basic_job():
basic()

assert len(caught_warnings) == 1
warning = caught_warnings[0]
assert "Non-compliant tag keys like ['my_tag&', 'my_tag#'] are deprecated" in str(
warning.message
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -175,7 +175,7 @@ def iris_dataset():
io_manager_key, "io_manager_key", default="output_notebook_io_manager"
)

user_tags = normalize_tags(op_tags).tags
user_tags = normalize_tags(op_tags)
if op_tags is not None:
check.invariant(
"notebook_path" not in op_tags,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -413,7 +413,7 @@ def define_dagstermill_op(
default_description = f"This op is backed by the notebook at {notebook_path}"
description = check.opt_str_param(description, "description", default=default_description)

user_tags = normalize_tags(tags).tags
user_tags = normalize_tags(tags)
if tags is not None:
check.invariant(
"notebook_path" not in tags,
Expand Down

0 comments on commit efc45cf

Please sign in to comment.