Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Source Asset -> Unexecutable Asset Adapter #16617

Merged
merged 2 commits into from
Sep 20, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
from dagster._core.definitions.events import AssetKey, CoercibleToAssetKey
from dagster._core.definitions.executor_definition import ExecutorDefinition
from dagster._core.definitions.logger_definition import LoggerDefinition
from dagster._core.errors import DagsterInvariantViolationError
from dagster._core.execution.build_resources import wrap_resources_for_execution
from dagster._core.execution.with_resources import with_resources
from dagster._core.executor.base import Executor
Expand Down Expand Up @@ -540,6 +541,14 @@ def get_implicit_job_def_for_assets(
) -> Optional[JobDefinition]:
return self.get_repository_def().get_implicit_job_def_for_assets(asset_keys)

def get_assets_def(self, key: CoercibleToAssetKey) -> AssetsDefinition:
asset_key = AssetKey.from_coercible(key)
for assets_def in self.get_asset_graph().assets:
if asset_key in assets_def.keys:
return assets_def

raise DagsterInvariantViolationError(f"Could not find asset {asset_key}")

@cached_method
def get_repository_def(self) -> RepositoryDefinition:
"""Definitions is implemented by wrapping RepositoryDefinition. Get that underlying object
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,8 @@
AssetExecutionType,
AssetSpec,
)
from dagster._core.definitions.decorators.asset_decorator import multi_asset
from dagster._core.definitions.decorators.asset_decorator import asset, multi_asset
from dagster._core.definitions.source_asset import SourceAsset
from dagster._core.errors import DagsterInvariantViolationError


Expand Down Expand Up @@ -50,3 +51,36 @@ def an_asset() -> None:
)

return an_asset


def create_unexecutable_observable_assets_def_from_source_asset(source_asset: SourceAsset):
check.invariant(
source_asset.observe_fn is None,
"Observable source assets not supported yet: observe_fn should be None",
)
check.invariant(
source_asset.auto_observe_interval_minutes is None,
"Observable source assets not supported yet: auto_observe_interval_minutes should be None",
)

kwargs = {
"key": source_asset.key,
"metadata": {
**source_asset.metadata,
**{SYSTEM_METADATA_KEY_ASSET_EXECUTION_TYPE: AssetExecutionType.UNEXECUTABLE.value},
},
Comment on lines +68 to +71
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

well need to ensure is_source get toggled on the ExternalAssetNode somehow, maybe other metadata or another update to the execution type enum

"group_name": source_asset.group_name,
"description": source_asset.description,
"partitions_def": source_asset.partitions_def,
}

if source_asset.io_manager_def:
kwargs["io_manager_def"] = source_asset.io_manager_def
elif source_asset.io_manager_key:
kwargs["io_manager_key"] = source_asset.io_manager_key

@asset(**kwargs)
def shim_asset() -> None:
raise NotImplementedError(f"Asset {source_asset.key} is not executable")

return shim_asset
Original file line number Diff line number Diff line change
@@ -1,14 +1,26 @@
from typing import AbstractSet, Iterable

import pytest
from dagster import (
AssetExecutionContext,
AssetKey,
AssetsDefinition,
AutoMaterializePolicy,
DagsterInstance,
Definitions,
IOManager,
JobDefinition,
SourceAsset,
_check as check,
asset,
)
from dagster._core.definitions.asset_spec import AssetSpec
from dagster._core.definitions.freshness_policy import FreshnessPolicy
from dagster._core.definitions.observable_asset import create_unexecutable_observable_assets_def
from dagster._core.definitions.observable_asset import (
create_unexecutable_observable_assets_def,
create_unexecutable_observable_assets_def_from_source_asset,
)
from dagster._core.definitions.time_window_partitions import DailyPartitionsDefinition


def test_observable_asset_basic_creation() -> None:
Expand Down Expand Up @@ -72,3 +84,112 @@ def test_observable_asset_creation_with_deps() -> None:
assert assets_def.asset_deps[expected_key] == {
AssetKey(["observable_asset_two"]),
}


def test_how_source_assets_are_backwards_compatible() -> None:
class DummyIOManager(IOManager):
def handle_output(self, context, obj) -> None:
pass

def load_input(self, context) -> str:
return "hardcoded"

source_asset = SourceAsset(key="source_asset", io_manager_def=DummyIOManager())

@asset
def an_asset(source_asset: str) -> str:
return source_asset + "-computed"

defs_with_source = Definitions(assets=[source_asset, an_asset])

instance = DagsterInstance.ephemeral()

result_one = defs_with_source.get_implicit_global_asset_job_def().execute_in_process(
instance=instance
)

assert result_one.success
assert result_one.output_for_node("an_asset") == "hardcoded-computed"

defs_with_shim = Definitions(
assets=[create_unexecutable_observable_assets_def_from_source_asset(source_asset), an_asset]
)

assert isinstance(defs_with_shim.get_assets_def("source_asset"), AssetsDefinition)

result_two = defs_with_shim.get_implicit_global_asset_job_def().execute_in_process(
instance=instance,
# currently we have to explicitly select the asset to exclude the source from execution
asset_selection=[AssetKey("an_asset")],
)

assert result_two.success
assert result_two.output_for_node("an_asset") == "hardcoded-computed"


def get_job_for_assets(defs: Definitions, *coercibles_or_defs) -> JobDefinition:
job_def = defs.get_implicit_job_def_for_assets(set_from_coercibles_or_defs(coercibles_or_defs))
assert job_def, "Expected to find a job def"
return job_def


def set_from_coercibles_or_defs(coercibles_or_defs: Iterable) -> AbstractSet["AssetKey"]:
return set(
[
AssetKey.from_coercible_or_definition(coercible_or_def)
for coercible_or_def in coercibles_or_defs
]
)


def test_how_partitioned_source_assets_are_backwards_compatible() -> None:
class DummyIOManager(IOManager):
def handle_output(self, context, obj) -> None:
pass

def load_input(self, context) -> str:
return "hardcoded"

partitions_def = DailyPartitionsDefinition(start_date="2021-01-01")
source_asset = SourceAsset(
key="source_asset", io_manager_def=DummyIOManager(), partitions_def=partitions_def
)

@asset(partitions_def=partitions_def)
def an_asset(context: AssetExecutionContext, source_asset: str) -> str:
return source_asset + "-computed-" + context.partition_key

assert an_asset.partitions_def is partitions_def
assert source_asset.partitions_def is partitions_def

defs_with_source = Definitions(assets=[source_asset, an_asset])

instance = DagsterInstance.ephemeral()

job_def_without_shim = get_job_for_assets(defs_with_source, an_asset)

result_one = job_def_without_shim.execute_in_process(
instance=instance, partition_key="2021-01-02"
)

assert result_one.success
assert result_one.output_for_node("an_asset") == "hardcoded-computed-2021-01-02"

shimmed_source_asset = create_unexecutable_observable_assets_def_from_source_asset(source_asset)
defs_with_shim = Definitions(
assets=[create_unexecutable_observable_assets_def_from_source_asset(source_asset), an_asset]
)

assert isinstance(defs_with_shim.get_assets_def("source_asset"), AssetsDefinition)

job_def_with_shim = get_job_for_assets(defs_with_shim, an_asset, shimmed_source_asset)

result_two = job_def_with_shim.execute_in_process(
instance=instance,
# currently we have to explicitly select the asset to exclude the source from execution
asset_selection=[AssetKey("an_asset")],
partition_key="2021-01-03",
)

assert result_two.success
assert result_two.output_for_node("an_asset") == "hardcoded-computed-2021-01-03"