diff --git a/python_modules/dagster-graphql/dagster_graphql/schema/asset_graph.py b/python_modules/dagster-graphql/dagster_graphql/schema/asset_graph.py index b85889416b23c..c93584215fc02 100644 --- a/python_modules/dagster-graphql/dagster_graphql/schema/asset_graph.py +++ b/python_modules/dagster-graphql/dagster_graphql/schema/asset_graph.py @@ -5,6 +5,7 @@ AssetKey, _check as check, ) +from dagster._core.definitions.asset_spec import AssetsDefCapability from dagster._core.definitions.data_time import CachingDataTimeResolver from dagster._core.definitions.data_version import ( NULL_DATA_VERSION, @@ -455,8 +456,8 @@ def get_required_resource_keys_rec( def is_graph_backed_asset(self) -> bool: return self.graphName is not None - def is_source_asset(self) -> bool: - return self._external_asset_node.is_source + def is_not_materialize_check_asset(self) -> bool: + return self._external_asset_node.capability_type != AssetsDefCapability.MATERIALIZE_CHECK def resolve_hasMaterializePermission( self, @@ -581,7 +582,7 @@ def resolve_assetObservations( ] def resolve_configField(self, _graphene_info: ResolveInfo) -> Optional[GrapheneConfigTypeField]: - if self.is_source_asset(): + if self.is_not_materialize_check_asset(): return None external_pipeline = self.get_external_job() node_def_snap = self.get_node_definition_snap() @@ -814,7 +815,7 @@ def resolve_jobs(self, _graphene_info: ResolveInfo) -> Sequence[GraphenePipeline ] def resolve_isSource(self, _graphene_info: ResolveInfo) -> bool: - return self.is_source_asset() + return self._external_asset_node.is_source def resolve_isPartitioned(self, _graphene_info: ResolveInfo) -> bool: return self._external_asset_node.partitions_def_data is not None @@ -978,7 +979,7 @@ def resolve_metadata_entries( def resolve_op( self, _graphene_info: ResolveInfo ) -> Optional[Union[GrapheneSolidDefinition, GrapheneCompositeSolidDefinition]]: - if self.is_source_asset(): + if self.is_not_materialize_check_asset(): return None external_pipeline = self.get_external_job() node_def_snap = self.get_node_definition_snap() @@ -1057,7 +1058,7 @@ def resolve_repository(self, graphene_info: ResolveInfo) -> "GrapheneRepository" def resolve_required_resources( self, _graphene_info: ResolveInfo ) -> Sequence[GrapheneResourceRequirement]: - if self.is_source_asset(): + if self.is_not_materialize_check_asset(): return [] node_def_snap = self.get_node_definition_snap() all_unique_keys = self.get_required_resource_keys(node_def_snap) @@ -1070,7 +1071,7 @@ def resolve_type( "GrapheneListDagsterType", "GrapheneNullableDagsterType", "GrapheneRegularDagsterType" ] ]: - if self.is_source_asset(): + if self.is_not_materialize_check_asset(): return None external_pipeline = self.get_external_job() output_name = self.external_asset_node.output_name diff --git a/python_modules/dagster/dagster/_core/definitions/asset_spec.py b/python_modules/dagster/dagster/_core/definitions/asset_spec.py index 1b8e1cf85a26f..b422feca702bc 100644 --- a/python_modules/dagster/dagster/_core/definitions/asset_spec.py +++ b/python_modules/dagster/dagster/_core/definitions/asset_spec.py @@ -16,29 +16,40 @@ if TYPE_CHECKING: from dagster._core.definitions.asset_dep import AssetDep, CoercibleToAssetDep -# SYSTEM_METADATA_KEY_ASSET_EXECUTION_TYPE lives on the metadata of an asset +# This lives on the metadata of an asset # (which currently ends up on the Output associated with the asset key) -# whih encodes the execution type the of asset. "Unexecutable" assets are assets -# that cannot be materialized in Dagster, but can have events in the event -# log keyed off of them, making Dagster usable as a observability and lineage tool -# for externally materialized assets. -SYSTEM_METADATA_KEY_ASSET_EXECUTION_TYPE = "dagster/asset_execution_type" +# and encodes the capability type the of backing AssetsDefinition. +# Assets backed by AssetsDefinitions with no capabilities cannot be materialized in Dagster, +# but can have events in the event log keyed off of them, making Dagster usable as +# an observability and lineage tool for externally materialized assets. +SYSTEM_METADATA_KEY_ASSET_DEF_CAPABILITY = "dagster/asset_def_capability" -class AssetExecutionType(Enum): - UNEXECUTABLE = "UNEXECUTABLE" - MATERIALIZATION = "MATERIALIZATION" +class AssetsDefCapability(Enum): + # default asset def capability, has backing op to execute that materializes/checks assets + MATERIALIZE_CHECK = "MATERIALIZE_CHECK" + + # asset def can facilitate IOManager loading of asset, but not execute + SOURCE = "SOURCE" + + # asset def can facilitate IOManager loading of asset, has backing op to observe data version + # OBSERVE_SOURCE = "OBSERVE_SOURCE" # what obs source asset support would look like + + # asset def only defines basic properties of asset, no further capabilities + NONE = "NONE" @staticmethod def is_executable(varietal_str: Optional[str]) -> bool: - return AssetExecutionType.str_to_enum(varietal_str) in {AssetExecutionType.MATERIALIZATION} + return AssetsDefCapability.str_to_enum(varietal_str) in { + AssetsDefCapability.MATERIALIZE_CHECK + } @staticmethod - def str_to_enum(varietal_str: Optional[str]) -> "AssetExecutionType": + def str_to_enum(cap_str: Optional[str]) -> "AssetsDefCapability": return ( - AssetExecutionType.MATERIALIZATION - if varietal_str is None - else AssetExecutionType(varietal_str) + AssetsDefCapability.MATERIALIZE_CHECK + if cap_str is None + else AssetsDefCapability(cap_str) ) diff --git a/python_modules/dagster/dagster/_core/definitions/assets.py b/python_modules/dagster/dagster/_core/definitions/assets.py index a0bc0d0d4f0f3..74222c8887e7e 100644 --- a/python_modules/dagster/dagster/_core/definitions/assets.py +++ b/python_modules/dagster/dagster/_core/definitions/assets.py @@ -868,14 +868,36 @@ def is_asset_executable(self, asset_key: AssetKey) -> bool: bool: True if the asset key is materializable by this AssetsDefinition. """ from dagster._core.definitions.asset_spec import ( - SYSTEM_METADATA_KEY_ASSET_EXECUTION_TYPE, - AssetExecutionType, + SYSTEM_METADATA_KEY_ASSET_DEF_CAPABILITY, + AssetsDefCapability, ) - return AssetExecutionType.is_executable( - self._metadata_by_key.get(asset_key, {}).get(SYSTEM_METADATA_KEY_ASSET_EXECUTION_TYPE) + return AssetsDefCapability.is_executable( + self._metadata_by_key.get(asset_key, {}).get(SYSTEM_METADATA_KEY_ASSET_DEF_CAPABILITY) ) + def is_executable(self): + """Returns True if this definition represents unexecutable assets. + Assumption: either all or none contained assets are unexecutable. + """ + for key in self.keys: + if not self.is_asset_executable(key): + return False + return True + + @property + def capability_type(self): + from dagster._core.definitions.asset_spec import ( + SYSTEM_METADATA_KEY_ASSET_DEF_CAPABILITY, + AssetsDefCapability, + ) + + # assumes capability is stored as metadata is same across all assets + for key in self.keys: + return AssetsDefCapability.str_to_enum( + self._metadata_by_key.get(key, {}).get(SYSTEM_METADATA_KEY_ASSET_DEF_CAPABILITY) + ) + def get_partition_mapping_for_input(self, input_name: str) -> Optional[PartitionMapping]: return self._partition_mappings.get(self._keys_by_input_name[input_name]) diff --git a/python_modules/dagster/dagster/_core/definitions/assets_job.py b/python_modules/dagster/dagster/_core/definitions/assets_job.py index cf186f3d2f6a9..6149b1e14ea2a 100644 --- a/python_modules/dagster/dagster/_core/definitions/assets_job.py +++ b/python_modules/dagster/dagster/_core/definitions/assets_job.py @@ -13,6 +13,7 @@ Tuple, Union, ) +from dagster._core.definitions.asset_spec import AssetsDefCapability from toposort import CircularDependencyError, toposort @@ -58,39 +59,48 @@ def is_base_asset_job_name(name: str) -> bool: def get_base_asset_jobs( - assets: Sequence[AssetsDefinition], + assets_defs: Sequence[AssetsDefinition], source_assets: Sequence[SourceAsset], asset_checks: Sequence[AssetChecksDefinition], resource_defs: Optional[Mapping[str, ResourceDefinition]], executor_def: Optional[ExecutorDefinition], ) -> Sequence[JobDefinition]: - assets_by_partitions_def: Dict[Optional[PartitionsDefinition], List[AssetsDefinition]] = ( + # bucket the passed in AssetsDefinitions that are executable by partition. + # un-executable assets are intentionally omitted. + exe_assets_by_partitions_def: Dict[Optional[PartitionsDefinition], List[AssetsDefinition]] = ( defaultdict(list) ) - for assets_def in assets: - assets_by_partitions_def[assets_def.partitions_def].append(assets_def) + all_source_assets = [*source_assets] + for assets_def in assets_defs: + if assets_def.is_executable(): + exe_assets_by_partitions_def[assets_def.partitions_def].append(assets_def) + elif assets_def.capability_type == AssetsDefCapability.SOURCE: + all_source_assets.extend(assets_def.to_source_assets()) # We need to create "empty" jobs for each partitions def that is used by an observable but no # materializable asset. They are empty because we don't assign the source asset to the `assets`, # but rather the `source_assets` argument of `build_assets_job`. for observable in [sa for sa in source_assets if sa.is_observable]: - if observable.partitions_def not in assets_by_partitions_def: - assets_by_partitions_def[observable.partitions_def] = [] - if len(assets_by_partitions_def.keys()) == 0 or assets_by_partitions_def.keys() == {None}: + if observable.partitions_def not in exe_assets_by_partitions_def: + exe_assets_by_partitions_def[observable.partitions_def] = [] + + if len(exe_assets_by_partitions_def.keys()) == 0 or exe_assets_by_partitions_def.keys() == { + None + }: return [ build_assets_job( name=ASSET_BASE_JOB_PREFIX, - assets=assets, + assets=exe_assets_by_partitions_def.get(None, []), asset_checks=asset_checks, - source_assets=source_assets, + source_assets=all_source_assets, executor_def=executor_def, resource_defs=resource_defs, ) ] else: - unpartitioned_assets = assets_by_partitions_def.get(None, []) + unpartitioned_assets = exe_assets_by_partitions_def.get(None, []) partitioned_assets_by_partitions_def = { - k: v for k, v in assets_by_partitions_def.items() if k is not None + k: v for k, v in exe_assets_by_partitions_def.items() if k is not None } jobs = [] @@ -102,7 +112,10 @@ def get_base_asset_jobs( build_assets_job( f"{ASSET_BASE_JOB_PREFIX}_{i}", assets=[*assets_with_partitions, *unpartitioned_assets], - source_assets=[*source_assets, *assets], + source_assets=[ + *all_source_assets, + *assets_defs, # should this be *unpartitioned_assets ? + ], asset_checks=asset_checks, resource_defs=resource_defs, executor_def=executor_def, diff --git a/python_modules/dagster/dagster/_core/definitions/observable_asset.py b/python_modules/dagster/dagster/_core/definitions/observable_asset.py index 664c3d6beb5ef..0389e5d24c3da 100644 --- a/python_modules/dagster/dagster/_core/definitions/observable_asset.py +++ b/python_modules/dagster/dagster/_core/definitions/observable_asset.py @@ -2,8 +2,8 @@ from dagster import _check as check from dagster._core.definitions.asset_spec import ( - SYSTEM_METADATA_KEY_ASSET_EXECUTION_TYPE, - AssetExecutionType, + SYSTEM_METADATA_KEY_ASSET_DEF_CAPABILITY, + AssetsDefCapability, AssetSpec, ) from dagster._core.definitions.decorators.asset_decorator import asset, multi_asset @@ -34,18 +34,14 @@ def create_unexecutable_observable_assets_def(specs: Sequence[AssetSpec]): group_name=spec.group_name, metadata={ **(spec.metadata or {}), - **{ - SYSTEM_METADATA_KEY_ASSET_EXECUTION_TYPE: ( - AssetExecutionType.UNEXECUTABLE.value - ) - }, + **{SYSTEM_METADATA_KEY_ASSET_DEF_CAPABILITY: AssetsDefCapability.NONE.value}, }, deps=spec.deps, ) ) @multi_asset(specs=new_specs) - def an_asset() -> None: + def an_asset(): raise DagsterInvariantViolationError( f"You have attempted to execute an unexecutable asset {[spec.key for spec in specs]}" ) @@ -67,7 +63,7 @@ def create_unexecutable_observable_assets_def_from_source_asset(source_asset: So "key": source_asset.key, "metadata": { **source_asset.metadata, - **{SYSTEM_METADATA_KEY_ASSET_EXECUTION_TYPE: AssetExecutionType.UNEXECUTABLE.value}, + **{SYSTEM_METADATA_KEY_ASSET_DEF_CAPABILITY: AssetsDefCapability.SOURCE.value}, }, "group_name": source_asset.group_name, "description": source_asset.description, diff --git a/python_modules/dagster/dagster/_core/definitions/repository_definition/repository_data_builder.py b/python_modules/dagster/dagster/_core/definitions/repository_definition/repository_data_builder.py index cca69953a8fb6..bab5c5d01ae17 100644 --- a/python_modules/dagster/dagster/_core/definitions/repository_definition/repository_data_builder.py +++ b/python_modules/dagster/dagster/_core/definitions/repository_definition/repository_data_builder.py @@ -193,7 +193,7 @@ def build_caching_repository_data_from_list( if assets_defs or source_assets or asset_checks_defs: for job_def in get_base_asset_jobs( - assets=assets_defs, + assets_defs=assets_defs, source_assets=source_assets, executor_def=default_executor_def, resource_defs=top_level_resources, diff --git a/python_modules/dagster/dagster/_core/host_representation/external_data.py b/python_modules/dagster/dagster/_core/host_representation/external_data.py index edea50f20c70e..6204bd9dd04f0 100644 --- a/python_modules/dagster/dagster/_core/host_representation/external_data.py +++ b/python_modules/dagster/dagster/_core/host_representation/external_data.py @@ -10,6 +10,7 @@ from enum import Enum from typing import ( TYPE_CHECKING, + AbstractSet, Any, Dict, Iterable, @@ -48,9 +49,10 @@ from dagster._core.definitions.asset_check_spec import AssetCheckSpec from dagster._core.definitions.asset_sensor_definition import AssetSensorDefinition from dagster._core.definitions.asset_spec import ( - SYSTEM_METADATA_KEY_ASSET_EXECUTION_TYPE, - AssetExecutionType, + SYSTEM_METADATA_KEY_ASSET_DEF_CAPABILITY, + AssetsDefCapability, ) +from dagster._core.definitions.assets import AssetsDefinition from dagster._core.definitions.assets_job import is_base_asset_job_name from dagster._core.definitions.auto_materialize_policy import AutoMaterializePolicy from dagster._core.definitions.backfill_policy import BackfillPolicy @@ -1260,17 +1262,24 @@ def __new__( ), ) - @property - def is_executable(self) -> bool: - metadata_value = self.metadata.get(SYSTEM_METADATA_KEY_ASSET_EXECUTION_TYPE) + def _get_capability_metadata(self) -> Optional[str]: + metadata_value = self.metadata.get(SYSTEM_METADATA_KEY_ASSET_DEF_CAPABILITY) if not metadata_value: - varietal_text = None + cap_text = None else: check.inst(metadata_value, TextMetadataValue) # for guaranteed runtime error assert isinstance(metadata_value, TextMetadataValue) # for type checker - varietal_text = metadata_value.value + cap_text = metadata_value.value - return AssetExecutionType.is_executable(varietal_text) + return cap_text + + @property + def is_executable(self) -> bool: + return AssetsDefCapability.is_executable(self._get_capability_metadata()) + + @property + def capability_type(self) -> AssetsDefCapability: + return AssetsDefCapability.str_to_enum(self._get_capability_metadata()) ResourceJobUsageMap = Dict[str, List[ResourceJobUsageEntry]] @@ -1341,6 +1350,11 @@ def external_repository_data_from_def( asset_graph = external_asset_graph_from_defs( jobs, source_assets_by_key=repository_def.source_assets_by_key, + unexecutable_assets_defs={ + assets_def + for assets_def in repository_def.assets_defs_by_key.values() + if not assets_def.is_executable() + }, ) nested_resource_map = _get_nested_resources_map( @@ -1462,6 +1476,7 @@ def external_asset_checks_from_defs( def external_asset_graph_from_defs( job_defs: Sequence[JobDefinition], source_assets_by_key: Mapping[AssetKey, SourceAsset], + unexecutable_assets_defs: Optional[AbstractSet[AssetsDefinition]] = None, # drop default value ) -> Sequence[ExternalAssetNode]: node_defs_by_asset_key: Dict[AssetKey, List[Tuple[NodeOutputHandle, JobDefinition]]] = ( defaultdict(list) @@ -1659,6 +1674,30 @@ def external_asset_graph_from_defs( ) ) + # resolve deps unexec assets since they are not encoded in jobs + for unexec_def in unexecutable_assets_defs or []: + for key in unexec_def.keys: + for depends_on in unexec_def.asset_deps[key]: + deps[key][depends_on] = ExternalAssetDependency(depends_on) + dep_by[depends_on][key] = ExternalAssetDependedBy(key) + + # build nodes for unexec assets + for unexec_def in unexecutable_assets_defs or []: + for key in unexec_def.keys: + asset_nodes.append( + ExternalAssetNode( + asset_key=key, + dependencies=list(deps[key].values()), + depended_by=list(dep_by[key].values()), + metadata=unexec_def.metadata_by_key[key], + group_name=unexec_def.group_names_by_key[key], + is_source=unexec_def.capability_type == AssetsDefCapability.SOURCE, + is_observable=False, + # could use a special kind tag for more UI punch + # compute_kind=... + ) + ) + return asset_nodes diff --git a/python_modules/dagster/dagster_tests/asset_defs_tests/test_assets_job.py b/python_modules/dagster/dagster_tests/asset_defs_tests/test_assets_job.py index 191d82c108420..8901fe59bf70a 100644 --- a/python_modules/dagster/dagster_tests/asset_defs_tests/test_assets_job.py +++ b/python_modules/dagster/dagster_tests/asset_defs_tests/test_assets_job.py @@ -1959,7 +1959,7 @@ def hourly_asset(): ... def unpartitioned_asset(): ... jobs = get_base_asset_jobs( - assets=[ + assets_defs=[ daily_asset, daily_asset2, daily_asset_different_start_date, @@ -2004,7 +2004,7 @@ def asset_b(): ... def asset_x(asset_b: B): ... jobs = get_base_asset_jobs( - assets=[ + assets_defs=[ asset_x, ], source_assets=[ diff --git a/python_modules/dagster/dagster_tests/definitions_tests/test_observable_assets.py b/python_modules/dagster/dagster_tests/definitions_tests/test_observable_assets.py index 65d5c99ebc4f3..ce3553b5c14a7 100644 --- a/python_modules/dagster/dagster_tests/definitions_tests/test_observable_assets.py +++ b/python_modules/dagster/dagster_tests/definitions_tests/test_observable_assets.py @@ -20,7 +20,14 @@ create_unexecutable_observable_assets_def, create_unexecutable_observable_assets_def_from_source_asset, ) +from dagster._core.definitions.repository_definition.repository_definition import ( + RepositoryDefinition, +) from dagster._core.definitions.time_window_partitions import DailyPartitionsDefinition +from dagster._core.errors import DagsterInvalidSubsetError +from dagster._core.host_representation.external_data import ( + external_repository_data_from_def, +) def test_observable_asset_basic_creation() -> None: @@ -86,6 +93,17 @@ def test_observable_asset_creation_with_deps() -> None: } +def _get_external_node_from_defs(defs: Definitions, asset_key: AssetKey): + repo = defs.get_inner_repository_for_loading_process() + assert isinstance(repo, RepositoryDefinition) + external_repo = external_repository_data_from_def(repo) + for node in external_repo.external_asset_graph_data: + if node.asset_key == asset_key: + return node + + assert False, f"Could not find {asset_key} in defs" + + def test_how_source_assets_are_backwards_compatible() -> None: class DummyIOManager(IOManager): def handle_output(self, context, obj) -> None: @@ -119,13 +137,16 @@ def an_asset(source_asset: str) -> str: result_two = defs_with_shim.get_implicit_global_asset_job_def().execute_in_process( instance=instance, - # currently we have to explicitly select the asset to exclude the source from execution - asset_selection=[AssetKey("an_asset")], ) assert result_two.success assert result_two.output_for_node("an_asset") == "hardcoded-computed" + # ensure same x-process repr + source_node = _get_external_node_from_defs(defs_with_source, AssetKey("source_asset")) + shim_node = _get_external_node_from_defs(defs_with_shim, AssetKey("source_asset")) + assert source_node == shim_node + def get_job_for_assets(defs: Definitions, *coercibles_or_defs) -> JobDefinition: job_def = defs.get_implicit_job_def_for_assets(set_from_coercibles_or_defs(coercibles_or_defs)) @@ -175,21 +196,76 @@ def an_asset(context: AssetExecutionContext, source_asset: str) -> str: assert result_one.success assert result_one.output_for_node("an_asset") == "hardcoded-computed-2021-01-02" - shimmed_source_asset = create_unexecutable_observable_assets_def_from_source_asset(source_asset) + create_unexecutable_observable_assets_def_from_source_asset(source_asset) defs_with_shim = Definitions( assets=[create_unexecutable_observable_assets_def_from_source_asset(source_asset), an_asset] ) assert isinstance(defs_with_shim.get_assets_def("source_asset"), AssetsDefinition) - job_def_with_shim = get_job_for_assets(defs_with_shim, an_asset, shimmed_source_asset) + job_def_with_shim = get_job_for_assets(defs_with_shim, an_asset) result_two = job_def_with_shim.execute_in_process( instance=instance, - # currently we have to explicitly select the asset to exclude the source from execution - asset_selection=[AssetKey("an_asset")], partition_key="2021-01-03", ) assert result_two.success assert result_two.output_for_node("an_asset") == "hardcoded-computed-2021-01-03" + + +def test_non_executable_asset_excluded_from_job() -> None: + upstream_asset = create_unexecutable_observable_assets_def( + specs=[AssetSpec("upstream_asset")], + ) + + @asset(deps=[upstream_asset]) + def downstream_asset() -> None: ... + + defs = Definitions(assets=[upstream_asset, downstream_asset]) + + assert defs.get_implicit_global_asset_job_def().execute_in_process().success + + # ensure that explict selection fails + with pytest.raises( + DagsterInvalidSubsetError, + match=( + r'Assets provided in asset_selection argument \["upstream_asset"\] do not exist in' + r" parent asset group or job" + ), + ): + defs.get_implicit_global_asset_job_def().execute_in_process( + asset_selection=[AssetKey("upstream_asset")] + ) + + +def test_external_rep(): + table_a = AssetSpec("table_A") + table_b = AssetSpec("table_B", deps=[table_a]) + table_c = AssetSpec("table_C", deps=[table_a]) + table_d = AssetSpec("table_D", deps=[table_b, table_c]) + + those_assets = create_unexecutable_observable_assets_def( + specs=[table_a, table_b, table_c, table_d] + ) + + defs = Definitions(assets=[those_assets]) + repo = defs.get_inner_repository_for_loading_process() + assert isinstance(repo, RepositoryDefinition) + external_repo = external_repository_data_from_def(repo) + + assert len(external_repo.external_asset_graph_data) == 4 + + nodes_by_key = {node.asset_key: node for node in external_repo.external_asset_graph_data} + + assert len(nodes_by_key[table_a.key].depended_by) == 2 + assert len(nodes_by_key[table_a.key].dependencies) == 0 + + assert len(nodes_by_key[table_b.key].depended_by) == 1 + assert len(nodes_by_key[table_b.key].dependencies) == 1 + + assert len(nodes_by_key[table_c.key].depended_by) == 1 + assert len(nodes_by_key[table_c.key].dependencies) == 1 + + assert len(nodes_by_key[table_d.key].depended_by) == 0 + assert len(nodes_by_key[table_d.key].dependencies) == 2