Skip to content

Commit

Permalink
[exploration] unexecutable assets not backed by jobs
Browse files Browse the repository at this point in the history
  • Loading branch information
alangenfeld committed Sep 22, 2023
1 parent 7f776c0 commit b1222d0
Show file tree
Hide file tree
Showing 9 changed files with 221 additions and 63 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
AssetKey,
_check as check,
)
from dagster._core.definitions.asset_spec import AssetsDefCapability
from dagster._core.definitions.data_time import CachingDataTimeResolver
from dagster._core.definitions.data_version import (
NULL_DATA_VERSION,
Expand Down Expand Up @@ -455,8 +456,8 @@ def get_required_resource_keys_rec(
def is_graph_backed_asset(self) -> bool:
    """Report whether ``graphName`` is set (i.e. is not None) on this node."""
    if self.graphName is None:
        return False
    return True

def is_source_asset(self) -> bool:
return self._external_asset_node.is_source
def is_not_materialize_check_asset(self) -> bool:
return self._external_asset_node.capability_type != AssetsDefCapability.MATERIALIZE_CHECK

def resolve_hasMaterializePermission(
self,
Expand Down Expand Up @@ -581,7 +582,7 @@ def resolve_assetObservations(
]

def resolve_configField(self, _graphene_info: ResolveInfo) -> Optional[GrapheneConfigTypeField]:
if self.is_source_asset():
if self.is_not_materialize_check_asset():
return None
external_pipeline = self.get_external_job()
node_def_snap = self.get_node_definition_snap()
Expand Down Expand Up @@ -814,7 +815,7 @@ def resolve_jobs(self, _graphene_info: ResolveInfo) -> Sequence[GraphenePipeline
]

def resolve_isSource(self, _graphene_info: ResolveInfo) -> bool:
return self.is_source_asset()
return self._external_asset_node.is_source

def resolve_isPartitioned(self, _graphene_info: ResolveInfo) -> bool:
    """Return True when the external asset node carries partitions definition data.

    ``_graphene_info`` is accepted but not used.
    """
    partitions_data = self._external_asset_node.partitions_def_data
    return partitions_data is not None
Expand Down Expand Up @@ -978,7 +979,7 @@ def resolve_metadata_entries(
def resolve_op(
self, _graphene_info: ResolveInfo
) -> Optional[Union[GrapheneSolidDefinition, GrapheneCompositeSolidDefinition]]:
if self.is_source_asset():
if self.is_not_materialize_check_asset():
return None
external_pipeline = self.get_external_job()
node_def_snap = self.get_node_definition_snap()
Expand Down Expand Up @@ -1057,7 +1058,7 @@ def resolve_repository(self, graphene_info: ResolveInfo) -> "GrapheneRepository"
def resolve_required_resources(
self, _graphene_info: ResolveInfo
) -> Sequence[GrapheneResourceRequirement]:
if self.is_source_asset():
if self.is_not_materialize_check_asset():
return []
node_def_snap = self.get_node_definition_snap()
all_unique_keys = self.get_required_resource_keys(node_def_snap)
Expand All @@ -1070,7 +1071,7 @@ def resolve_type(
"GrapheneListDagsterType", "GrapheneNullableDagsterType", "GrapheneRegularDagsterType"
]
]:
if self.is_source_asset():
if self.is_not_materialize_check_asset():
return None
external_pipeline = self.get_external_job()
output_name = self.external_asset_node.output_name
Expand Down
39 changes: 25 additions & 14 deletions python_modules/dagster/dagster/_core/definitions/asset_spec.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,29 +16,40 @@
if TYPE_CHECKING:
from dagster._core.definitions.asset_dep import AssetDep, CoercibleToAssetDep

# SYSTEM_METADATA_KEY_ASSET_EXECUTION_TYPE lives on the metadata of an asset
# This lives on the metadata of an asset
# (which currently ends up on the Output associated with the asset key)
# which encodes the execution type of the asset. "Unexecutable" assets are assets
# that cannot be materialized in Dagster, but can have events in the event
# log keyed off of them, making Dagster usable as an observability and lineage tool
# for externally materialized assets.
SYSTEM_METADATA_KEY_ASSET_EXECUTION_TYPE = "dagster/asset_execution_type"
# and encodes the capability type of the backing AssetsDefinition.
# Assets backed by AssetsDefinitions with no capabilities cannot be materialized in Dagster,
# but can have events in the event log keyed off of them, making Dagster usable as
# an observability and lineage tool for externally materialized assets.
SYSTEM_METADATA_KEY_ASSET_DEF_CAPABILITY = "dagster/asset_def_capability"


class AssetExecutionType(Enum):
UNEXECUTABLE = "UNEXECUTABLE"
MATERIALIZATION = "MATERIALIZATION"
class AssetsDefCapability(Enum):
# default asset def capability, has backing op to execute that materializes/checks assets
MATERIALIZE_CHECK = "MATERIALIZE_CHECK"

# asset def can facilitate IOManager loading of asset, but not execute
SOURCE = "SOURCE"

# asset def can facilitate IOManager loading of asset, has backing op to observe data version
# OBSERVE_SOURCE = "OBSERVE_SOURCE" # what obs source asset support would look like

# asset def only defines basic properties of asset, no further capabilities
NONE = "NONE"

@staticmethod
def is_executable(varietal_str: Optional[str]) -> bool:
return AssetExecutionType.str_to_enum(varietal_str) in {AssetExecutionType.MATERIALIZATION}
return AssetsDefCapability.str_to_enum(varietal_str) in {
AssetsDefCapability.MATERIALIZE_CHECK
}

@staticmethod
def str_to_enum(varietal_str: Optional[str]) -> "AssetExecutionType":
def str_to_enum(cap_str: Optional[str]) -> "AssetsDefCapability":
return (
AssetExecutionType.MATERIALIZATION
if varietal_str is None
else AssetExecutionType(varietal_str)
AssetsDefCapability.MATERIALIZE_CHECK
if cap_str is None
else AssetsDefCapability(cap_str)
)


Expand Down
30 changes: 26 additions & 4 deletions python_modules/dagster/dagster/_core/definitions/assets.py
Original file line number Diff line number Diff line change
Expand Up @@ -868,14 +868,36 @@ def is_asset_executable(self, asset_key: AssetKey) -> bool:
bool: True if the asset key is materializable by this AssetsDefinition.
"""
from dagster._core.definitions.asset_spec import (
SYSTEM_METADATA_KEY_ASSET_EXECUTION_TYPE,
AssetExecutionType,
SYSTEM_METADATA_KEY_ASSET_DEF_CAPABILITY,
AssetsDefCapability,
)

return AssetExecutionType.is_executable(
self._metadata_by_key.get(asset_key, {}).get(SYSTEM_METADATA_KEY_ASSET_EXECUTION_TYPE)
return AssetsDefCapability.is_executable(
self._metadata_by_key.get(asset_key, {}).get(SYSTEM_METADATA_KEY_ASSET_DEF_CAPABILITY)
)

def is_executable(self) -> bool:
    """Return True if every asset key in this definition is executable.

    Each key in ``self.keys`` is checked with ``is_asset_executable``; the result is
    False as soon as one key is not executable. A definition with no keys yields True.

    Assumption: either all or none of the contained assets are unexecutable.
    """
    return all(self.is_asset_executable(key) for key in self.keys)

@property
def capability_type(self):
    """Capability of this definition, derived from the first key's metadata.

    Reads the value stored under ``SYSTEM_METADATA_KEY_ASSET_DEF_CAPABILITY`` in the
    metadata of the first key yielded by ``self.keys`` and converts it with
    ``AssetsDefCapability.str_to_enum``. Returns None when ``self.keys`` is empty.
    """
    from dagster._core.definitions.asset_spec import (
        SYSTEM_METADATA_KEY_ASSET_DEF_CAPABILITY,
        AssetsDefCapability,
    )

    # Assumes the capability stored in metadata is the same across all assets,
    # so only the first key is consulted.
    for first_key in self.keys:
        key_metadata = self._metadata_by_key.get(first_key, {})
        raw_capability = key_metadata.get(SYSTEM_METADATA_KEY_ASSET_DEF_CAPABILITY)
        return AssetsDefCapability.str_to_enum(raw_capability)
    return None

def get_partition_mapping_for_input(self, input_name: str) -> Optional[PartitionMapping]:
    """Look up the partition mapping registered for the asset key behind ``input_name``.

    The input name is first translated to its asset key via ``_keys_by_input_name``
    (indexed directly, so an unknown name is not defaulted), then that key is looked
    up in ``_partition_mappings`` with ``.get``.
    """
    upstream_key = self._keys_by_input_name[input_name]
    return self._partition_mappings.get(upstream_key)

Expand Down
37 changes: 25 additions & 12 deletions python_modules/dagster/dagster/_core/definitions/assets_job.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
Tuple,
Union,
)
from dagster._core.definitions.asset_spec import AssetsDefCapability

from toposort import CircularDependencyError, toposort

Expand Down Expand Up @@ -58,39 +59,48 @@ def is_base_asset_job_name(name: str) -> bool:


def get_base_asset_jobs(
assets: Sequence[AssetsDefinition],
assets_defs: Sequence[AssetsDefinition],
source_assets: Sequence[SourceAsset],
asset_checks: Sequence[AssetChecksDefinition],
resource_defs: Optional[Mapping[str, ResourceDefinition]],
executor_def: Optional[ExecutorDefinition],
) -> Sequence[JobDefinition]:
assets_by_partitions_def: Dict[Optional[PartitionsDefinition], List[AssetsDefinition]] = (
# bucket the passed in AssetsDefinitions that are executable by partition.
# un-executable assets are intentionally omitted.
exe_assets_by_partitions_def: Dict[Optional[PartitionsDefinition], List[AssetsDefinition]] = (
defaultdict(list)
)
for assets_def in assets:
assets_by_partitions_def[assets_def.partitions_def].append(assets_def)
all_source_assets = [*source_assets]
for assets_def in assets_defs:
if assets_def.is_executable():
exe_assets_by_partitions_def[assets_def.partitions_def].append(assets_def)
elif assets_def.capability_type == AssetsDefCapability.SOURCE:
all_source_assets.extend(assets_def.to_source_assets())

# We need to create "empty" jobs for each partitions def that is used by an observable but no
# materializable asset. They are empty because we don't assign the source asset to the `assets`,
# but rather the `source_assets` argument of `build_assets_job`.
for observable in [sa for sa in source_assets if sa.is_observable]:
if observable.partitions_def not in assets_by_partitions_def:
assets_by_partitions_def[observable.partitions_def] = []
if len(assets_by_partitions_def.keys()) == 0 or assets_by_partitions_def.keys() == {None}:
if observable.partitions_def not in exe_assets_by_partitions_def:
exe_assets_by_partitions_def[observable.partitions_def] = []

if len(exe_assets_by_partitions_def.keys()) == 0 or exe_assets_by_partitions_def.keys() == {
None
}:
return [
build_assets_job(
name=ASSET_BASE_JOB_PREFIX,
assets=assets,
assets=exe_assets_by_partitions_def.get(None, []),
asset_checks=asset_checks,
source_assets=source_assets,
source_assets=all_source_assets,
executor_def=executor_def,
resource_defs=resource_defs,
)
]
else:
unpartitioned_assets = assets_by_partitions_def.get(None, [])
unpartitioned_assets = exe_assets_by_partitions_def.get(None, [])
partitioned_assets_by_partitions_def = {
k: v for k, v in assets_by_partitions_def.items() if k is not None
k: v for k, v in exe_assets_by_partitions_def.items() if k is not None
}
jobs = []

Expand All @@ -102,7 +112,10 @@ def get_base_asset_jobs(
build_assets_job(
f"{ASSET_BASE_JOB_PREFIX}_{i}",
assets=[*assets_with_partitions, *unpartitioned_assets],
source_assets=[*source_assets, *assets],
source_assets=[
*all_source_assets,
*assets_defs, # should this be *unpartitioned_assets ?
],
asset_checks=asset_checks,
resource_defs=resource_defs,
executor_def=executor_def,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,8 @@

from dagster import _check as check
from dagster._core.definitions.asset_spec import (
SYSTEM_METADATA_KEY_ASSET_EXECUTION_TYPE,
AssetExecutionType,
SYSTEM_METADATA_KEY_ASSET_DEF_CAPABILITY,
AssetsDefCapability,
AssetSpec,
)
from dagster._core.definitions.decorators.asset_decorator import asset, multi_asset
Expand Down Expand Up @@ -34,18 +34,14 @@ def create_unexecutable_observable_assets_def(specs: Sequence[AssetSpec]):
group_name=spec.group_name,
metadata={
**(spec.metadata or {}),
**{
SYSTEM_METADATA_KEY_ASSET_EXECUTION_TYPE: (
AssetExecutionType.UNEXECUTABLE.value
)
},
**{SYSTEM_METADATA_KEY_ASSET_DEF_CAPABILITY: AssetsDefCapability.NONE.value},
},
deps=spec.deps,
)
)

@multi_asset(specs=new_specs)
def an_asset() -> None:
def an_asset():
raise DagsterInvariantViolationError(
f"You have attempted to execute an unexecutable asset {[spec.key for spec in specs]}"
)
Expand All @@ -67,7 +63,7 @@ def create_unexecutable_observable_assets_def_from_source_asset(source_asset: So
"key": source_asset.key,
"metadata": {
**source_asset.metadata,
**{SYSTEM_METADATA_KEY_ASSET_EXECUTION_TYPE: AssetExecutionType.UNEXECUTABLE.value},
**{SYSTEM_METADATA_KEY_ASSET_DEF_CAPABILITY: AssetsDefCapability.SOURCE.value},
},
"group_name": source_asset.group_name,
"description": source_asset.description,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -193,7 +193,7 @@ def build_caching_repository_data_from_list(

if assets_defs or source_assets or asset_checks_defs:
for job_def in get_base_asset_jobs(
assets=assets_defs,
assets_defs=assets_defs,
source_assets=source_assets,
executor_def=default_executor_def,
resource_defs=top_level_resources,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
from enum import Enum
from typing import (
TYPE_CHECKING,
AbstractSet,
Any,
Dict,
Iterable,
Expand Down Expand Up @@ -48,9 +49,10 @@
from dagster._core.definitions.asset_check_spec import AssetCheckSpec
from dagster._core.definitions.asset_sensor_definition import AssetSensorDefinition
from dagster._core.definitions.asset_spec import (
SYSTEM_METADATA_KEY_ASSET_EXECUTION_TYPE,
AssetExecutionType,
SYSTEM_METADATA_KEY_ASSET_DEF_CAPABILITY,
AssetsDefCapability,
)
from dagster._core.definitions.assets import AssetsDefinition
from dagster._core.definitions.assets_job import is_base_asset_job_name
from dagster._core.definitions.auto_materialize_policy import AutoMaterializePolicy
from dagster._core.definitions.backfill_policy import BackfillPolicy
Expand Down Expand Up @@ -1260,17 +1262,24 @@ def __new__(
),
)

@property
def is_executable(self) -> bool:
metadata_value = self.metadata.get(SYSTEM_METADATA_KEY_ASSET_EXECUTION_TYPE)
def _get_capability_metadata(self) -> Optional[str]:
metadata_value = self.metadata.get(SYSTEM_METADATA_KEY_ASSET_DEF_CAPABILITY)
if not metadata_value:
varietal_text = None
cap_text = None
else:
check.inst(metadata_value, TextMetadataValue) # for guaranteed runtime error
assert isinstance(metadata_value, TextMetadataValue) # for type checker
varietal_text = metadata_value.value
cap_text = metadata_value.value

return AssetExecutionType.is_executable(varietal_text)
return cap_text

@property
def is_executable(self) -> bool:
return AssetsDefCapability.is_executable(self._get_capability_metadata())

@property
def capability_type(self) -> AssetsDefCapability:
return AssetsDefCapability.str_to_enum(self._get_capability_metadata())


ResourceJobUsageMap = Dict[str, List[ResourceJobUsageEntry]]
Expand Down Expand Up @@ -1341,6 +1350,11 @@ def external_repository_data_from_def(
asset_graph = external_asset_graph_from_defs(
jobs,
source_assets_by_key=repository_def.source_assets_by_key,
unexecutable_assets_defs={
assets_def
for assets_def in repository_def.assets_defs_by_key.values()
if not assets_def.is_executable()
},
)

nested_resource_map = _get_nested_resources_map(
Expand Down Expand Up @@ -1462,6 +1476,7 @@ def external_asset_checks_from_defs(
def external_asset_graph_from_defs(
job_defs: Sequence[JobDefinition],
source_assets_by_key: Mapping[AssetKey, SourceAsset],
unexecutable_assets_defs: Optional[AbstractSet[AssetsDefinition]] = None, # drop default value
) -> Sequence[ExternalAssetNode]:
node_defs_by_asset_key: Dict[AssetKey, List[Tuple[NodeOutputHandle, JobDefinition]]] = (
defaultdict(list)
Expand Down Expand Up @@ -1659,6 +1674,30 @@ def external_asset_graph_from_defs(
)
)

# resolve deps for unexecutable assets since they are not encoded in jobs
for unexec_def in unexecutable_assets_defs or []:
for key in unexec_def.keys:
for depends_on in unexec_def.asset_deps[key]:
deps[key][depends_on] = ExternalAssetDependency(depends_on)
dep_by[depends_on][key] = ExternalAssetDependedBy(key)

# build nodes for unexec assets
for unexec_def in unexecutable_assets_defs or []:
for key in unexec_def.keys:
asset_nodes.append(
ExternalAssetNode(
asset_key=key,
dependencies=list(deps[key].values()),
depended_by=list(dep_by[key].values()),
metadata=unexec_def.metadata_by_key[key],
group_name=unexec_def.group_names_by_key[key],
is_source=unexec_def.capability_type == AssetsDefCapability.SOURCE,
is_observable=False,
# could use a special kind tag for more UI punch
# compute_kind=...
)
)

return asset_nodes


Expand Down
Loading

0 comments on commit b1222d0

Please sign in to comment.