Skip to content

Commit

Permalink
[exploration] unexecutable assets not backed by jobs
Browse files Browse the repository at this point in the history
  • Loading branch information
alangenfeld committed Sep 20, 2023
1 parent 336c227 commit ce171b1
Show file tree
Hide file tree
Showing 9 changed files with 129 additions and 25 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -346,8 +346,9 @@ def stale_status_loader(self) -> StaleStatusLoader:
def get_external_job(self) -> ExternalJob:
if self._external_job is None:
check.invariant(
len(self._external_asset_node.job_names) >= 1,
"Asset must be part of at least one job",
len(self._external_asset_node.job_names) >= 1
or not self._external_asset_node.is_executable,
"Executable asset must be part of at least one job",
)
self._external_job = self._external_repository.get_full_external_job(
self._external_asset_node.job_names[0]
Expand Down Expand Up @@ -455,8 +456,8 @@ def get_required_resource_keys_rec(
def is_graph_backed_asset(self) -> bool:
    """Return whether this asset is produced by a graph (i.e. graphName is set)."""
    graph_name = self.graphName
    return graph_name is not None

def is_source_asset(self) -> bool:
return self._external_asset_node.is_source
def is_source_or_unexecutable_asset(self) -> bool:
    """Return whether this node is a source asset or an asset that cannot be executed."""
    node = self._external_asset_node
    return node.is_source or not node.is_executable

def resolve_hasMaterializePermission(
self,
Expand Down Expand Up @@ -581,7 +582,7 @@ def resolve_assetObservations(
]

def resolve_configField(self, _graphene_info: ResolveInfo) -> Optional[GrapheneConfigTypeField]:
if self.is_source_asset():
if self.is_source_or_unexecutable_asset():
return None
external_pipeline = self.get_external_job()
node_def_snap = self.get_node_definition_snap()
Expand Down Expand Up @@ -814,7 +815,7 @@ def resolve_jobs(self, _graphene_info: ResolveInfo) -> Sequence[GraphenePipeline
]

def resolve_isSource(self, _graphene_info: ResolveInfo) -> bool:
    """Return whether the backing external asset node is a source asset.

    Reads ``is_source`` directly off the external asset node rather than going
    through ``is_source_or_unexecutable_asset``, which would also return True
    for unexecutable (non-source) assets.
    """
    # Note: the span contained both the old body (a stale diff-residue line
    # calling is_source_asset()) and the new one; only the direct field read
    # is kept here.
    return self._external_asset_node.is_source

def resolve_isPartitioned(self, _graphene_info: ResolveInfo) -> bool:
    """Return whether a partitions definition is attached to this asset node."""
    partitions_def_data = self._external_asset_node.partitions_def_data
    return partitions_def_data is not None
Expand Down Expand Up @@ -978,7 +979,7 @@ def resolve_metadata_entries(
def resolve_op(
self, _graphene_info: ResolveInfo
) -> Optional[Union[GrapheneSolidDefinition, GrapheneCompositeSolidDefinition]]:
if self.is_source_asset():
if self.is_source_or_unexecutable_asset():
return None
external_pipeline = self.get_external_job()
node_def_snap = self.get_node_definition_snap()
Expand Down Expand Up @@ -1057,7 +1058,7 @@ def resolve_repository(self, graphene_info: ResolveInfo) -> "GrapheneRepository"
def resolve_required_resources(
self, _graphene_info: ResolveInfo
) -> Sequence[GrapheneResourceRequirement]:
if self.is_source_asset():
if self.is_source_or_unexecutable_asset():
return []
node_def_snap = self.get_node_definition_snap()
all_unique_keys = self.get_required_resource_keys(node_def_snap)
Expand All @@ -1070,7 +1071,7 @@ def resolve_type(
"GrapheneListDagsterType", "GrapheneNullableDagsterType", "GrapheneRegularDagsterType"
]
]:
if self.is_source_asset():
if self.is_source_or_unexecutable_asset():
return None
external_pipeline = self.get_external_job()
output_name = self.external_asset_node.output_name
Expand Down
6 changes: 6 additions & 0 deletions python_modules/dagster/dagster/_core/definitions/assets.py
Original file line number Diff line number Diff line change
Expand Up @@ -876,6 +876,12 @@ def is_asset_executable(self, asset_key: AssetKey) -> bool:
self._metadata_by_key.get(asset_key, {}).get(SYSTEM_METADATA_KEY_ASSET_EXECUTION_TYPE)
)

def is_executable(self) -> bool:
    """Return True only if every asset key in this definition is executable.

    Delegates the per-key check to ``is_asset_executable``. An empty key set
    is vacuously executable (matches the original loop's behavior).
    """
    return all(self.is_asset_executable(key) for key in self.keys)

def get_partition_mapping_for_input(self, input_name: str) -> Optional[PartitionMapping]:
return self._partition_mappings.get(self._keys_by_input_name[input_name])

Expand Down
28 changes: 17 additions & 11 deletions python_modules/dagster/dagster/_core/definitions/assets_job.py
Original file line number Diff line number Diff line change
Expand Up @@ -58,39 +58,45 @@ def is_base_asset_job_name(name: str) -> bool:


def get_base_asset_jobs(
assets: Sequence[AssetsDefinition],
assets_defs: Sequence[AssetsDefinition],
source_assets: Sequence[SourceAsset],
asset_checks: Sequence[AssetChecksDefinition],
resource_defs: Optional[Mapping[str, ResourceDefinition]],
executor_def: Optional[ExecutorDefinition],
) -> Sequence[JobDefinition]:
assets_by_partitions_def: Dict[Optional[PartitionsDefinition], List[AssetsDefinition]] = (
# bucket the passed in AssetsDefinitions that are executable by partition.
# un-executable assets are intentionally omitted.
exe_assets_by_partitions_def: Dict[Optional[PartitionsDefinition], List[AssetsDefinition]] = (
defaultdict(list)
)
for assets_def in assets:
assets_by_partitions_def[assets_def.partitions_def].append(assets_def)
for assets_def in assets_defs:
if assets_def.is_executable():
exe_assets_by_partitions_def[assets_def.partitions_def].append(assets_def)

# We need to create "empty" jobs for each partitions def that is used by an observable but no
# materializable asset. They are empty because we don't assign the source asset to the `assets`,
# but rather the `source_assets` argument of `build_assets_job`.
for observable in [sa for sa in source_assets if sa.is_observable]:
if observable.partitions_def not in assets_by_partitions_def:
assets_by_partitions_def[observable.partitions_def] = []
if len(assets_by_partitions_def.keys()) == 0 or assets_by_partitions_def.keys() == {None}:
if observable.partitions_def not in exe_assets_by_partitions_def:
exe_assets_by_partitions_def[observable.partitions_def] = []

if len(exe_assets_by_partitions_def.keys()) == 0 or exe_assets_by_partitions_def.keys() == {
None
}:
return [
build_assets_job(
name=ASSET_BASE_JOB_PREFIX,
assets=assets,
assets=exe_assets_by_partitions_def.get(None, []),
asset_checks=asset_checks,
source_assets=source_assets,
executor_def=executor_def,
resource_defs=resource_defs,
)
]
else:
unpartitioned_assets = assets_by_partitions_def.get(None, [])
unpartitioned_assets = exe_assets_by_partitions_def.get(None, [])
partitioned_assets_by_partitions_def = {
k: v for k, v in assets_by_partitions_def.items() if k is not None
k: v for k, v in exe_assets_by_partitions_def.items() if k is not None
}
jobs = []

Expand All @@ -102,7 +108,7 @@ def get_base_asset_jobs(
build_assets_job(
f"{ASSET_BASE_JOB_PREFIX}_{i}",
assets=[*assets_with_partitions, *unpartitioned_assets],
source_assets=[*source_assets, *assets],
source_assets=[*source_assets, *assets_defs],
asset_checks=asset_checks,
resource_defs=resource_defs,
executor_def=executor_def,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ def create_unexecutable_observable_assets_def(specs: Sequence[AssetSpec]):
)

@multi_asset(specs=new_specs)
def an_asset() -> None:
def an_asset():
raise DagsterInvariantViolationError(
f"You have attempted to execute an unexecutable asset {[spec.key for spec in specs]}"
)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -251,7 +251,9 @@ def __init__(
sensors, "sensors", key_type=str, value_type=(SensorDefinition, FunctionType)
)
check.mapping_param(
source_assets_by_key, "source_assets_by_key", key_type=AssetKey, value_type=SourceAsset
source_assets_by_key,
"source_assets_by_key",
key_type=AssetKey, # value_type=SourceAsset
)
check.mapping_param(
assets_defs_by_key, "assets_defs_by_key", key_type=AssetKey, value_type=AssetsDefinition
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -193,7 +193,7 @@ def build_caching_repository_data_from_list(

if assets_defs or source_assets or asset_checks_defs:
for job_def in get_base_asset_jobs(
assets=assets_defs,
assets_defs=assets_defs,
source_assets=source_assets,
executor_def=default_executor_def,
resource_defs=top_level_resources,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
from enum import Enum
from typing import (
TYPE_CHECKING,
AbstractSet,
Any,
Dict,
Iterable,
Expand Down Expand Up @@ -51,6 +52,7 @@
SYSTEM_METADATA_KEY_ASSET_EXECUTION_TYPE,
AssetExecutionType,
)
from dagster._core.definitions.assets import AssetsDefinition
from dagster._core.definitions.assets_job import is_base_asset_job_name
from dagster._core.definitions.auto_materialize_policy import AutoMaterializePolicy
from dagster._core.definitions.backfill_policy import BackfillPolicy
Expand Down Expand Up @@ -1341,6 +1343,11 @@ def external_repository_data_from_def(
asset_graph = external_asset_graph_from_defs(
jobs,
source_assets_by_key=repository_def.source_assets_by_key,
unexecutable_assets={
assets_def
for assets_def in repository_def.assets_defs_by_key.values()
if not assets_def.is_executable()
},
)

nested_resource_map = _get_nested_resources_map(
Expand Down Expand Up @@ -1462,6 +1469,7 @@ def external_asset_checks_from_defs(
def external_asset_graph_from_defs(
job_defs: Sequence[JobDefinition],
source_assets_by_key: Mapping[AssetKey, SourceAsset],
unexecutable_assets: Optional[AbstractSet[AssetsDefinition]] = None, # drop default value
) -> Sequence[ExternalAssetNode]:
node_defs_by_asset_key: Dict[AssetKey, List[Tuple[NodeOutputHandle, JobDefinition]]] = (
defaultdict(list)
Expand Down Expand Up @@ -1659,6 +1667,30 @@ def external_asset_graph_from_defs(
)
)

# resolve dependencies for unexecutable assets here, since they are not encoded in any job
for unexec_def in unexecutable_assets or []:
for key in unexec_def.keys:
for depends_on in unexec_def.asset_deps[key]:
deps[key][depends_on] = ExternalAssetDependency(depends_on)
dep_by[depends_on][key] = ExternalAssetDependedBy(key)

# build external asset nodes for unexecutable assets
for unexec_def in unexecutable_assets or []:
for key in unexec_def.keys:
asset_nodes.append(
ExternalAssetNode(
asset_key=key,
dependencies=list(deps[key].values()),
depended_by=list(dep_by[key].values()),
metadata=unexec_def.metadata_by_key[key],
group_name=unexec_def.group_names_by_key[key],
is_source=False,
is_observable=False,
# could use a special kind tag for more UI punch
# compute_kind=...
)
)

return asset_nodes


Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1959,7 +1959,7 @@ def hourly_asset(): ...
def unpartitioned_asset(): ...

jobs = get_base_asset_jobs(
assets=[
assets_defs=[
daily_asset,
daily_asset2,
daily_asset_different_start_date,
Expand Down Expand Up @@ -2004,7 +2004,7 @@ def asset_b(): ...
def asset_x(asset_b: B): ...

jobs = get_base_asset_jobs(
assets=[
assets_defs=[
asset_x,
],
source_assets=[
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,12 +3,17 @@
AssetKey,
AssetsDefinition,
AutoMaterializePolicy,
Definitions,
_check as check,
asset,
)
from dagster._core.definitions.asset_spec import AssetSpec
from dagster._core.definitions.freshness_policy import FreshnessPolicy
from dagster._core.definitions.observable_asset import create_unexecutable_observable_assets_def
from dagster._core.errors import DagsterInvalidSubsetError
from dagster._core.host_representation.external_data import (
external_repository_data_from_def,
)


def test_observable_asset_basic_creation() -> None:
Expand Down Expand Up @@ -72,3 +77,55 @@ def test_observable_asset_creation_with_deps() -> None:
assert assets_def.asset_deps[expected_key] == {
AssetKey(["observable_asset_two"]),
}


def test_non_executable_asset_excluded_from_job() -> None:
    """An unexecutable asset may serve as an upstream dep but is omitted from the base job."""
    # NOTE(review): uses pytest.raises below, but no `import pytest` is visible
    # in this hunk — confirm it is imported at module scope.
    upstream_asset = create_unexecutable_observable_assets_def(
        specs=[AssetSpec("upstream_asset")],
    )

    @asset(deps=[upstream_asset])
    def downstream_asset() -> None: ...

    defs = Definitions(assets=[upstream_asset, downstream_asset])

    # The implicit global job runs successfully even though one of the passed
    # assets is unexecutable (it is simply excluded from the job).
    assert defs.get_implicit_global_asset_job_def().execute_in_process().success

    # ensure that explicit selection of the unexecutable asset fails, since it
    # does not exist in the implicit job
    with pytest.raises(
        DagsterInvalidSubsetError,
        match=(
            r'Assets provided in asset_selection argument \["upstream_asset"\] do not exist in'
            r" parent asset group or job"
        ),
    ):
        defs.get_implicit_global_asset_job_def().execute_in_process(
            asset_selection=[AssetKey("upstream_asset")]
        )


def test_external_rep() -> None:
    """Unexecutable asset specs appear as nodes in the external representation.

    Builds a diamond of unexecutable assets (A -> {B, C} -> D) and checks that
    the external asset graph contains all four nodes with dependencies
    resolved, even though none of them is backed by a job.
    """
    table_a = AssetSpec("table_A")
    table_b = AssetSpec("table_B", deps=[table_a])
    table_c = AssetSpec("table_C", deps=[table_a])
    table_d = AssetSpec("table_D", deps=[table_b, table_c])

    those_assets = create_unexecutable_observable_assets_def(
        specs=[table_a, table_b, table_c, table_d]
    )

    defs = Definitions(assets=[those_assets])

    external_repo = external_repository_data_from_def(
        defs.get_inner_repository_for_loading_process()
    )
    assert len(external_repo.external_asset_graph_data) == 4

    nodes_by_key = {node.asset_key: node for node in external_repo.external_asset_graph_data}
    # table_A fans out to both table_B and table_C.
    assert len(nodes_by_key[table_a.key].depended_by) == 2

0 comments on commit ce171b1

Please sign in to comment.