Skip to content

Commit

Permalink
rename create_unexecutable_observable_assets_def to create_external_a…
Browse files Browse the repository at this point in the history
…ssets_def_from_specs
  • Loading branch information
schrockn committed Sep 25, 2023
1 parent 095f959 commit 4dc593c
Show file tree
Hide file tree
Showing 4 changed files with 22 additions and 18 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -87,7 +87,7 @@
from dagster._core.definitions.freshness_policy import FreshnessPolicy
from dagster._core.definitions.metadata import MetadataValue
from dagster._core.definitions.multi_dimensional_partitions import MultiPartitionsDefinition
from dagster._core.definitions.observable_asset import create_unexecutable_observable_assets_def
from dagster._core.definitions.observable_asset import create_external_assets_def_from_specs
from dagster._core.definitions.partition import PartitionedConfig
from dagster._core.definitions.reconstruct import ReconstructableRepository
from dagster._core.definitions.sensor_definition import RunRequest, SkipReason
Expand Down Expand Up @@ -1382,7 +1382,7 @@ def executable_asset() -> None:
pass


unexecutable_asset = create_unexecutable_observable_assets_def([AssetSpec("unexecutable_asset")])
unexecutable_asset = create_external_assets_def_from_specs([AssetSpec("unexecutable_asset")])

executable_test_job = build_assets_job(
name="executable_test_job", assets=[executable_asset, unexecutable_asset]
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,12 +6,13 @@
AssetExecutionType,
AssetSpec,
)
from dagster._core.definitions.assets import AssetsDefinition
from dagster._core.definitions.decorators.asset_decorator import asset, multi_asset
from dagster._core.definitions.source_asset import SourceAsset
from dagster._core.errors import DagsterInvariantViolationError


def create_unexecutable_observable_assets_def(specs: Sequence[AssetSpec]):
def create_external_assets_def_from_specs(specs: Sequence[AssetSpec]) -> AssetsDefinition:
new_specs = []
for spec in specs:
check.invariant(
Expand Down Expand Up @@ -45,15 +46,15 @@ def create_unexecutable_observable_assets_def(specs: Sequence[AssetSpec]):
)

@multi_asset(specs=new_specs)
def an_asset() -> None:
def _external_assets_def() -> None:
raise DagsterInvariantViolationError(
f"You have attempted to execute an unexecutable asset {[spec.key for spec in specs]}"
)

return an_asset
return _external_assets_def


def create_unexecutable_observable_assets_def_from_source_asset(source_asset: SourceAsset):
def create_external_assets_def_from_source_asset(source_asset: SourceAsset) -> AssetsDefinition:
check.invariant(
source_asset.observe_fn is None,
"Observable source assets not supported yet: observe_fn should be None",
Expand All @@ -80,7 +81,10 @@ def create_unexecutable_observable_assets_def_from_source_asset(source_asset: So
kwargs["io_manager_key"] = source_asset.io_manager_key

@asset(**kwargs)
def shim_asset() -> None:
def _external_assets_def() -> None:
raise NotImplementedError(f"Asset {source_asset.key} is not executable")

return shim_asset
check.invariant(isinstance(_external_assets_def, AssetsDefinition))
assert isinstance(_external_assets_def, AssetsDefinition) # appese pyright

return _external_assets_def
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@
from dagster._core.definitions.backfill_policy import BackfillPolicy
from dagster._core.definitions.metadata import MetadataValue, TextMetadataValue, normalize_metadata
from dagster._core.definitions.multi_dimensional_partitions import MultiPartitionsDefinition
from dagster._core.definitions.observable_asset import create_unexecutable_observable_assets_def
from dagster._core.definitions.observable_asset import create_external_assets_def_from_specs
from dagster._core.definitions.partition import ScheduleType
from dagster._core.definitions.time_window_partitions import TimeWindowPartitionsDefinition
from dagster._core.definitions.utils import DEFAULT_GROUP_NAME
Expand Down Expand Up @@ -1188,7 +1188,7 @@ def test_external_time_window_valid_partition_key():


def test_unexecutable_external_asset_node() -> None:
asset_one = create_unexecutable_observable_assets_def([AssetSpec("asset_one")])
asset_one = create_external_assets_def_from_specs([AssetSpec("asset_one")])

assets_job = build_assets_job("assets_job", [asset_one])
external_asset_nodes = external_asset_graph_from_defs([assets_job], source_assets_by_key={})
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,14 +17,14 @@
from dagster._core.definitions.asset_spec import AssetSpec
from dagster._core.definitions.freshness_policy import FreshnessPolicy
from dagster._core.definitions.observable_asset import (
create_unexecutable_observable_assets_def,
create_unexecutable_observable_assets_def_from_source_asset,
create_external_assets_def_from_source_asset,
create_external_assets_def_from_specs,
)
from dagster._core.definitions.time_window_partitions import DailyPartitionsDefinition


def test_observable_asset_basic_creation() -> None:
assets_def = create_unexecutable_observable_assets_def(
assets_def = create_external_assets_def_from_specs(
specs=[
AssetSpec(
key="observable_asset_one",
Expand Down Expand Up @@ -56,7 +56,7 @@ def test_invalid_observable_asset_creation() -> None:

for invalid_spec in invalid_specs:
with pytest.raises(check.CheckError):
create_unexecutable_observable_assets_def(specs=[invalid_spec])
create_external_assets_def_from_specs(specs=[invalid_spec])


def test_normal_asset_materializeable() -> None:
Expand All @@ -68,7 +68,7 @@ def an_asset() -> None: ...

def test_observable_asset_creation_with_deps() -> None:
asset_two = AssetSpec("observable_asset_two")
assets_def = create_unexecutable_observable_assets_def(
assets_def = create_external_assets_def_from_specs(
specs=[
AssetSpec(
"observable_asset_one",
Expand Down Expand Up @@ -112,7 +112,7 @@ def an_asset(source_asset: str) -> str:
assert result_one.output_for_node("an_asset") == "hardcoded-computed"

defs_with_shim = Definitions(
assets=[create_unexecutable_observable_assets_def_from_source_asset(source_asset), an_asset]
assets=[create_external_assets_def_from_source_asset(source_asset), an_asset]
)

assert isinstance(defs_with_shim.get_assets_def("source_asset"), AssetsDefinition)
Expand Down Expand Up @@ -175,9 +175,9 @@ def an_asset(context: AssetExecutionContext, source_asset: str) -> str:
assert result_one.success
assert result_one.output_for_node("an_asset") == "hardcoded-computed-2021-01-02"

shimmed_source_asset = create_unexecutable_observable_assets_def_from_source_asset(source_asset)
shimmed_source_asset = create_external_assets_def_from_source_asset(source_asset)
defs_with_shim = Definitions(
assets=[create_unexecutable_observable_assets_def_from_source_asset(source_asset), an_asset]
assets=[create_external_assets_def_from_source_asset(source_asset), an_asset]
)

assert isinstance(defs_with_shim.get_assets_def("source_asset"), AssetsDefinition)
Expand Down

0 comments on commit 4dc593c

Please sign in to comment.