Skip to content

Commit

Permalink
Source Asset -> Unexecutable Asset Adapter (#16617)
Browse files Browse the repository at this point in the history
## Summary & Motivation

Allow us to wrap an unexecutable source asset in a vanilla assets definition.

## How I Tested These Changes

BK
  • Loading branch information
schrockn authored Sep 20, 2023
1 parent 48db04c commit f58f789
Show file tree
Hide file tree
Showing 3 changed files with 166 additions and 2 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
from dagster._core.definitions.events import AssetKey, CoercibleToAssetKey
from dagster._core.definitions.executor_definition import ExecutorDefinition
from dagster._core.definitions.logger_definition import LoggerDefinition
from dagster._core.errors import DagsterInvariantViolationError
from dagster._core.execution.build_resources import wrap_resources_for_execution
from dagster._core.execution.with_resources import with_resources
from dagster._core.executor.base import Executor
Expand Down Expand Up @@ -540,6 +541,14 @@ def get_implicit_job_def_for_assets(
) -> Optional[JobDefinition]:
return self.get_repository_def().get_implicit_job_def_for_assets(asset_keys)

def get_assets_def(self, key: CoercibleToAssetKey) -> AssetsDefinition:
asset_key = AssetKey.from_coercible(key)
for assets_def in self.get_asset_graph().assets:
if asset_key in assets_def.keys:
return assets_def

raise DagsterInvariantViolationError(f"Could not find asset {asset_key}")

@cached_method
def get_repository_def(self) -> RepositoryDefinition:
"""Definitions is implemented by wrapping RepositoryDefinition. Get that underlying object
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,8 @@
AssetExecutionType,
AssetSpec,
)
from dagster._core.definitions.decorators.asset_decorator import multi_asset
from dagster._core.definitions.decorators.asset_decorator import asset, multi_asset
from dagster._core.definitions.source_asset import SourceAsset
from dagster._core.errors import DagsterInvariantViolationError


Expand Down Expand Up @@ -50,3 +51,36 @@ def an_asset() -> None:
)

return an_asset


def create_unexecutable_observable_assets_def_from_source_asset(source_asset: SourceAsset):
check.invariant(
source_asset.observe_fn is None,
"Observable source assets not supported yet: observe_fn should be None",
)
check.invariant(
source_asset.auto_observe_interval_minutes is None,
"Observable source assets not supported yet: auto_observe_interval_minutes should be None",
)

kwargs = {
"key": source_asset.key,
"metadata": {
**source_asset.metadata,
**{SYSTEM_METADATA_KEY_ASSET_EXECUTION_TYPE: AssetExecutionType.UNEXECUTABLE.value},
},
"group_name": source_asset.group_name,
"description": source_asset.description,
"partitions_def": source_asset.partitions_def,
}

if source_asset.io_manager_def:
kwargs["io_manager_def"] = source_asset.io_manager_def
elif source_asset.io_manager_key:
kwargs["io_manager_key"] = source_asset.io_manager_key

@asset(**kwargs)
def shim_asset() -> None:
raise NotImplementedError(f"Asset {source_asset.key} is not executable")

return shim_asset
Original file line number Diff line number Diff line change
@@ -1,14 +1,26 @@
from typing import AbstractSet, Iterable

import pytest
from dagster import (
AssetExecutionContext,
AssetKey,
AssetsDefinition,
AutoMaterializePolicy,
DagsterInstance,
Definitions,
IOManager,
JobDefinition,
SourceAsset,
_check as check,
asset,
)
from dagster._core.definitions.asset_spec import AssetSpec
from dagster._core.definitions.freshness_policy import FreshnessPolicy
from dagster._core.definitions.observable_asset import create_unexecutable_observable_assets_def
from dagster._core.definitions.observable_asset import (
create_unexecutable_observable_assets_def,
create_unexecutable_observable_assets_def_from_source_asset,
)
from dagster._core.definitions.time_window_partitions import DailyPartitionsDefinition


def test_observable_asset_basic_creation() -> None:
Expand Down Expand Up @@ -72,3 +84,112 @@ def test_observable_asset_creation_with_deps() -> None:
assert assets_def.asset_deps[expected_key] == {
AssetKey(["observable_asset_two"]),
}


def test_how_source_assets_are_backwards_compatible() -> None:
class DummyIOManager(IOManager):
def handle_output(self, context, obj) -> None:
pass

def load_input(self, context) -> str:
return "hardcoded"

source_asset = SourceAsset(key="source_asset", io_manager_def=DummyIOManager())

@asset
def an_asset(source_asset: str) -> str:
return source_asset + "-computed"

defs_with_source = Definitions(assets=[source_asset, an_asset])

instance = DagsterInstance.ephemeral()

result_one = defs_with_source.get_implicit_global_asset_job_def().execute_in_process(
instance=instance
)

assert result_one.success
assert result_one.output_for_node("an_asset") == "hardcoded-computed"

defs_with_shim = Definitions(
assets=[create_unexecutable_observable_assets_def_from_source_asset(source_asset), an_asset]
)

assert isinstance(defs_with_shim.get_assets_def("source_asset"), AssetsDefinition)

result_two = defs_with_shim.get_implicit_global_asset_job_def().execute_in_process(
instance=instance,
# currently we have to explicitly select the asset to exclude the source from execution
asset_selection=[AssetKey("an_asset")],
)

assert result_two.success
assert result_two.output_for_node("an_asset") == "hardcoded-computed"


def get_job_for_assets(defs: Definitions, *coercibles_or_defs) -> JobDefinition:
job_def = defs.get_implicit_job_def_for_assets(set_from_coercibles_or_defs(coercibles_or_defs))
assert job_def, "Expected to find a job def"
return job_def


def set_from_coercibles_or_defs(coercibles_or_defs: Iterable) -> AbstractSet["AssetKey"]:
return set(
[
AssetKey.from_coercible_or_definition(coercible_or_def)
for coercible_or_def in coercibles_or_defs
]
)


def test_how_partitioned_source_assets_are_backwards_compatible() -> None:
class DummyIOManager(IOManager):
def handle_output(self, context, obj) -> None:
pass

def load_input(self, context) -> str:
return "hardcoded"

partitions_def = DailyPartitionsDefinition(start_date="2021-01-01")
source_asset = SourceAsset(
key="source_asset", io_manager_def=DummyIOManager(), partitions_def=partitions_def
)

@asset(partitions_def=partitions_def)
def an_asset(context: AssetExecutionContext, source_asset: str) -> str:
return source_asset + "-computed-" + context.partition_key

assert an_asset.partitions_def is partitions_def
assert source_asset.partitions_def is partitions_def

defs_with_source = Definitions(assets=[source_asset, an_asset])

instance = DagsterInstance.ephemeral()

job_def_without_shim = get_job_for_assets(defs_with_source, an_asset)

result_one = job_def_without_shim.execute_in_process(
instance=instance, partition_key="2021-01-02"
)

assert result_one.success
assert result_one.output_for_node("an_asset") == "hardcoded-computed-2021-01-02"

shimmed_source_asset = create_unexecutable_observable_assets_def_from_source_asset(source_asset)
defs_with_shim = Definitions(
assets=[create_unexecutable_observable_assets_def_from_source_asset(source_asset), an_asset]
)

assert isinstance(defs_with_shim.get_assets_def("source_asset"), AssetsDefinition)

job_def_with_shim = get_job_for_assets(defs_with_shim, an_asset, shimmed_source_asset)

result_two = job_def_with_shim.execute_in_process(
instance=instance,
# currently we have to explicitly select the asset to exclude the source from execution
asset_selection=[AssetKey("an_asset")],
partition_key="2021-01-03",
)

assert result_two.success
assert result_two.output_for_node("an_asset") == "hardcoded-computed-2021-01-03"

0 comments on commit f58f789

Please sign in to comment.