From 1edbf01250fe831c8ef4ac7c4507136363f18d57 Mon Sep 17 00:00:00 2001 From: alangenfeld Date: Tue, 19 Sep 2023 15:01:44 -0500 Subject: [PATCH] [exploration] unexecutable assets not backed by jobs --- .../dagster_graphql/schema/asset_graph.py | 14 +- .../dagster/_core/definitions/assets.py | 9 ++ .../dagster/_core/definitions/assets_job.py | 41 ++++-- .../repository_data_builder.py | 2 +- .../host_representation/external_data.py | 61 ++++++-- .../asset_defs_tests/test_asset_graph.py | 4 +- .../asset_defs_tests/test_assets_job.py | 4 +- .../execution_tests/test_asset_backfill.py | 4 +- .../test_external_data.py | 130 ++++++++++++++---- .../definitions_tests/test_external_assets.py | 86 ++++++++++-- 10 files changed, 288 insertions(+), 67 deletions(-) diff --git a/python_modules/dagster-graphql/dagster_graphql/schema/asset_graph.py b/python_modules/dagster-graphql/dagster_graphql/schema/asset_graph.py index 5febc0acf00a2..64b5ccf689599 100644 --- a/python_modules/dagster-graphql/dagster_graphql/schema/asset_graph.py +++ b/python_modules/dagster-graphql/dagster_graphql/schema/asset_graph.py @@ -456,8 +456,8 @@ def get_required_resource_keys_rec( def is_graph_backed_asset(self) -> bool: return self.graphName is not None - def is_source_asset(self) -> bool: - return self._external_asset_node.is_source + def is_source_or_defined_in_unexecutable_assets_def(self) -> bool: + return self._external_asset_node.is_source or not self._external_asset_node.is_executable def resolve_hasMaterializePermission( self, @@ -582,7 +582,7 @@ def resolve_assetObservations( ] def resolve_configField(self, _graphene_info: ResolveInfo) -> Optional[GrapheneConfigTypeField]: - if self.is_source_asset(): + if self.is_source_or_defined_in_unexecutable_assets_def(): return None external_pipeline = self.get_external_job() node_def_snap = self.get_node_definition_snap() @@ -815,7 +815,7 @@ def resolve_jobs(self, _graphene_info: ResolveInfo) -> Sequence[GraphenePipeline ] def resolve_isSource(self, _graphene_info: ResolveInfo) -> bool: - return self.is_source_asset() + return self._external_asset_node.is_source def resolve_isPartitioned(self, _graphene_info: ResolveInfo) -> bool: return self._external_asset_node.partitions_def_data is not None @@ -979,7 +979,7 @@ def resolve_metadata_entries( def resolve_op( self, _graphene_info: ResolveInfo ) -> Optional[Union[GrapheneSolidDefinition, GrapheneCompositeSolidDefinition]]: - if self.is_source_asset(): + if self.is_source_or_defined_in_unexecutable_assets_def(): return None external_pipeline = self.get_external_job() node_def_snap = self.get_node_definition_snap() @@ -1058,7 +1058,7 @@ def resolve_repository(self, graphene_info: ResolveInfo) -> "GrapheneRepository" def resolve_required_resources( self, _graphene_info: ResolveInfo ) -> Sequence[GrapheneResourceRequirement]: - if self.is_source_asset(): + if self.is_source_or_defined_in_unexecutable_assets_def(): return [] node_def_snap = self.get_node_definition_snap() all_unique_keys = self.get_required_resource_keys(node_def_snap) @@ -1071,7 +1071,7 @@ def resolve_type( "GrapheneListDagsterType", "GrapheneNullableDagsterType", "GrapheneRegularDagsterType" ] ]: - if self.is_source_asset(): + if self.is_source_or_defined_in_unexecutable_assets_def(): return None external_pipeline = self.get_external_job() output_name = self.external_asset_node.output_name diff --git a/python_modules/dagster/dagster/_core/definitions/assets.py b/python_modules/dagster/dagster/_core/definitions/assets.py index 40a507f544bd2..fdf14c6626d0c 100644 
--- a/python_modules/dagster/dagster/_core/definitions/assets.py +++ b/python_modules/dagster/dagster/_core/definitions/assets.py @@ -916,6 +916,15 @@ def asset_execution_type_for_asset(self, asset_key: AssetKey) -> AssetExecutionT self._metadata_by_key.get(asset_key, {}).get(SYSTEM_METADATA_KEY_ASSET_EXECUTION_TYPE) ) + def is_executable(self) -> bool: + """Return True if all assets in this definition are executable. + Assumes that either all or none of the contained assets are executable. + """ + for key in self.keys: + if not self.is_asset_executable(key): + return False + return True + def get_partition_mapping_for_input(self, input_name: str) -> Optional[PartitionMapping]: return self._partition_mappings.get(self._keys_by_input_name[input_name]) diff --git a/python_modules/dagster/dagster/_core/definitions/assets_job.py b/python_modules/dagster/dagster/_core/definitions/assets_job.py index a929746b1f62e..5308f4ac92513 100644 --- a/python_modules/dagster/dagster/_core/definitions/assets_job.py +++ b/python_modules/dagster/dagster/_core/definitions/assets_job.py @@ -58,39 +58,51 @@ def is_base_asset_job_name(name: str) -> bool: def get_base_asset_jobs( - assets: Sequence[AssetsDefinition], + assets_defs: Sequence[AssetsDefinition], source_assets: Sequence[SourceAsset], asset_checks: Sequence[AssetChecksDefinition], resource_defs: Optional[Mapping[str, ResourceDefinition]], executor_def: Optional[ExecutorDefinition], ) -> Sequence[JobDefinition]: - assets_by_partitions_def: Dict[Optional[PartitionsDefinition], List[AssetsDefinition]] = ( + # bucket the executable AssetsDefinitions by partitions def; + # unexecutable assets are intentionally omitted. + exe_assets_by_partitions_def: Dict[Optional[PartitionsDefinition], List[AssetsDefinition]] = ( + defaultdict(list) ) - for assets_def in assets: - assets_by_partitions_def[assets_def.partitions_def].append(assets_def) + all_exe_assets = [] + all_source_assets = [*source_assets] + for assets_def in assets_defs: + if assets_def.is_executable(): + exe_assets_by_partitions_def[assets_def.partitions_def].append(assets_def) + all_exe_assets.append(assets_def) + else: + # treat all unexecutable assets as potential source assets + all_source_assets.extend(assets_def.to_source_assets()) # We need to create "empty" jobs for each partitions def that is used by an observable but no # materializable asset. They are empty because we don't assign the source asset to the `assets`, # but rather the `source_assets` argument of `build_assets_job`.
for observable in [sa for sa in source_assets if sa.is_observable]: - if observable.partitions_def not in assets_by_partitions_def: - assets_by_partitions_def[observable.partitions_def] = [] - if len(assets_by_partitions_def.keys()) == 0 or assets_by_partitions_def.keys() == {None}: + if observable.partitions_def not in exe_assets_by_partitions_def: + exe_assets_by_partitions_def[observable.partitions_def] = [] + + if len(exe_assets_by_partitions_def.keys()) == 0 or exe_assets_by_partitions_def.keys() == { + None + }: return [ build_assets_job( name=ASSET_BASE_JOB_PREFIX, - assets=assets, + assets=all_exe_assets, asset_checks=asset_checks, - source_assets=source_assets, + source_assets=all_source_assets, executor_def=executor_def, resource_defs=resource_defs, ) ] else: - unpartitioned_assets = assets_by_partitions_def.get(None, []) + unpartitioned_assets_defs = exe_assets_by_partitions_def.get(None, []) partitioned_assets_by_partitions_def = { - k: v for k, v in assets_by_partitions_def.items() if k is not None + k: v for k, v in exe_assets_by_partitions_def.items() if k is not None } jobs = [] @@ -101,8 +113,11 @@ def get_base_asset_jobs( jobs.append( build_assets_job( f"{ASSET_BASE_JOB_PREFIX}_{i}", - assets=[*assets_with_partitions, *unpartitioned_assets], - source_assets=[*source_assets, *assets], + assets=[*assets_with_partitions, *unpartitioned_assets_defs], + source_assets=[ + *all_source_assets, + *all_exe_assets, + ], asset_checks=asset_checks, resource_defs=resource_defs, executor_def=executor_def, diff --git a/python_modules/dagster/dagster/_core/definitions/repository_definition/repository_data_builder.py b/python_modules/dagster/dagster/_core/definitions/repository_definition/repository_data_builder.py index cca69953a8fb6..bab5c5d01ae17 100644 --- a/python_modules/dagster/dagster/_core/definitions/repository_definition/repository_data_builder.py +++ b/python_modules/dagster/dagster/_core/definitions/repository_definition/repository_data_builder.py @@ -193,7 +193,7 @@ def build_caching_repository_data_from_list( if assets_defs or source_assets or asset_checks_defs: for job_def in get_base_asset_jobs( - assets=assets_defs, + assets_defs=assets_defs, source_assets=source_assets, executor_def=default_executor_def, resource_defs=top_level_resources, diff --git a/python_modules/dagster/dagster/_core/host_representation/external_data.py b/python_modules/dagster/dagster/_core/host_representation/external_data.py index ddc2aed0f5cc4..5de1a154170a7 100644 --- a/python_modules/dagster/dagster/_core/host_representation/external_data.py +++ b/python_modules/dagster/dagster/_core/host_representation/external_data.py @@ -10,6 +10,7 @@ from enum import Enum from typing import ( TYPE_CHECKING, + AbstractSet, Any, Dict, Iterable, @@ -51,6 +52,7 @@ SYSTEM_METADATA_KEY_ASSET_EXECUTION_TYPE, AssetExecutionType, ) +from dagster._core.definitions.assets import AssetsDefinition from dagster._core.definitions.assets_job import is_base_asset_job_name from dagster._core.definitions.auto_materialize_policy import AutoMaterializePolicy from dagster._core.definitions.backfill_policy import BackfillPolicy @@ -1261,17 +1263,20 @@ def __new__( ), ) - @property - def is_executable(self) -> bool: + def _get_exe_type_metadata(self) -> Optional[str]: metadata_value = self.metadata.get(SYSTEM_METADATA_KEY_ASSET_EXECUTION_TYPE) if not metadata_value: - varietal_text = None + exe_type_text = None else: check.inst(metadata_value, TextMetadataValue) # for guaranteed runtime error assert isinstance(metadata_value, 
TextMetadataValue) # for type checker - varietal_text = metadata_value.value + exe_type_text = metadata_value.value - return AssetExecutionType.is_executable(varietal_text) + return exe_type_text + + @property + def is_executable(self) -> bool: + return AssetExecutionType.is_executable(self._get_exe_type_metadata()) ResourceJobUsageMap = Dict[str, List[ResourceJobUsageEntry]] @@ -1342,6 +1347,11 @@ def external_repository_data_from_def( asset_graph = external_asset_nodes_from_defs( jobs, source_assets_by_key=repository_def.source_assets_by_key, + unexecutable_assets_defs={ + assets_def + for assets_def in repository_def.assets_defs_by_key.values() + if not assets_def.is_executable() + }, ) nested_resource_map = _get_nested_resources_map( @@ -1469,6 +1479,7 @@ def external_asset_checks_from_defs( def external_asset_nodes_from_defs( job_defs: Sequence[JobDefinition], source_assets_by_key: Mapping[AssetKey, SourceAsset], + unexecutable_assets_defs: Optional[AbstractSet[AssetsDefinition]], ) -> Sequence[ExternalAssetNode]: node_defs_by_asset_key: Dict[AssetKey, List[Tuple[NodeOutputHandle, JobDefinition]]] = ( defaultdict(list) @@ -1488,6 +1499,7 @@ def external_asset_nodes_from_defs( descriptions_by_asset_key: Dict[AssetKey, str] = {} atomic_execution_unit_ids_by_key: Dict[Union[AssetKey, AssetCheckKey], str] = {} + # resolve details for assets in jobs for job_def in job_defs: asset_layer = job_def.asset_layer asset_info_by_node_output = asset_layer.asset_info_by_node_output_handle @@ -1546,10 +1558,22 @@ def external_asset_nodes_from_defs( group_name_by_asset_key.update(asset_layer.group_names_by_assets()) - asset_keys_without_definitions = all_upstream_asset_keys.difference( - node_defs_by_asset_key.keys() - ).difference(source_assets_by_key.keys()) - + # resolve details for unexecutable assets & their dependencies + unexecutable_keys = set() + for unexec_def in unexecutable_assets_defs or []: + for key in unexec_def.keys: + unexecutable_keys.add(key) + for depends_on in unexec_def.asset_deps[key]: + deps[key][depends_on] = ExternalAssetDependency(depends_on) + dep_by[depends_on][key] = ExternalAssetDependedBy(key) + + # build nodes for asset keys that were referred to in jobs but not defined elsewhere + # (handles SourceAsset usage by @ops) + asset_keys_without_definitions = ( + all_upstream_asset_keys.difference(node_defs_by_asset_key.keys()) + .difference(source_assets_by_key.keys()) + .difference(unexecutable_keys) + ) asset_nodes = [ ExternalAssetNode( asset_key=asset_key, @@ -1562,6 +1586,7 @@ def external_asset_nodes_from_defs( for asset_key in asset_keys_without_definitions ] + # build nodes for source assets for source_asset in source_assets_by_key.values(): if source_asset.key not in node_defs_by_asset_key: job_names = ( @@ -1603,6 +1628,7 @@ def external_asset_nodes_from_defs( ) ) + # build nodes for assets backed by ops/jobs for asset_key, node_tuple_list in node_defs_by_asset_key.items(): node_output_handle, job_def = node_tuple_list[0] @@ -1670,6 +1696,23 @@ def external_asset_nodes_from_defs( ) ) + # build nodes for unexecutable assets defs + for unexec_def in unexecutable_assets_defs or []: + for key in unexec_def.keys: + asset_nodes.append( + ExternalAssetNode( + asset_key=key, + dependencies=list(deps[key].values()), + depended_by=list(dep_by[key].values()), + metadata=unexec_def.metadata_by_key[key], + group_name=unexec_def.group_names_by_key[key], + is_source=False, + is_observable=False, + # could use a special kind tag for more UI punch + # compute_kind=... 
+ ) + ) + defined = set() for node in asset_nodes: if node.asset_key in defined: diff --git a/python_modules/dagster/dagster_tests/asset_defs_tests/test_asset_graph.py b/python_modules/dagster/dagster_tests/asset_defs_tests/test_asset_graph.py index 6caedb2d75632..0ecd26f2aa4be 100644 --- a/python_modules/dagster/dagster_tests/asset_defs_tests/test_asset_graph.py +++ b/python_modules/dagster/dagster_tests/asset_defs_tests/test_asset_graph.py @@ -51,7 +51,9 @@ def repo(): return assets + (asset_checks or []) external_asset_nodes = external_asset_nodes_from_defs( - repo.get_all_jobs(), source_assets_by_key={} + repo.get_all_jobs(), + source_assets_by_key={}, + unexecutable_assets_defs=set(), ) return ExternalAssetGraph.from_repository_handles_and_external_asset_nodes( [(MagicMock(), asset_node) for asset_node in external_asset_nodes], diff --git a/python_modules/dagster/dagster_tests/asset_defs_tests/test_assets_job.py b/python_modules/dagster/dagster_tests/asset_defs_tests/test_assets_job.py index 191d82c108420..8901fe59bf70a 100644 --- a/python_modules/dagster/dagster_tests/asset_defs_tests/test_assets_job.py +++ b/python_modules/dagster/dagster_tests/asset_defs_tests/test_assets_job.py @@ -1959,7 +1959,7 @@ def hourly_asset(): ... def unpartitioned_asset(): ... jobs = get_base_asset_jobs( - assets=[ + assets_defs=[ daily_asset, daily_asset2, daily_asset_different_start_date, @@ -2004,7 +2004,7 @@ def asset_b(): ... def asset_x(asset_b: B): ... jobs = get_base_asset_jobs( - assets=[ + assets_defs=[ asset_x, ], source_assets=[ diff --git a/python_modules/dagster/dagster_tests/core_tests/execution_tests/test_asset_backfill.py b/python_modules/dagster/dagster_tests/core_tests/execution_tests/test_asset_backfill.py index d540ebf82df0d..5357388323f77 100644 --- a/python_modules/dagster/dagster_tests/core_tests/execution_tests/test_asset_backfill.py +++ b/python_modules/dagster/dagster_tests/core_tests/execution_tests/test_asset_backfill.py @@ -659,7 +659,9 @@ def external_asset_graph_from_assets_by_repo_name( repo = Definitions(assets=assets).get_repository_def() external_asset_nodes = external_asset_nodes_from_defs( - repo.get_all_jobs(), source_assets_by_key=repo.source_assets_by_key + repo.get_all_jobs(), + source_assets_by_key=repo.source_assets_by_key, + unexecutable_assets_defs=set(), ) repo_handle = MagicMock(repository_name=repo_name) from_repository_handles_and_external_asset_nodes.extend( diff --git a/python_modules/dagster/dagster_tests/core_tests/host_representation_tests/test_external_data.py b/python_modules/dagster/dagster_tests/core_tests/host_representation_tests/test_external_data.py index badd6c1131ee9..91fe19afca658 100644 --- a/python_modules/dagster/dagster_tests/core_tests/host_representation_tests/test_external_data.py +++ b/python_modules/dagster/dagster_tests/core_tests/host_representation_tests/test_external_data.py @@ -50,7 +50,11 @@ def asset1(): return 1 assets_job = build_assets_job("assets_job", [asset1]) - external_asset_nodes = external_asset_nodes_from_defs([assets_job], source_assets_by_key={}) + external_asset_nodes = external_asset_nodes_from_defs( + [assets_job], + source_assets_by_key={}, + unexecutable_assets_defs=set(), + ) assert external_asset_nodes == [ ExternalAssetNode( @@ -75,7 +79,11 @@ def asset1(): return 1 assets_job = build_assets_job("assets_job", [asset1]) - external_asset_nodes = external_asset_nodes_from_defs([assets_job], source_assets_by_key={}) + external_asset_nodes = external_asset_nodes_from_defs( + [assets_job], + 
source_assets_by_key={}, + unexecutable_assets_defs=set(), + ) assert external_asset_nodes == [ ExternalAssetNode( @@ -101,7 +109,11 @@ def asset1(): return 1 assets_job = build_assets_job("assets_job", [asset1]) - external_asset_nodes = external_asset_nodes_from_defs([assets_job], source_assets_by_key={}) + external_asset_nodes = external_asset_nodes_from_defs( + [assets_job], + source_assets_by_key={}, + unexecutable_assets_defs=set(), + ) assert external_asset_nodes == [ ExternalAssetNode( @@ -145,7 +157,11 @@ def asset1(): return 1 assets_job = build_assets_job("assets_job", [asset1]) - external_asset_nodes = external_asset_nodes_from_defs([assets_job], source_assets_by_key={}) + external_asset_nodes = external_asset_nodes_from_defs( + [assets_job], + source_assets_by_key={}, + unexecutable_assets_defs=set(), + ) assert external_asset_nodes == [ ExternalAssetNode( @@ -182,7 +198,11 @@ def asset1(): return 1 assets_job = build_assets_job("assets_job", [asset1]) - external_asset_nodes = external_asset_nodes_from_defs([assets_job], source_assets_by_key={}) + external_asset_nodes = external_asset_nodes_from_defs( + [assets_job], + source_assets_by_key={}, + unexecutable_assets_defs=set(), + ) assert external_asset_nodes[0].group_name == "group1" @@ -193,7 +213,11 @@ def asset1(): return 1 assets_job = build_assets_job("assets_job", [asset1]) - external_asset_nodes = external_asset_nodes_from_defs([assets_job], source_assets_by_key={}) + external_asset_nodes = external_asset_nodes_from_defs( + [assets_job], + source_assets_by_key={}, + unexecutable_assets_defs=set(), + ) assert external_asset_nodes[0].group_name == DEFAULT_GROUP_NAME @@ -222,7 +246,11 @@ def asset2(asset1): assert asset1 == 1 assets_job = build_assets_job("assets_job", [asset1, asset2]) - external_asset_nodes = external_asset_nodes_from_defs([assets_job], source_assets_by_key={}) + external_asset_nodes = external_asset_nodes_from_defs( + [assets_job], + source_assets_by_key={}, + unexecutable_assets_defs=set(), + ) assert external_asset_nodes == [ ExternalAssetNode( @@ -262,7 +290,11 @@ def something(result): pass assets_job = build_assets_job("assets_job", [something], source_assets=[not_result]) - external_asset_nodes = external_asset_nodes_from_defs([assets_job], source_assets_by_key={}) + external_asset_nodes = external_asset_nodes_from_defs( + [assets_job], + source_assets_by_key={}, + unexecutable_assets_defs=set(), + ) assert external_asset_nodes == [ ExternalAssetNode( @@ -313,7 +345,11 @@ def c2(c): asset_graph=AssetGraph.from_assets(all_assets) ) - external_asset_nodes = external_asset_nodes_from_defs([as_job, cs_job], source_assets_by_key={}) + external_asset_nodes = external_asset_nodes_from_defs( + [as_job, cs_job], + source_assets_by_key={}, + unexecutable_assets_defs=set(), + ) assert ( ExternalAssetNode( @@ -363,7 +399,11 @@ def asset2_b(asset1): assert asset1 == 1 assets_job = build_assets_job("assets_job", [asset1, asset2_a, asset2_b]) - external_asset_nodes = external_asset_nodes_from_defs([assets_job], source_assets_by_key={}) + external_asset_nodes = external_asset_nodes_from_defs( + [assets_job], + source_assets_by_key={}, + unexecutable_assets_defs=set(), + ) assert external_asset_nodes == [ ExternalAssetNode( @@ -423,7 +463,9 @@ def asset2(asset1): assets_job1 = build_assets_job("assets_job1", [asset1]) assets_job2 = build_assets_job("assets_job2", [asset2], source_assets=[asset1]) external_asset_nodes = external_asset_nodes_from_defs( - [assets_job1, assets_job2], source_assets_by_key={} + 
[assets_job1, assets_job2], + source_assets_by_key={}, + unexecutable_assets_defs=set(), ) assert external_asset_nodes == [ @@ -464,7 +506,11 @@ def asset1(): job1 = build_assets_job("job1", [asset1]) job2 = build_assets_job("job2", [asset1]) - external_asset_nodes = external_asset_nodes_from_defs([job1, job2], source_assets_by_key={}) + external_asset_nodes = external_asset_nodes_from_defs( + [job1, job2], + source_assets_by_key={}, + unexecutable_assets_defs=set(), + ) assert external_asset_nodes == [ ExternalAssetNode( @@ -496,7 +542,11 @@ def assets(): assets_job = build_assets_job("assets_job", [assets]) - external_asset_nodes = external_asset_nodes_from_defs([assets_job], source_assets_by_key={}) + external_asset_nodes = external_asset_nodes_from_defs( + [assets_job], + source_assets_by_key={}, + unexecutable_assets_defs=set(), + ) atomic_execution_unit_id = assets.unique_id @@ -550,7 +600,9 @@ def assets(in1, in2): all_assets_job = build_assets_job("assets_job", [in1, in2, assets, downstream]) external_asset_nodes = external_asset_nodes_from_defs( - [subset_job, all_assets_job], source_assets_by_key={} + [subset_job, all_assets_job], + source_assets_by_key={}, + unexecutable_assets_defs=set(), ) # sort so that test is deterministic sorted_nodes = sorted( @@ -684,7 +736,11 @@ def bar(foo): assets_job = build_assets_job("assets_job", [bar], source_assets=[foo]) - external_asset_nodes = external_asset_nodes_from_defs([assets_job], source_assets_by_key={}) + external_asset_nodes = external_asset_nodes_from_defs( + [assets_job], + source_assets_by_key={}, + unexecutable_assets_defs=set(), + ) assert external_asset_nodes == [ ExternalAssetNode( asset_key=AssetKey("foo"), @@ -715,7 +771,9 @@ def test_unused_source_asset(): bar = SourceAsset(key=AssetKey("bar"), description="def") external_asset_nodes = external_asset_nodes_from_defs( - [], source_assets_by_key={AssetKey("foo"): foo, AssetKey("bar"): bar} + [], + source_assets_by_key={AssetKey("foo"): foo, AssetKey("bar"): bar}, + unexecutable_assets_defs=set(), ) assert external_asset_nodes == [ ExternalAssetNode( @@ -749,7 +807,9 @@ def foo(bar): job1 = build_assets_job("job1", [foo], source_assets=[bar]) external_asset_nodes = external_asset_nodes_from_defs( - [job1], source_assets_by_key={AssetKey("bar"): bar} + [job1], + source_assets_by_key={AssetKey("bar"): bar}, + unexecutable_assets_defs=set(), ) assert external_asset_nodes == [ ExternalAssetNode( @@ -811,7 +871,11 @@ def zero(): assets_job = build_assets_job("assets_job", [zero, three_asset]) - external_asset_nodes = external_asset_nodes_from_defs([assets_job], source_assets_by_key={}) + external_asset_nodes = external_asset_nodes_from_defs( + [assets_job], + source_assets_by_key={}, + unexecutable_assets_defs=set(), + ) # sort so that test is deterministic sorted_nodes = sorted( @@ -923,7 +987,11 @@ def create_twenty(thirteen, six): assets_job = build_assets_job("assets_job", [zero, eight_and_five, thirteen_and_six, twenty]) - external_asset_nodes = external_asset_nodes_from_defs([assets_job], source_assets_by_key={}) + external_asset_nodes = external_asset_nodes_from_defs( + [assets_job], + source_assets_by_key={}, + unexecutable_assets_defs=set(), + ) # sort so that test is deterministic sorted_nodes = sorted( [ @@ -1010,7 +1078,11 @@ def asset2(asset1): del asset1 assets_job = build_assets_job("assets_job", [asset1, asset2]) - external_asset_nodes = external_asset_nodes_from_defs([assets_job], source_assets_by_key={}) + external_asset_nodes = external_asset_nodes_from_defs( + 
[assets_job], + source_assets_by_key={}, + unexecutable_assets_defs=set(), + ) assert external_asset_nodes == [ ExternalAssetNode( @@ -1139,7 +1211,11 @@ def foo(): assets_job = build_assets_job("assets_job", [foo]) - external_asset_nodes = external_asset_nodes_from_defs([assets_job], source_assets_by_key={}) + external_asset_nodes = external_asset_nodes_from_defs( + [assets_job], + source_assets_by_key={}, + unexecutable_assets_defs=set(), + ) assert external_asset_nodes[0].op_description == "bar" @@ -1163,7 +1239,11 @@ def foo(): external_asset_nodes = { asset_node.asset_key: asset_node - for asset_node in external_asset_nodes_from_defs([assets_job], source_assets_by_key={}) + for asset_node in external_asset_nodes_from_defs( + [assets_job], + source_assets_by_key={}, + unexecutable_assets_defs=set(), + ) } assert external_asset_nodes[AssetKey("asset1")].op_description == "bar" assert external_asset_nodes[AssetKey("asset2")].op_description == "baz" @@ -1191,7 +1271,11 @@ def test_external_assets_def_to_external_asset_graph() -> None: asset_one = next(iter(external_assets_from_specs([AssetSpec("asset_one")]))) assets_job = build_assets_job("assets_job", [asset_one]) - external_asset_nodes = external_asset_nodes_from_defs([assets_job], source_assets_by_key={}) + external_asset_nodes = external_asset_nodes_from_defs( + [assets_job], + source_assets_by_key={}, + unexecutable_assets_defs=set(), + ) assert len(external_asset_nodes) == 1 assert next(iter(external_asset_nodes)).is_executable is False diff --git a/python_modules/dagster/dagster_tests/definitions_tests/test_external_assets.py b/python_modules/dagster/dagster_tests/definitions_tests/test_external_assets.py index 2b4644deba243..f1cfa90c99ce0 100644 --- a/python_modules/dagster/dagster_tests/definitions_tests/test_external_assets.py +++ b/python_modules/dagster/dagster_tests/definitions_tests/test_external_assets.py @@ -23,7 +23,13 @@ external_assets_from_specs, ) from dagster._core.definitions.freshness_policy import FreshnessPolicy +from dagster._core.definitions.repository_definition.repository_definition import ( + RepositoryDefinition, +) from dagster._core.definitions.time_window_partitions import DailyPartitionsDefinition +from dagster._core.host_representation.external_data import ( + external_repository_data_from_def, +) def test_external_asset_basic_creation() -> None: @@ -118,6 +124,18 @@ def test_external_asset_creation_with_deps() -> None: } +def _get_external_node_from_defs(defs: Definitions, asset_key: AssetKey): + repo = defs.get_inner_repository_for_loading_process() + assert isinstance(repo, RepositoryDefinition) + external_repo = external_repository_data_from_def(repo) + matches = [] + for node in external_repo.external_asset_graph_data: + if node.asset_key == asset_key: + matches.append(node) + assert len(matches) == 1, f"Expected 1 match for {asset_key} in defs, got {len(matches)}" + return matches[0] + + def test_how_source_assets_are_backwards_compatible() -> None: class DummyIOManager(IOManager): def handle_output(self, context, obj) -> None: @@ -151,13 +169,31 @@ def an_asset(source_asset: str) -> str: result_two = defs_with_shim.get_implicit_global_asset_job_def().execute_in_process( instance=instance, - # currently we have to explicitly select the asset to exclude the source from execution - asset_selection=[AssetKey("an_asset")], ) assert result_two.success assert result_two.output_for_node("an_asset") == "hardcoded-computed" + # ensure bizarre behavior for selecting source asset works the same + 
source_selection_result = ( + defs_with_source.get_implicit_global_asset_job_def().execute_in_process( + instance=instance, + asset_selection=[AssetKey("source_asset")], + ) + ) + assert source_selection_result.success + + shim_selection_result = defs_with_shim.get_implicit_global_asset_job_def().execute_in_process( + instance=instance, + asset_selection=[AssetKey("source_asset")], + ) + assert shim_selection_result.success + + source_node = _get_external_node_from_defs(defs_with_source, AssetKey("source_asset")) + assert source_node.is_source + shim_node = _get_external_node_from_defs(defs_with_shim, AssetKey("source_asset")) + assert not shim_node.is_executable + def get_job_for_assets(defs: Definitions, *coercibles_or_defs) -> JobDefinition: job_def = defs.get_implicit_job_def_for_assets(set_from_coercibles_or_defs(coercibles_or_defs)) @@ -206,20 +242,16 @@ def an_asset(context: AssetExecutionContext, source_asset: str) -> str: assert result_one.success assert result_one.output_for_node("an_asset") == "hardcoded-computed-2021-01-02" - - shimmed_source_asset = create_external_asset_from_source_asset(source_asset) defs_with_shim = Definitions( assets=[create_external_asset_from_source_asset(source_asset), an_asset] ) assert isinstance(defs_with_shim.get_assets_def("source_asset"), AssetsDefinition) - job_def_with_shim = get_job_for_assets(defs_with_shim, an_asset, shimmed_source_asset) + job_def_with_shim = get_job_for_assets(defs_with_shim, an_asset) result_two = job_def_with_shim.execute_in_process( instance=instance, - # currently we have to explicitly select the asset to exclude the source from execution - asset_selection=[AssetKey("an_asset")], partition_key="2021-01-03", ) @@ -294,6 +326,40 @@ def test_external_assets_with_dependencies() -> None: defs = Definitions(assets=external_assets_from_specs([upstream_asset, downstream_asset])) assert defs - assert defs.get_implicit_global_asset_job_def().asset_layer.asset_deps[ - AssetKey("downstream_asset") - ] == {AssetKey("upstream_asset")} + repo = defs.get_inner_repository_for_loading_process() + assert isinstance(repo, RepositoryDefinition) + external_repo = external_repository_data_from_def(repo) + nodes_by_key = {node.asset_key: node for node in external_repo.external_asset_graph_data} + assert ( + upstream_asset.key == nodes_by_key[downstream_asset.key].dependencies[0].upstream_asset_key + ) + + +def test_external_rep() -> None: + table_a = AssetSpec("table_A") + table_b = AssetSpec("table_B", deps=[table_a]) + table_c = AssetSpec("table_C", deps=[table_a]) + table_d = AssetSpec("table_D", deps=[table_b, table_c]) + + those_assets = external_assets_from_specs(specs=[table_a, table_b, table_c, table_d]) + + defs = Definitions(assets=those_assets) + repo = defs.get_inner_repository_for_loading_process() + assert isinstance(repo, RepositoryDefinition) + external_repo = external_repository_data_from_def(repo) + + assert len(external_repo.external_asset_graph_data) == 4 + + nodes_by_key = {node.asset_key: node for node in external_repo.external_asset_graph_data} + + assert len(nodes_by_key[table_a.key].depended_by) == 2 + assert len(nodes_by_key[table_a.key].dependencies) == 0 + + assert len(nodes_by_key[table_b.key].depended_by) == 1 + assert len(nodes_by_key[table_b.key].dependencies) == 1 + + assert len(nodes_by_key[table_c.key].depended_by) == 1 + assert len(nodes_by_key[table_c.key].dependencies) == 1 + + assert len(nodes_by_key[table_d.key].depended_by) == 0 + assert len(nodes_by_key[table_d.key].dependencies) == 2
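Reviewer note, not part of the patch: a minimal sketch of how the reworked get_base_asset_jobs is expected to bucket definitions after this change. The internal import paths, the top-level AssetSpec/asset/AssetKey exports, and passing None for the optional resource_defs/executor_def arguments are assumptions based on the signature and test call sites shown in this diff.

from dagster import AssetKey, AssetSpec, asset
# import paths assumed from the modules touched in this diff
from dagster._core.definitions.assets_job import get_base_asset_jobs
from dagster._core.definitions.external_asset import external_assets_from_specs

# one unexecutable (external) asset and one normal asset that depends on it
external_defs = external_assets_from_specs(specs=[AssetSpec("raw_table")])

@asset(deps=[AssetKey("raw_table")])
def derived_table() -> None:
    ...

# per the new bucketing above, the unexecutable def is dropped from `assets`
# and folded into `source_assets`, so the base job only executes derived_table
# while still resolving its upstream dependency
jobs = get_base_asset_jobs(
    assets_defs=[*external_defs, derived_table],
    source_assets=[],
    asset_checks=[],
    resource_defs=None,  # assumed acceptable; the parameter is Optional
    executor_def=None,
)
assert len(jobs) == 1  # no partitions defs involved, so a single base job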
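A companion sketch for the external-representation side: unexecutable AssetsDefinitions report is_executable() == False and are handed to external_asset_nodes_from_defs through the new unexecutable_assets_defs argument, which emits ExternalAssetNode entries for them even though no job backs them. Same caveat as above on import paths; the repository accessors used here (assets_defs_by_key, source_assets_by_key, get_all_jobs) are the ones this diff already relies on.

from dagster import AssetKey, AssetSpec, Definitions, asset
from dagster._core.definitions.external_asset import external_assets_from_specs
from dagster._core.host_representation.external_data import external_asset_nodes_from_defs

table_a = AssetSpec("table_A")
table_b = AssetSpec("table_B", deps=[table_a])
external_defs = external_assets_from_specs(specs=[table_a, table_b])

@asset(deps=[AssetKey("table_B")])
def summary() -> None:
    ...

repo = Definitions(assets=[*external_defs, summary]).get_repository_def()

# mirror what external_repository_data_from_def now does: collect the
# unexecutable defs and pass them alongside the jobs and source assets
unexecutable = {
    assets_def
    for assets_def in repo.assets_defs_by_key.values()
    if not assets_def.is_executable()
}
nodes = external_asset_nodes_from_defs(
    repo.get_all_jobs(),
    source_assets_by_key=repo.source_assets_by_key,
    unexecutable_assets_defs=unexecutable,
)
by_key = {node.asset_key: node for node in nodes}
assert not by_key[AssetKey("table_A")].is_executable  # no job backs it
assert by_key[AssetKey("summary")].is_executable  # materializable as before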