Skip to content

Commit

Permalink
[exploration] unexecutable assets not backed by jobs
Browse files Browse the repository at this point in the history
  • Loading branch information
alangenfeld committed Sep 25, 2023
1 parent 99c287d commit 73878a1
Show file tree
Hide file tree
Showing 11 changed files with 302 additions and 64 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -455,8 +455,8 @@ def get_required_resource_keys_rec(
def is_graph_backed_asset(self) -> bool:
    """Return True if this asset is produced by a graph rather than a single op."""
    return self.graphName is not None

def is_source_or_defined_in_unexecutable_assets_def(self) -> bool:
    """Return True if this asset cannot be materialized directly.

    Covers both source assets and assets defined in an unexecutable
    AssetsDefinition (e.g. external assets not backed by any job).
    """
    return self._external_asset_node.is_source or not self._external_asset_node.is_executable

def resolve_hasMaterializePermission(
self,
Expand Down Expand Up @@ -581,7 +581,7 @@ def resolve_assetObservations(
]

def resolve_configField(self, _graphene_info: ResolveInfo) -> Optional[GrapheneConfigTypeField]:
if self.is_source_asset():
if self.is_source_or_defined_in_unexecutable_assets_def():
return None
external_pipeline = self.get_external_job()
node_def_snap = self.get_node_definition_snap()
Expand Down Expand Up @@ -814,7 +814,7 @@ def resolve_jobs(self, _graphene_info: ResolveInfo) -> Sequence[GraphenePipeline
]

def resolve_isSource(self, _graphene_info: ResolveInfo) -> bool:
return self.is_source_asset()
return self._external_asset_node.is_source

def resolve_isPartitioned(self, _graphene_info: ResolveInfo) -> bool:
return self._external_asset_node.partitions_def_data is not None
Expand Down Expand Up @@ -978,7 +978,7 @@ def resolve_metadata_entries(
def resolve_op(
self, _graphene_info: ResolveInfo
) -> Optional[Union[GrapheneSolidDefinition, GrapheneCompositeSolidDefinition]]:
if self.is_source_asset():
if self.is_source_or_defined_in_unexecutable_assets_def():
return None
external_pipeline = self.get_external_job()
node_def_snap = self.get_node_definition_snap()
Expand Down Expand Up @@ -1057,7 +1057,7 @@ def resolve_repository(self, graphene_info: ResolveInfo) -> "GrapheneRepository"
def resolve_required_resources(
self, _graphene_info: ResolveInfo
) -> Sequence[GrapheneResourceRequirement]:
if self.is_source_asset():
if self.is_source_or_defined_in_unexecutable_assets_def():
return []
node_def_snap = self.get_node_definition_snap()
all_unique_keys = self.get_required_resource_keys(node_def_snap)
Expand All @@ -1070,7 +1070,7 @@ def resolve_type(
"GrapheneListDagsterType", "GrapheneNullableDagsterType", "GrapheneRegularDagsterType"
]
]:
if self.is_source_asset():
if self.is_source_or_defined_in_unexecutable_assets_def():
return None
external_pipeline = self.get_external_job()
output_name = self.external_asset_node.output_name
Expand Down
22 changes: 22 additions & 0 deletions python_modules/dagster/dagster/_core/definitions/assets.py
Original file line number Diff line number Diff line change
Expand Up @@ -876,6 +876,28 @@ def is_asset_executable(self, asset_key: AssetKey) -> bool:
self._metadata_by_key.get(asset_key, {}).get(SYSTEM_METADATA_KEY_ASSET_EXECUTION_TYPE)
)

def is_executable(self) -> bool:
    """Return True if every asset contained in this definition is executable.

    Assumes that either all or none of the contained assets are executable,
    so the scan short-circuits on the first unexecutable key. An empty
    definition is considered executable (vacuous truth).
    """
    # NOTE: the original docstring claimed the inverse ("represents
    # unexecutable assets"); the code returns True when all are executable.
    return all(self.is_asset_executable(key) for key in self.keys)

@property
def execution_type(self):
    """The AssetExecutionType shared by the assets in this definition.

    Execution type is stored as per-asset metadata and is assumed to be
    identical across all contained assets, so only the first key is
    inspected. A definition with no keys yields None.
    """
    from dagster._core.definitions.asset_spec import (
        SYSTEM_METADATA_KEY_ASSET_EXECUTION_TYPE,
        AssetExecutionType,
    )

    first_key = next(iter(self.keys), None)
    if first_key is None:
        return None
    key_metadata = self._metadata_by_key.get(first_key, {})
    return AssetExecutionType.str_to_enum(
        key_metadata.get(SYSTEM_METADATA_KEY_ASSET_EXECUTION_TYPE)
    )

def get_partition_mapping_for_input(self, input_name: str) -> Optional[PartitionMapping]:
return self._partition_mappings.get(self._keys_by_input_name[input_name])

Expand Down
41 changes: 28 additions & 13 deletions python_modules/dagster/dagster/_core/definitions/assets_job.py
Original file line number Diff line number Diff line change
Expand Up @@ -58,39 +58,51 @@ def is_base_asset_job_name(name: str) -> bool:


def get_base_asset_jobs(
assets: Sequence[AssetsDefinition],
assets_defs: Sequence[AssetsDefinition],
source_assets: Sequence[SourceAsset],
asset_checks: Sequence[AssetChecksDefinition],
resource_defs: Optional[Mapping[str, ResourceDefinition]],
executor_def: Optional[ExecutorDefinition],
) -> Sequence[JobDefinition]:
assets_by_partitions_def: Dict[Optional[PartitionsDefinition], List[AssetsDefinition]] = (
# bucket the passed in AssetsDefinitions that are executable by partition.
# un-executable assets are intentionally omitted.
exe_assets_by_partitions_def: Dict[Optional[PartitionsDefinition], List[AssetsDefinition]] = (
defaultdict(list)
)
for assets_def in assets:
assets_by_partitions_def[assets_def.partitions_def].append(assets_def)
all_exe_assets = []
all_source_assets = [*source_assets]
for assets_def in assets_defs:
if assets_def.is_executable():
exe_assets_by_partitions_def[assets_def.partitions_def].append(assets_def)
all_exe_assets.append(assets_def)
else:
# treat all unexecutable assets as potential source assets
all_source_assets.extend(assets_def.to_source_assets())

# We need to create "empty" jobs for each partitions def that is used by an observable but no
# materializable asset. They are empty because we don't assign the source asset to the `assets`,
# but rather the `source_assets` argument of `build_assets_job`.
for observable in [sa for sa in source_assets if sa.is_observable]:
if observable.partitions_def not in assets_by_partitions_def:
assets_by_partitions_def[observable.partitions_def] = []
if len(assets_by_partitions_def.keys()) == 0 or assets_by_partitions_def.keys() == {None}:
if observable.partitions_def not in exe_assets_by_partitions_def:
exe_assets_by_partitions_def[observable.partitions_def] = []

if len(exe_assets_by_partitions_def.keys()) == 0 or exe_assets_by_partitions_def.keys() == {
None
}:
return [
build_assets_job(
name=ASSET_BASE_JOB_PREFIX,
assets=assets,
assets=all_exe_assets,
asset_checks=asset_checks,
source_assets=source_assets,
source_assets=all_source_assets,
executor_def=executor_def,
resource_defs=resource_defs,
)
]
else:
unpartitioned_assets = assets_by_partitions_def.get(None, [])
unpartitioned_assets_defs = exe_assets_by_partitions_def.get(None, [])
partitioned_assets_by_partitions_def = {
k: v for k, v in assets_by_partitions_def.items() if k is not None
k: v for k, v in exe_assets_by_partitions_def.items() if k is not None
}
jobs = []

Expand All @@ -101,8 +113,11 @@ def get_base_asset_jobs(
jobs.append(
build_assets_job(
f"{ASSET_BASE_JOB_PREFIX}_{i}",
assets=[*assets_with_partitions, *unpartitioned_assets],
source_assets=[*source_assets, *assets],
assets=[*assets_with_partitions, *unpartitioned_assets_defs],
source_assets=[
*all_source_assets,
*all_exe_assets,
],
asset_checks=asset_checks,
resource_defs=resource_defs,
executor_def=executor_def,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ def create_unexecutable_observable_assets_def(specs: Sequence[AssetSpec]):
)

@multi_asset(specs=new_specs)
def an_asset() -> None:
    # Sentinel body: unexecutable assets must never run; executing one is a
    # user error, so fail loudly with the offending keys.
    raise DagsterInvariantViolationError(
        f"You have attempted to execute an unexecutable asset {[spec.key for spec in specs]}"
    )
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -193,7 +193,7 @@ def build_caching_repository_data_from_list(

if assets_defs or source_assets or asset_checks_defs:
for job_def in get_base_asset_jobs(
assets=assets_defs,
assets_defs=assets_defs,
source_assets=source_assets,
executor_def=default_executor_def,
resource_defs=top_level_resources,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
from enum import Enum
from typing import (
TYPE_CHECKING,
AbstractSet,
Any,
Dict,
Iterable,
Expand Down Expand Up @@ -51,6 +52,7 @@
SYSTEM_METADATA_KEY_ASSET_EXECUTION_TYPE,
AssetExecutionType,
)
from dagster._core.definitions.assets import AssetsDefinition
from dagster._core.definitions.assets_job import is_base_asset_job_name
from dagster._core.definitions.auto_materialize_policy import AutoMaterializePolicy
from dagster._core.definitions.backfill_policy import BackfillPolicy
Expand Down Expand Up @@ -1260,17 +1262,24 @@ def __new__(
),
)

def _get_exe_type_metadata(self) -> Optional[str]:
    """Return the raw execution-type metadata string for this node, or None if unset."""
    metadata_value = self.metadata.get(SYSTEM_METADATA_KEY_ASSET_EXECUTION_TYPE)
    if not metadata_value:
        return None
    check.inst(metadata_value, TextMetadataValue)  # for guaranteed runtime error
    assert isinstance(metadata_value, TextMetadataValue)  # for type checker
    return metadata_value.value

@property
def is_executable(self) -> bool:
    """Whether this asset node can be directly executed (materialized/observed)."""
    return AssetExecutionType.is_executable(self._get_exe_type_metadata())

@property
def execution_type(self) -> AssetExecutionType:
    """The AssetExecutionType for this node, derived from its system metadata."""
    return AssetExecutionType.str_to_enum(self._get_exe_type_metadata())


ResourceJobUsageMap = Dict[str, List[ResourceJobUsageEntry]]
Expand Down Expand Up @@ -1341,6 +1350,11 @@ def external_repository_data_from_def(
asset_graph = external_asset_graph_from_defs(
jobs,
source_assets_by_key=repository_def.source_assets_by_key,
unexecutable_assets_defs={
assets_def
for assets_def in repository_def.assets_defs_by_key.values()
if not assets_def.is_executable()
},
)

nested_resource_map = _get_nested_resources_map(
Expand Down Expand Up @@ -1462,6 +1476,7 @@ def external_asset_checks_from_defs(
def external_asset_graph_from_defs(
job_defs: Sequence[JobDefinition],
source_assets_by_key: Mapping[AssetKey, SourceAsset],
unexecutable_assets_defs: Optional[AbstractSet[AssetsDefinition]],
) -> Sequence[ExternalAssetNode]:
node_defs_by_asset_key: Dict[AssetKey, List[Tuple[NodeOutputHandle, JobDefinition]]] = (
defaultdict(list)
Expand All @@ -1481,6 +1496,7 @@ def external_asset_graph_from_defs(
descriptions_by_asset_key: Dict[AssetKey, str] = {}
atomic_execution_unit_ids_by_asset_key: Dict[AssetKey, str] = {}

# resolve details for assets in jobs
for job_def in job_defs:
asset_layer = job_def.asset_layer
asset_info_by_node_output = asset_layer.asset_info_by_node_output_handle
Expand Down Expand Up @@ -1535,10 +1551,22 @@ def external_asset_graph_from_defs(

group_name_by_asset_key.update(asset_layer.group_names_by_assets())

asset_keys_without_definitions = all_upstream_asset_keys.difference(
node_defs_by_asset_key.keys()
).difference(source_assets_by_key.keys())

# resolve details for unexecutable assets & their dependencies
unexecutable_keys = set()
for unexec_def in unexecutable_assets_defs or []:
for key in unexec_def.keys:
unexecutable_keys.add(key)
for depends_on in unexec_def.asset_deps[key]:
deps[key][depends_on] = ExternalAssetDependency(depends_on)
dep_by[depends_on][key] = ExternalAssetDependedBy(key)

# build nodes for asset keys that were referred to in jobs but not defined elsewhere
# (handles SourceAsset usage by @ops)
asset_keys_without_definitions = (
all_upstream_asset_keys.difference(node_defs_by_asset_key.keys())
.difference(source_assets_by_key.keys())
.difference(unexecutable_keys)
)
asset_nodes = [
ExternalAssetNode(
asset_key=asset_key,
Expand All @@ -1551,6 +1579,7 @@ def external_asset_graph_from_defs(
for asset_key in asset_keys_without_definitions
]

# build nodes for source assets
for source_asset in source_assets_by_key.values():
if source_asset.key not in node_defs_by_asset_key:
job_names = (
Expand Down Expand Up @@ -1592,6 +1621,7 @@ def external_asset_graph_from_defs(
)
)

# build nodes for assets backed by ops/jobs
for asset_key, node_tuple_list in node_defs_by_asset_key.items():
node_output_handle, job_def = node_tuple_list[0]

Expand Down Expand Up @@ -1659,6 +1689,23 @@ def external_asset_graph_from_defs(
)
)

# build nodes for unexecutable assets defs
for unexec_def in unexecutable_assets_defs or []:
for key in unexec_def.keys:
asset_nodes.append(
ExternalAssetNode(
asset_key=key,
dependencies=list(deps[key].values()),
depended_by=list(dep_by[key].values()),
metadata=unexec_def.metadata_by_key[key],
group_name=unexec_def.group_names_by_key[key],
is_source=False,
is_observable=False,
# could use a special kind tag for more UI punch
# compute_kind=...
)
)

defined = set()
for node in asset_nodes:
if node.asset_key in defined:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,9 @@ def repo():
return assets

external_asset_nodes = external_asset_graph_from_defs(
repo.get_all_jobs(), source_assets_by_key={}
repo.get_all_jobs(),
source_assets_by_key={},
unexecutable_assets_defs=set(),
)
return ExternalAssetGraph.from_repository_handles_and_external_asset_nodes(
[(MagicMock(), asset_node) for asset_node in external_asset_nodes]
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1959,7 +1959,7 @@ def hourly_asset(): ...
def unpartitioned_asset(): ...

jobs = get_base_asset_jobs(
assets=[
assets_defs=[
daily_asset,
daily_asset2,
daily_asset_different_start_date,
Expand Down Expand Up @@ -2004,7 +2004,7 @@ def asset_b(): ...
def asset_x(asset_b: B): ...

jobs = get_base_asset_jobs(
assets=[
assets_defs=[
asset_x,
],
source_assets=[
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -658,7 +658,9 @@ def external_asset_graph_from_assets_by_repo_name(
repo = Definitions(assets=assets).get_repository_def()

external_asset_nodes = external_asset_graph_from_defs(
repo.get_all_jobs(), source_assets_by_key=repo.source_assets_by_key
repo.get_all_jobs(),
source_assets_by_key=repo.source_assets_by_key,
unexecutable_assets_defs=set(),
)
repo_handle = MagicMock(repository_name=repo_name)
from_repository_handles_and_external_asset_nodes.extend(
Expand Down
Loading

0 comments on commit 73878a1

Please sign in to comment.