handle "unexecutable" assets in job building and cross process representation #16637

Open · wants to merge 1 commit into base: master
@@ -456,8 +456,8 @@ def get_required_resource_keys_rec(
def is_graph_backed_asset(self) -> bool:
return self.graphName is not None

def is_source_asset(self) -> bool:
return self._external_asset_node.is_source
def is_source_or_defined_in_unexecutable_assets_def(self) -> bool:
return self._external_asset_node.is_source or not self._external_asset_node.is_executable

def resolve_hasMaterializePermission(
self,
@@ -582,7 +582,7 @@ def resolve_assetObservations(
]

def resolve_configField(self, _graphene_info: ResolveInfo) -> Optional[GrapheneConfigTypeField]:
if self.is_source_asset():
if self.is_source_or_defined_in_unexecutable_assets_def():
return None
external_pipeline = self.get_external_job()
node_def_snap = self.get_node_definition_snap()
@@ -815,7 +815,7 @@ def resolve_jobs(self, _graphene_info: ResolveInfo) -> Sequence[GraphenePipeline
]

def resolve_isSource(self, _graphene_info: ResolveInfo) -> bool:
return self.is_source_asset()
return self._external_asset_node.is_source

def resolve_isPartitioned(self, _graphene_info: ResolveInfo) -> bool:
return self._external_asset_node.partitions_def_data is not None
@@ -979,7 +979,7 @@ def resolve_metadata_entries(
def resolve_op(
self, _graphene_info: ResolveInfo
) -> Optional[Union[GrapheneSolidDefinition, GrapheneCompositeSolidDefinition]]:
if self.is_source_asset():
if self.is_source_or_defined_in_unexecutable_assets_def():
return None
external_pipeline = self.get_external_job()
node_def_snap = self.get_node_definition_snap()
@@ -1058,7 +1058,7 @@ def resolve_repository(self, graphene_info: ResolveInfo) -> "GrapheneRepository"
def resolve_required_resources(
self, _graphene_info: ResolveInfo
) -> Sequence[GrapheneResourceRequirement]:
if self.is_source_asset():
if self.is_source_or_defined_in_unexecutable_assets_def():
return []
node_def_snap = self.get_node_definition_snap()
all_unique_keys = self.get_required_resource_keys(node_def_snap)
@@ -1071,7 +1071,7 @@ def resolve_type(
"GrapheneListDagsterType", "GrapheneNullableDagsterType", "GrapheneRegularDagsterType"
]
]:
if self.is_source_asset():
if self.is_source_or_defined_in_unexecutable_assets_def():
return None
external_pipeline = self.get_external_job()
output_name = self.external_asset_node.output_name
9 changes: 9 additions & 0 deletions python_modules/dagster/dagster/_core/definitions/assets.py
@@ -916,6 +916,15 @@ def asset_execution_type_for_asset(self, asset_key: AssetKey) -> AssetExecutionT
self._metadata_by_key.get(asset_key, {}).get(SYSTEM_METADATA_KEY_ASSET_EXECUTION_TYPE)
)

def is_executable(self) -> bool:
"""Returns True if every asset in this definition is executable.
Assumption: either all or none of the contained assets are executable.
"""
for key in self.keys:
if not self.is_asset_executable(key):
return False
return True

def get_partition_mapping_for_input(self, input_name: str) -> Optional[PartitionMapping]:
return self._partition_mappings.get(self._keys_by_input_name[input_name])

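To illustrate the intent of the new helper, a minimal sketch of splitting definitions by executability (illustrative only; split_by_executability and defs are hypothetical names, and only the is_executable() method added above is assumed):

from typing import List, Sequence, Tuple
from dagster import AssetsDefinition

def split_by_executability(
    defs: Sequence[AssetsDefinition],
) -> Tuple[List[AssetsDefinition], List[AssetsDefinition]]:
    # Relies on the all-or-none invariant noted in is_executable()'s docstring:
    # a definition is either entirely executable or entirely unexecutable.
    executable = [d for d in defs if d.is_executable()]
    unexecutable = [d for d in defs if not d.is_executable()]
    return executable, unexecutable

This is the same split get_base_asset_jobs performs below, where the unexecutable bucket is folded into source assets.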
41 changes: 28 additions & 13 deletions python_modules/dagster/dagster/_core/definitions/assets_job.py
@@ -58,39 +58,51 @@ def is_base_asset_job_name(name: str) -> bool:


def get_base_asset_jobs(
assets: Sequence[AssetsDefinition],
assets_defs: Sequence[AssetsDefinition],
source_assets: Sequence[SourceAsset],
asset_checks: Sequence[AssetChecksDefinition],
resource_defs: Optional[Mapping[str, ResourceDefinition]],
executor_def: Optional[ExecutorDefinition],
) -> Sequence[JobDefinition]:
assets_by_partitions_def: Dict[Optional[PartitionsDefinition], List[AssetsDefinition]] = (
# bucket the passed-in AssetsDefinitions that are executable, keyed by partitions definition;
# unexecutable assets are intentionally omitted here and folded into source assets below.
exe_assets_by_partitions_def: Dict[Optional[PartitionsDefinition], List[AssetsDefinition]] = (
defaultdict(list)
)
for assets_def in assets:
assets_by_partitions_def[assets_def.partitions_def].append(assets_def)
all_exe_assets = []
all_source_assets = [*source_assets]
for assets_def in assets_defs:
if assets_def.is_executable():
exe_assets_by_partitions_def[assets_def.partitions_def].append(assets_def)
all_exe_assets.append(assets_def)
else:
# treat all unexecutable assets as potential source assets
all_source_assets.extend(assets_def.to_source_assets())

# We need to create "empty" jobs for each partitions def that is used by an observable but no
# materializable asset. They are empty because we don't assign the source asset to the `assets`,
# but rather the `source_assets` argument of `build_assets_job`.
for observable in [sa for sa in source_assets if sa.is_observable]:
if observable.partitions_def not in assets_by_partitions_def:
assets_by_partitions_def[observable.partitions_def] = []
if len(assets_by_partitions_def.keys()) == 0 or assets_by_partitions_def.keys() == {None}:
if observable.partitions_def not in exe_assets_by_partitions_def:
exe_assets_by_partitions_def[observable.partitions_def] = []

if len(exe_assets_by_partitions_def.keys()) == 0 or exe_assets_by_partitions_def.keys() == {
None
}:
return [
build_assets_job(
name=ASSET_BASE_JOB_PREFIX,
assets=assets,
assets=all_exe_assets,
asset_checks=asset_checks,
source_assets=source_assets,
source_assets=all_source_assets,
executor_def=executor_def,
resource_defs=resource_defs,
)
]
else:
unpartitioned_assets = assets_by_partitions_def.get(None, [])
unpartitioned_assets_defs = exe_assets_by_partitions_def.get(None, [])
partitioned_assets_by_partitions_def = {
k: v for k, v in assets_by_partitions_def.items() if k is not None
k: v for k, v in exe_assets_by_partitions_def.items() if k is not None
}
jobs = []

@@ -101,8 +113,11 @@ def get_base_asset_jobs(
jobs.append(
build_assets_job(
f"{ASSET_BASE_JOB_PREFIX}_{i}",
assets=[*assets_with_partitions, *unpartitioned_assets],
source_assets=[*source_assets, *assets],
assets=[*assets_with_partitions, *unpartitioned_assets_defs],
source_assets=[
*all_source_assets,
*all_exe_assets,
],
asset_checks=asset_checks,
resource_defs=resource_defs,
executor_def=executor_def,
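For illustration, a hedged sketch of calling the renamed entry point after this change (all_assets_defs and declared_source_assets are hypothetical placeholders; the signature matches the hunk above):

jobs = get_base_asset_jobs(
    assets_defs=all_assets_defs,          # mix of executable and unexecutable defs
    source_assets=declared_source_assets,
    asset_checks=[],
    resource_defs=None,
    executor_def=None,
)
# Unexecutable definitions never land in the `assets=` argument of
# build_assets_job; they are folded into source_assets via to_source_assets().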
@@ -193,7 +193,7 @@ def build_caching_repository_data_from_list(

if assets_defs or source_assets or asset_checks_defs:
for job_def in get_base_asset_jobs(
assets=assets_defs,
assets_defs=assets_defs,
Review comment (Member): ❤️

source_assets=source_assets,
executor_def=default_executor_def,
resource_defs=top_level_resources,
@@ -10,6 +10,7 @@
from enum import Enum
from typing import (
TYPE_CHECKING,
AbstractSet,
Any,
Dict,
Iterable,
@@ -51,6 +52,7 @@
SYSTEM_METADATA_KEY_ASSET_EXECUTION_TYPE,
AssetExecutionType,
)
from dagster._core.definitions.assets import AssetsDefinition
from dagster._core.definitions.assets_job import is_base_asset_job_name
from dagster._core.definitions.auto_materialize_policy import AutoMaterializePolicy
from dagster._core.definitions.backfill_policy import BackfillPolicy
@@ -1261,17 +1263,20 @@ def __new__(
),
)

@property
def is_executable(self) -> bool:
def _get_exe_type_metadata(self) -> Optional[str]:
metadata_value = self.metadata.get(SYSTEM_METADATA_KEY_ASSET_EXECUTION_TYPE)
if not metadata_value:
varietal_text = None
exe_type_text = None
else:
check.inst(metadata_value, TextMetadataValue) # for guaranteed runtime error
assert isinstance(metadata_value, TextMetadataValue) # for type checker
varietal_text = metadata_value.value
exe_type_text = metadata_value.value

return AssetExecutionType.is_executable(varietal_text)
return exe_type_text

@property
def is_executable(self) -> bool:
return AssetExecutionType.is_executable(self._get_exe_type_metadata())


ResourceJobUsageMap = Dict[str, List[ResourceJobUsageEntry]]
@@ -1342,6 +1347,11 @@ def external_repository_data_from_def(
asset_graph = external_asset_nodes_from_defs(
jobs,
source_assets_by_key=repository_def.source_assets_by_key,
unexecutable_assets_defs={
assets_def
for assets_def in repository_def.assets_defs_by_key.values()
if not assets_def.is_executable()
},
)

nested_resource_map = _get_nested_resources_map(
@@ -1469,6 +1479,7 @@ def external_asset_checks_from_defs(
def external_asset_nodes_from_defs(
job_defs: Sequence[JobDefinition],
source_assets_by_key: Mapping[AssetKey, SourceAsset],
unexecutable_assets_defs: Optional[AbstractSet[AssetsDefinition]],
) -> Sequence[ExternalAssetNode]:
node_defs_by_asset_key: Dict[AssetKey, List[Tuple[NodeOutputHandle, JobDefinition]]] = (
defaultdict(list)
@@ -1488,6 +1499,7 @@ def external_asset_nodes_from_defs(
descriptions_by_asset_key: Dict[AssetKey, str] = {}
atomic_execution_unit_ids_by_key: Dict[Union[AssetKey, AssetCheckKey], str] = {}

# resolve details for assets in jobs
for job_def in job_defs:
asset_layer = job_def.asset_layer
asset_info_by_node_output = asset_layer.asset_info_by_node_output_handle
@@ -1546,10 +1558,22 @@

group_name_by_asset_key.update(asset_layer.group_names_by_assets())

asset_keys_without_definitions = all_upstream_asset_keys.difference(
node_defs_by_asset_key.keys()
).difference(source_assets_by_key.keys())

# resolve details for unexecutable assets & their dependencies
unexecutable_keys = set()
for unexec_def in unexecutable_assets_defs or []:
for key in unexec_def.keys:
unexecutable_keys.add(key)
for depends_on in unexec_def.asset_deps[key]:
deps[key][depends_on] = ExternalAssetDependency(depends_on)
dep_by[depends_on][key] = ExternalAssetDependedBy(key)

# build nodes for asset keys that were referred to in jobs but not defined elsewhere
# (handles SourceAsset usage by @ops)
asset_keys_without_definitions = (
all_upstream_asset_keys.difference(node_defs_by_asset_key.keys())
.difference(source_assets_by_key.keys())
.difference(unexecutable_keys)
)
asset_nodes = [
ExternalAssetNode(
asset_key=asset_key,
Expand All @@ -1562,6 +1586,7 @@ def external_asset_nodes_from_defs(
for asset_key in asset_keys_without_definitions
]

# build nodes for source assets
for source_asset in source_assets_by_key.values():
if source_asset.key not in node_defs_by_asset_key:
job_names = (
@@ -1603,6 +1628,7 @@
)
)

# build nodes for assets backed by ops/jobs
for asset_key, node_tuple_list in node_defs_by_asset_key.items():
node_output_handle, job_def = node_tuple_list[0]

@@ -1670,6 +1696,23 @@
)
)

# build nodes for unexecutable assets defs
for unexec_def in unexecutable_assets_defs or []:
for key in unexec_def.keys:
asset_nodes.append(
ExternalAssetNode(
asset_key=key,
dependencies=list(deps[key].values()),
depended_by=list(dep_by[key].values()),
metadata=unexec_def.metadata_by_key[key],
group_name=unexec_def.group_names_by_key[key],
is_source=False,
is_observable=False,
# could use a special kind tag for more UI punch
# compute_kind=...
)
)

defined = set()
for node in asset_nodes:
if node.asset_key in defined:
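The updated tests below only pass an empty set for the new parameter; a hedged sketch of exercising it with a real unexecutable definition (repo and unexec_def are hypothetical, the latter built as sketched near the top of this PR):

external_asset_nodes = external_asset_nodes_from_defs(
    repo.get_all_jobs(),
    source_assets_by_key=repo.source_assets_by_key,
    unexecutable_assets_defs={unexec_def},
)
# Each key of unexec_def should now surface as an ExternalAssetNode with
# is_source=False, is_observable=False, and no backing op or job.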
@@ -51,7 +51,9 @@ def repo():
return assets + (asset_checks or [])

external_asset_nodes = external_asset_nodes_from_defs(
repo.get_all_jobs(), source_assets_by_key={}
repo.get_all_jobs(),
source_assets_by_key={},
unexecutable_assets_defs=set(),
)
return ExternalAssetGraph.from_repository_handles_and_external_asset_nodes(
[(MagicMock(), asset_node) for asset_node in external_asset_nodes],
@@ -1959,7 +1959,7 @@ def hourly_asset(): ...
def unpartitioned_asset(): ...

jobs = get_base_asset_jobs(
assets=[
assets_defs=[
daily_asset,
daily_asset2,
daily_asset_different_start_date,
@@ -2004,7 +2004,7 @@ def asset_b(): ...
def asset_x(asset_b: B): ...

jobs = get_base_asset_jobs(
assets=[
assets_defs=[
asset_x,
],
source_assets=[
@@ -659,7 +659,7 @@ def external_asset_graph_from_assets_by_repo_name(
repo = Definitions(assets=assets).get_repository_def()

external_asset_nodes = external_asset_nodes_from_defs(
repo.get_all_jobs(), source_assets_by_key=repo.source_assets_by_key
repo.get_all_jobs(),
source_assets_by_key=repo.source_assets_by_key,
unexecutable_assets_defs=set(),
)
repo_handle = MagicMock(repository_name=repo_name)
from_repository_handles_and_external_asset_nodes.extend(