Skip to content

Commit

Permalink
make ExternalAssetNode a record (#23399)
Browse files Browse the repository at this point in the history
## Summary & Motivation

## How I Tested These Changes
  • Loading branch information
sryza authored Aug 5, 2024
1 parent e08bd93 commit ee3f534
Showing 1 changed file with 64 additions and 89 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -97,6 +97,7 @@
from dagster._core.storage.io_manager import IOManagerDefinition
from dagster._core.storage.tags import COMPUTE_KIND_TAG
from dagster._core.utils import is_valid_email
from dagster._record import IHaveNew, record_custom
from dagster._serdes import whitelist_for_serdes
from dagster._serdes.serdes import FieldSerializer, is_whitelisted_for_serdes_object
from dagster._time import datetime_from_timestamp
Expand Down Expand Up @@ -1339,50 +1340,46 @@ def pack(self, __unpacked_value, whitelist_map, descent_path):
"owners": BackcompatTeamOwnerFieldDeserializer,
},
)
class ExternalAssetNode(
NamedTuple(
"_ExternalAssetNode",
[
("asset_key", AssetKey),
("dependencies", Sequence[ExternalAssetDependency]),
("depended_by", Sequence[ExternalAssetDependedBy]),
("execution_type", AssetExecutionType),
("compute_kind", Optional[str]),
("op_name", Optional[str]),
("op_names", Sequence[str]),
("code_version", Optional[str]),
("node_definition_name", Optional[str]),
("graph_name", Optional[str]),
# op_description is a misleading name - this is the description for the asset, not for
# the op
("op_description", Optional[str]),
("job_names", Sequence[str]),
("partitions_def_data", Optional[ExternalPartitionsDefinitionData]),
("output_name", Optional[str]),
("output_description", Optional[str]),
("metadata", Mapping[str, MetadataValue]),
("tags", Optional[Mapping[str, str]]),
("group_name", str),
("freshness_policy", Optional[FreshnessPolicy]),
("is_source", bool),
("is_observable", bool),
# If a set of assets can't be materialized independently from each other, they will all
# have the same execution_set_identifier. This ID should be stable across reloads and
# unique deployment-wide.
("execution_set_identifier", Optional[str]),
("required_top_level_resources", Optional[Sequence[str]]),
("auto_materialize_policy", Optional[AutoMaterializePolicy]),
("backfill_policy", Optional[BackfillPolicy]),
("auto_observe_interval_minutes", Optional[float]),
("owners", Optional[Sequence[str]]),
],
)
):
@record_custom
class ExternalAssetNode(IHaveNew):
"""A definition of a node in the logical asset graph.
A function for computing the asset and an identifier for that asset.
"""

asset_key: AssetKey
dependencies: Sequence[ExternalAssetDependency]
depended_by: Sequence[ExternalAssetDependedBy]
execution_type: AssetExecutionType
compute_kind: Optional[str]
op_name: Optional[str]
op_names: Sequence[str]
code_version: Optional[str]
node_definition_name: Optional[str]
graph_name: Optional[str]
# op_description is a misleading name - this is the description for the asset, not for
# the op
op_description: Optional[str]
job_names: Sequence[str]
partitions_def_data: Optional[ExternalPartitionsDefinitionData]
output_name: Optional[str]
output_description: Optional[str]
metadata: Mapping[str, MetadataValue]
tags: Optional[Mapping[str, str]]
group_name: str
freshness_policy: Optional[FreshnessPolicy]
is_source: bool
is_observable: bool
# If a set of assets can't be materialized independently from each other, they will all
# have the same execution_set_identifier. This ID should be stable across reloads and
# unique deployment-wide.
execution_set_identifier: Optional[str]
required_top_level_resources: Optional[Sequence[str]]
auto_materialize_policy: Optional[AutoMaterializePolicy]
backfill_policy: Optional[BackfillPolicy]
auto_observe_interval_minutes: Optional[Union[float, int]]
owners: Optional[Sequence[str]]

def __new__(
cls,
asset_key: AssetKey,
Expand Down Expand Up @@ -1410,7 +1407,7 @@ def __new__(
required_top_level_resources: Optional[Sequence[str]] = None,
auto_materialize_policy: Optional[AutoMaterializePolicy] = None,
backfill_policy: Optional[BackfillPolicy] = None,
auto_observe_interval_minutes: Optional[float] = None,
auto_observe_interval_minutes: Optional[Union[float, int]] = None,
owners: Optional[Sequence[str]] = None,
):
metadata = normalize_metadata(
Expand Down Expand Up @@ -1458,59 +1455,37 @@ def __new__(
# job, and no source assets could be part of any job
is_source = len(job_names or []) == 0

return super(ExternalAssetNode, cls).__new__(
return super().__new__(
cls,
asset_key=check.inst_param(asset_key, "asset_key", AssetKey),
dependencies=check.opt_sequence_param(
dependencies, "dependencies", of_type=ExternalAssetDependency
),
depended_by=check.opt_sequence_param(
depended_by, "depended_by", of_type=ExternalAssetDependedBy
),
compute_kind=check.opt_str_param(compute_kind, "compute_kind"),
op_name=check.opt_str_param(op_name, "op_name"),
op_names=check.opt_sequence_param(op_names, "op_names"),
code_version=check.opt_str_param(code_version, "code_version"),
node_definition_name=check.opt_str_param(node_definition_name, "node_definition_name"),
graph_name=check.opt_str_param(graph_name, "graph_name"),
op_description=check.opt_str_param(
op_description or output_description, "op_description"
),
job_names=check.opt_sequence_param(job_names, "job_names", of_type=str),
partitions_def_data=check.opt_inst_param(
partitions_def_data, "partitions_def_data", ExternalPartitionsDefinitionData
),
output_name=check.opt_str_param(output_name, "output_name"),
output_description=check.opt_str_param(output_description, "output_description"),
asset_key=asset_key,
dependencies=dependencies or [],
depended_by=depended_by or [],
compute_kind=compute_kind,
op_name=op_name,
op_names=op_names or [],
code_version=code_version,
node_definition_name=node_definition_name,
graph_name=graph_name,
op_description=op_description or output_description,
job_names=job_names or [],
partitions_def_data=partitions_def_data,
output_name=output_name,
output_description=output_description,
metadata=metadata,
tags=check.opt_mapping_param(tags, "tags", key_type=str, value_type=str),
tags=tags or {},
# Newer code always passes a string group name when constructing these, but we assign
# the default here for backcompat.
group_name=check.opt_str_param(group_name, "group_name") or DEFAULT_GROUP_NAME,
freshness_policy=check.opt_inst_param(
freshness_policy, "freshness_policy", FreshnessPolicy
),
is_source=check.bool_param(is_source, "is_source"),
is_observable=check.bool_param(is_observable, "is_observable"),
execution_set_identifier=check.opt_str_param(
execution_set_identifier, "execution_set_identifier"
),
required_top_level_resources=check.opt_sequence_param(
required_top_level_resources, "required_top_level_resources", of_type=str
),
auto_materialize_policy=check.opt_inst_param(
auto_materialize_policy,
"auto_materialize_policy",
AutoMaterializePolicy,
),
backfill_policy=check.opt_inst_param(
backfill_policy, "backfill_policy", BackfillPolicy
),
auto_observe_interval_minutes=check.opt_numeric_param(
auto_observe_interval_minutes, "auto_observe_interval_minutes"
),
owners=check.opt_sequence_param(owners, "owners", of_type=str),
execution_type=check.inst_param(execution_type, "execution_type", AssetExecutionType),
group_name=group_name or DEFAULT_GROUP_NAME,
freshness_policy=freshness_policy,
is_source=is_source,
is_observable=is_observable,
execution_set_identifier=execution_set_identifier,
required_top_level_resources=required_top_level_resources or [],
auto_materialize_policy=auto_materialize_policy,
backfill_policy=backfill_policy,
auto_observe_interval_minutes=auto_observe_interval_minutes,
owners=owners or [],
execution_type=execution_type,
)

@property
Expand Down

0 comments on commit ee3f534

Please sign in to comment.