Skip to content

Commit

Permalink
[exploration] unexecutable assets not backed by jobs
Browse files Browse the repository at this point in the history
  • Loading branch information
alangenfeld committed Sep 25, 2023
1 parent 6fd4a1e commit aa652da
Show file tree
Hide file tree
Showing 8 changed files with 199 additions and 50 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -455,8 +455,8 @@ def get_required_resource_keys_rec(
def is_graph_backed_asset(self) -> bool:
    """Return True when a graph name is recorded for this asset node.

    NOTE(review): presumably ``graphName`` is populated only for assets
    materialized by a graph (composed ops) rather than a single op --
    confirm against where the snapshot is built.
    """
    return self.graphName is not None

def is_source_asset(self) -> bool:
return self._external_asset_node.is_source
def is_source_or_defined_in_unexecutable_assets_def(self) -> bool:
return self._external_asset_node.is_source or not self._external_asset_node.is_executable

def resolve_hasMaterializePermission(
self,
Expand Down Expand Up @@ -581,7 +581,7 @@ def resolve_assetObservations(
]

def resolve_configField(self, _graphene_info: ResolveInfo) -> Optional[GrapheneConfigTypeField]:
if self.is_source_asset():
if self.is_source_or_defined_in_unexecutable_assets_def():
return None
external_pipeline = self.get_external_job()
node_def_snap = self.get_node_definition_snap()
Expand Down Expand Up @@ -814,7 +814,7 @@ def resolve_jobs(self, _graphene_info: ResolveInfo) -> Sequence[GraphenePipeline
]

def resolve_isSource(self, _graphene_info: ResolveInfo) -> bool:
return self.is_source_asset()
return self._external_asset_node.is_source

def resolve_isPartitioned(self, _graphene_info: ResolveInfo) -> bool:
return self._external_asset_node.partitions_def_data is not None
Expand Down Expand Up @@ -978,7 +978,7 @@ def resolve_metadata_entries(
def resolve_op(
self, _graphene_info: ResolveInfo
) -> Optional[Union[GrapheneSolidDefinition, GrapheneCompositeSolidDefinition]]:
if self.is_source_asset():
if self.is_source_or_defined_in_unexecutable_assets_def():
return None
external_pipeline = self.get_external_job()
node_def_snap = self.get_node_definition_snap()
Expand Down Expand Up @@ -1057,7 +1057,7 @@ def resolve_repository(self, graphene_info: ResolveInfo) -> "GrapheneRepository"
def resolve_required_resources(
self, _graphene_info: ResolveInfo
) -> Sequence[GrapheneResourceRequirement]:
if self.is_source_asset():
if self.is_source_or_defined_in_unexecutable_assets_def():
return []
node_def_snap = self.get_node_definition_snap()
all_unique_keys = self.get_required_resource_keys(node_def_snap)
Expand All @@ -1070,7 +1070,7 @@ def resolve_type(
"GrapheneListDagsterType", "GrapheneNullableDagsterType", "GrapheneRegularDagsterType"
]
]:
if self.is_source_asset():
if self.is_source_or_defined_in_unexecutable_assets_def():
return None
external_pipeline = self.get_external_job()
output_name = self.external_asset_node.output_name
Expand Down
22 changes: 22 additions & 0 deletions python_modules/dagster/dagster/_core/definitions/assets.py
Original file line number Diff line number Diff line change
Expand Up @@ -876,6 +876,28 @@ def is_asset_executable(self, asset_key: AssetKey) -> bool:
self._metadata_by_key.get(asset_key, {}).get(SYSTEM_METADATA_KEY_ASSET_EXECUTION_TYPE)
)

def is_executable(self) -> bool:
    """Return True if all assets in this definition are executable.

    Assumption: either all or none of the contained assets are
    executable, but every key is checked defensively. Vacuously True
    when the definition has no keys.
    """
    # Short-circuits on the first unexecutable key, same as the original loop.
    return all(self.is_asset_executable(key) for key in self.keys)

@property
def execution_type(self):
    """The execution type shared by the assets in this definition.

    The execution type stored as metadata is assumed to be identical
    across all contained assets, so only one representative key is
    consulted. Implicitly yields None when there are no keys.
    """
    from dagster._core.definitions.asset_spec import (
        SYSTEM_METADATA_KEY_ASSET_EXECUTION_TYPE,
        AssetExecutionType,
    )

    keys_iter = iter(self.keys)
    try:
        representative_key = next(keys_iter)
    except StopIteration:
        # No assets -> nothing to report (matches the original implicit None).
        return None

    key_metadata = self._metadata_by_key.get(representative_key, {})
    return AssetExecutionType.str_to_enum(
        key_metadata.get(SYSTEM_METADATA_KEY_ASSET_EXECUTION_TYPE)
    )

def get_partition_mapping_for_input(self, input_name: str) -> Optional[PartitionMapping]:
return self._partition_mappings.get(self._keys_by_input_name[input_name])

Expand Down
39 changes: 26 additions & 13 deletions python_modules/dagster/dagster/_core/definitions/assets_job.py
Original file line number Diff line number Diff line change
Expand Up @@ -58,39 +58,49 @@ def is_base_asset_job_name(name: str) -> bool:


def get_base_asset_jobs(
assets: Sequence[AssetsDefinition],
assets_defs: Sequence[AssetsDefinition],
source_assets: Sequence[SourceAsset],
asset_checks: Sequence[AssetChecksDefinition],
resource_defs: Optional[Mapping[str, ResourceDefinition]],
executor_def: Optional[ExecutorDefinition],
) -> Sequence[JobDefinition]:
assets_by_partitions_def: Dict[Optional[PartitionsDefinition], List[AssetsDefinition]] = (
# Bucket the passed-in AssetsDefinitions that are executable by their
# partitions definition. Unexecutable assets are intentionally omitted.
exe_assets_by_partitions_def: Dict[Optional[PartitionsDefinition], List[AssetsDefinition]] = (
defaultdict(list)
)
for assets_def in assets:
assets_by_partitions_def[assets_def.partitions_def].append(assets_def)
all_source_assets = [*source_assets]
for assets_def in assets_defs:
if assets_def.is_executable():
exe_assets_by_partitions_def[assets_def.partitions_def].append(assets_def)
else:
# treat all unexecutable assets as potential source assets
all_source_assets.extend(assets_def.to_source_assets())

# We need to create "empty" jobs for each partitions def that is used by an observable but no
# materializable asset. They are empty because we don't assign the source asset to the `assets`,
# but rather the `source_assets` argument of `build_assets_job`.
for observable in [sa for sa in source_assets if sa.is_observable]:
if observable.partitions_def not in assets_by_partitions_def:
assets_by_partitions_def[observable.partitions_def] = []
if len(assets_by_partitions_def.keys()) == 0 or assets_by_partitions_def.keys() == {None}:
if observable.partitions_def not in exe_assets_by_partitions_def:
exe_assets_by_partitions_def[observable.partitions_def] = []

unpartitioned_assets_defs = exe_assets_by_partitions_def.get(None, [])
if len(exe_assets_by_partitions_def.keys()) == 0 or exe_assets_by_partitions_def.keys() == {
None
}:
return [
build_assets_job(
name=ASSET_BASE_JOB_PREFIX,
assets=assets,
assets=unpartitioned_assets_defs,
asset_checks=asset_checks,
source_assets=source_assets,
source_assets=all_source_assets,
executor_def=executor_def,
resource_defs=resource_defs,
)
]
else:
unpartitioned_assets = assets_by_partitions_def.get(None, [])
partitioned_assets_by_partitions_def = {
k: v for k, v in assets_by_partitions_def.items() if k is not None
k: v for k, v in exe_assets_by_partitions_def.items() if k is not None
}
jobs = []

Expand All @@ -101,8 +111,11 @@ def get_base_asset_jobs(
jobs.append(
build_assets_job(
f"{ASSET_BASE_JOB_PREFIX}_{i}",
assets=[*assets_with_partitions, *unpartitioned_assets],
source_assets=[*source_assets, *assets],
assets=[*assets_with_partitions, *unpartitioned_assets_defs],
source_assets=[
*all_source_assets,
*unpartitioned_assets_defs,
],
asset_checks=asset_checks,
resource_defs=resource_defs,
executor_def=executor_def,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ def create_unexecutable_observable_assets_def(specs: Sequence[AssetSpec]):
)

@multi_asset(specs=new_specs)
def an_asset() -> None:
def an_asset():
raise DagsterInvariantViolationError(
f"You have attempted to execute an unexecutable asset {[spec.key for spec in specs]}"
)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -193,7 +193,7 @@ def build_caching_repository_data_from_list(

if assets_defs or source_assets or asset_checks_defs:
for job_def in get_base_asset_jobs(
assets=assets_defs,
assets_defs=assets_defs,
source_assets=source_assets,
executor_def=default_executor_def,
resource_defs=top_level_resources,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
from enum import Enum
from typing import (
TYPE_CHECKING,
AbstractSet,
Any,
Dict,
Iterable,
Expand Down Expand Up @@ -51,6 +52,7 @@
SYSTEM_METADATA_KEY_ASSET_EXECUTION_TYPE,
AssetExecutionType,
)
from dagster._core.definitions.assets import AssetsDefinition
from dagster._core.definitions.assets_job import is_base_asset_job_name
from dagster._core.definitions.auto_materialize_policy import AutoMaterializePolicy
from dagster._core.definitions.backfill_policy import BackfillPolicy
Expand Down Expand Up @@ -1260,17 +1262,24 @@ def __new__(
),
)

@property
def is_executable(self) -> bool:
def _get_capability_metadata(self) -> Optional[str]:
metadata_value = self.metadata.get(SYSTEM_METADATA_KEY_ASSET_EXECUTION_TYPE)
if not metadata_value:
varietal_text = None
cap_text = None
else:
check.inst(metadata_value, TextMetadataValue) # for guaranteed runtime error
assert isinstance(metadata_value, TextMetadataValue) # for type checker
varietal_text = metadata_value.value
cap_text = metadata_value.value

return cap_text

@property
def is_executable(self) -> bool:
return AssetExecutionType.is_executable(self._get_capability_metadata())

return AssetExecutionType.is_executable(varietal_text)
@property
def execution_type(self) -> AssetExecutionType:
return AssetExecutionType.str_to_enum(self._get_capability_metadata())


ResourceJobUsageMap = Dict[str, List[ResourceJobUsageEntry]]
Expand Down Expand Up @@ -1341,6 +1350,11 @@ def external_repository_data_from_def(
asset_graph = external_asset_graph_from_defs(
jobs,
source_assets_by_key=repository_def.source_assets_by_key,
unexecutable_assets_defs={
assets_def
for assets_def in repository_def.assets_defs_by_key.values()
if not assets_def.is_executable()
},
)

nested_resource_map = _get_nested_resources_map(
Expand Down Expand Up @@ -1462,6 +1476,7 @@ def external_asset_checks_from_defs(
def external_asset_graph_from_defs(
job_defs: Sequence[JobDefinition],
source_assets_by_key: Mapping[AssetKey, SourceAsset],
unexecutable_assets_defs: Optional[AbstractSet[AssetsDefinition]] = None, # drop default value
) -> Sequence[ExternalAssetNode]:
node_defs_by_asset_key: Dict[AssetKey, List[Tuple[NodeOutputHandle, JobDefinition]]] = (
defaultdict(list)
Expand Down Expand Up @@ -1535,22 +1550,9 @@ def external_asset_graph_from_defs(

group_name_by_asset_key.update(asset_layer.group_names_by_assets())

asset_keys_without_definitions = all_upstream_asset_keys.difference(
node_defs_by_asset_key.keys()
).difference(source_assets_by_key.keys())

asset_nodes = [
ExternalAssetNode(
asset_key=asset_key,
dependencies=list(deps[asset_key].values()),
depended_by=list(dep_by[asset_key].values()),
job_names=[],
group_name=group_name_by_asset_key.get(asset_key),
code_version=code_version_by_asset_key.get(asset_key),
)
for asset_key in asset_keys_without_definitions
]
asset_nodes: List[ExternalAssetNode] = []

# build nodes for source assets
for source_asset in source_assets_by_key.values():
if source_asset.key not in node_defs_by_asset_key:
job_names = (
Expand Down Expand Up @@ -1592,6 +1594,7 @@ def external_asset_graph_from_defs(
)
)

# build nodes for assets backed by ops/jobs
for asset_key, node_tuple_list in node_defs_by_asset_key.items():
node_output_handle, job_def = node_tuple_list[0]

Expand Down Expand Up @@ -1659,6 +1662,50 @@ def external_asset_graph_from_defs(
)
)

# resolve dependencies for unexecutable assets since they are not encoded in jobs
unexecutable_keys = set()
for unexec_def in unexecutable_assets_defs or []:
for key in unexec_def.keys:
unexecutable_keys.add(key)
for depends_on in unexec_def.asset_deps[key]:
deps[key][depends_on] = ExternalAssetDependency(depends_on)
dep_by[depends_on][key] = ExternalAssetDependedBy(key)

# build nodes for unexecutable assets defs
for unexec_def in unexecutable_assets_defs or []:
for key in unexec_def.keys:
asset_nodes.append(
ExternalAssetNode(
asset_key=key,
dependencies=list(deps[key].values()),
depended_by=list(dep_by[key].values()),
metadata=unexec_def.metadata_by_key[key],
group_name=unexec_def.group_names_by_key[key],
is_source=False,
is_observable=False,
# could use a special kind tag for more UI punch
# compute_kind=...
)
)

# build nodes for remaining asset keys that were referred in other assets but not defined
asset_keys_without_definitions = (
all_upstream_asset_keys.difference(node_defs_by_asset_key.keys())
.difference(source_assets_by_key.keys())
.difference(unexecutable_keys)
)
for asset_key in asset_keys_without_definitions:
asset_nodes.append(
ExternalAssetNode(
asset_key=asset_key,
dependencies=list(deps[asset_key].values()),
depended_by=list(dep_by[asset_key].values()),
job_names=[],
group_name=group_name_by_asset_key.get(asset_key),
code_version=code_version_by_asset_key.get(asset_key),
)
)

defined = set()
for node in asset_nodes:
if node.asset_key in defined:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1959,7 +1959,7 @@ def hourly_asset(): ...
def unpartitioned_asset(): ...

jobs = get_base_asset_jobs(
assets=[
assets_defs=[
daily_asset,
daily_asset2,
daily_asset_different_start_date,
Expand Down Expand Up @@ -2004,7 +2004,7 @@ def asset_b(): ...
def asset_x(asset_b: B): ...

jobs = get_base_asset_jobs(
assets=[
assets_defs=[
asset_x,
],
source_assets=[
Expand Down
Loading

0 comments on commit aa652da

Please sign in to comment.