Skip to content

Commit

Permalink
Wrap observable source assets in AssetsDefinition (#16620)
Browse files Browse the repository at this point in the history
## Summary & Motivation

This makes an observable source asset wrappable in an AssetsDefinition.
This is an imperfect intermediate state, as a spurious asset
materialization is produced as a result of execution (verified in test
case but fixed in #16621), but
this verifies that the core execution machinery works.

Makes the following code possible:

```python
@observable_source_asset
def an_observable_source_asset() -> DataVersion:
    return DataVersion("foo")

defs = Definitions(assets=[
    create_external_asset_from_source_asset(an_observable_source_asset)
])
```

This `@observable_source_asset` now plugs into the rest of our
infrastructure:

1) You can execute it from `dagster-webserver` using the "Materialize"
button, which will be renamed to "Execute".
2) You can schedule this from schedules and sensors.

AMP support has to wait because auto observation is hard-coded in the
AMP logic.

## How I Tested These Changes

BK
  • Loading branch information
schrockn authored Oct 6, 2023
1 parent 1c5d825 commit 0ead82b
Show file tree
Hide file tree
Showing 5 changed files with 62 additions and 14 deletions.
6 changes: 5 additions & 1 deletion python_modules/dagster/dagster/_core/definitions/events.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@
import dagster._check as check
import dagster._seven as seven
from dagster._annotations import PublicAttr, deprecated, experimental_param, public
from dagster._core.definitions.data_version import DataVersion
from dagster._core.definitions.data_version import DATA_VERSION_TAG, DataVersion
from dagster._core.storage.tags import MULTIDIMENSIONAL_PARTITION_PREFIX, SYSTEM_TAG_PREFIX
from dagster._serdes import whitelist_for_serdes
from dagster._serdes.serdes import NamedTupleSerializer
Expand Down Expand Up @@ -481,6 +481,10 @@ def __new__(
def label(self) -> str:
    """Return the components of ``self.asset_key.path`` joined by single spaces."""
    path_components = self.asset_key.path
    return " ".join(path_components)

@property
def data_version(self) -> Optional[str]:
    """Look up the value stored under ``DATA_VERSION_TAG`` in ``self.tags``.

    Returns whatever ``self.tags.get`` yields for that key; for a plain mapping
    this is ``None`` when the tag is not present.
    """
    version_tag_value = self.tags.get(DATA_VERSION_TAG)
    return version_tag_value


UNDEFINED_ASSET_KEY_PATH = ["__undefined__"]

Expand Down
39 changes: 27 additions & 12 deletions python_modules/dagster/dagster/_core/definitions/external_asset.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,10 @@
)
from dagster._core.definitions.assets import AssetsDefinition
from dagster._core.definitions.decorators.asset_decorator import asset, multi_asset
from dagster._core.definitions.source_asset import SourceAsset
from dagster._core.definitions.source_asset import (
SourceAsset,
wrap_source_asset_observe_fn_in_op_compute_fn,
)
from dagster._core.errors import DagsterInvariantViolationError
from dagster._core.execution.context.compute import AssetExecutionContext

Expand Down Expand Up @@ -123,20 +126,23 @@ def _external_assets_def(context: AssetExecutionContext) -> None:


def create_external_asset_from_source_asset(source_asset: SourceAsset) -> AssetsDefinition:
check.invariant(
source_asset.observe_fn is None,
"Observable source assets not supported yet: observe_fn should be None",
)
check.invariant(
source_asset.auto_observe_interval_minutes is None,
"Observable source assets not supported yet: auto_observe_interval_minutes should be None",
"Automatically observed external assets not supported yet: auto_observe_interval_minutes"
" should be None",
)

injected_metadata = (
{SYSTEM_METADATA_KEY_ASSET_EXECUTION_TYPE: AssetExecutionType.UNEXECUTABLE.value}
if source_asset.observe_fn is None
else {}
)

kwargs = {
"key": source_asset.key,
"metadata": {
**source_asset.metadata,
**{SYSTEM_METADATA_KEY_ASSET_EXECUTION_TYPE: AssetExecutionType.UNEXECUTABLE.value},
**injected_metadata,
},
"group_name": source_asset.group_name,
"description": source_asset.description,
Expand All @@ -149,10 +155,19 @@ def create_external_asset_from_source_asset(source_asset: SourceAsset) -> Assets
kwargs["io_manager_key"] = source_asset.io_manager_key

@asset(**kwargs)
def _external_assets_def() -> None:
raise NotImplementedError(f"Asset {source_asset.key} is not executable")
def _shim_assets_def(context: AssetExecutionContext):
    """Run the source asset's observe function on behalf of the wrapping asset.

    Args:
        context: The execution context handed to the wrapped observe compute
            function.

    Raises:
        NotImplementedError: If the source asset has no ``observe_fn``, i.e.
            there is nothing to execute for this asset.
    """
    if not source_asset.observe_fn:
        raise NotImplementedError(f"Asset {source_asset.key} is not executable")

    op_function = wrap_source_asset_observe_fn_in_op_compute_fn(source_asset)
    return_value = op_function.decorated_fn(context)
    # The wrapped compute function is annotated as returning None. If it ever
    # starts returning events, they must be handled here rather than dropped.
    check.invariant(
        return_value is None,
        "The wrapped decorated_fn should not return a value. If this changes, this code path"
        " must change to process the events appropriately.",
    )

check.invariant(isinstance(_external_assets_def, AssetsDefinition))
assert isinstance(_external_assets_def, AssetsDefinition) # appese pyright
check.invariant(isinstance(_shim_assets_def, AssetsDefinition))
assert isinstance(_shim_assets_def, AssetsDefinition) # appease pyright

return _external_assets_def
return _shim_assets_def
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,7 @@ def wrap_source_asset_observe_fn_in_op_compute_fn(

observe_fn_has_context = is_context_provided(get_function_params(observe_fn))

def fn(context: OpExecutionContext):
def fn(context: OpExecutionContext) -> None:
resource_kwarg_keys = [param.name for param in get_resource_args(observe_fn)]
resource_kwargs = {key: getattr(context.resources, key) for key in resource_kwarg_keys}
observe_fn_return_value = (
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -173,6 +173,9 @@ def asset_observations_for_node(self, node_name: str) -> Sequence[AssetObservati
def get_asset_materialization_events(self) -> Sequence[DagsterEvent]:
    """Return the events from ``self.all_events`` flagged ``is_step_materialization``."""
    return [
        dagster_event
        for dagster_event in self.all_events
        if dagster_event.is_step_materialization
    ]

def get_asset_observation_events(self) -> Sequence[DagsterEvent]:
    """Return the events from ``self.all_events`` flagged ``is_asset_observation``."""
    return [
        dagster_event
        for dagster_event in self.all_events
        if dagster_event.is_asset_observation
    ]

def get_asset_check_evaluations(self) -> Sequence[AssetCheckEvaluation]:
return [
cast(AssetCheckEvaluation, event.event_specific_data)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,12 +7,14 @@
AssetsDefinition,
AutoMaterializePolicy,
DagsterInstance,
DataVersion,
Definitions,
IOManager,
JobDefinition,
SourceAsset,
_check as check,
asset,
observable_source_asset,
)
from dagster._core.definitions.asset_spec import AssetSpec
from dagster._core.definitions.external_asset import (
Expand Down Expand Up @@ -202,3 +204,27 @@ def an_asset(context: AssetExecutionContext, source_asset: str) -> str:

assert result_two.success
assert result_two.output_for_node("an_asset") == "hardcoded-computed-2021-01-03"


def test_observable_source_asset_decorator() -> None:
    """Execute an observable source asset wrapped as an external asset.

    Checks that the implicit global asset job succeeds, that the wrapped node
    has no output, and that exactly one observation carrying the data version
    returned by the observe function is recorded.
    """

    @observable_source_asset
    def an_observable_source_asset() -> DataVersion:
        return DataVersion("foo")

    wrapped_asset = create_external_asset_from_source_asset(an_observable_source_asset)
    definitions = Definitions(assets=[wrapped_asset])

    ephemeral_instance = DagsterInstance.ephemeral()
    global_asset_job = definitions.get_implicit_global_asset_job_def()
    run_result = global_asset_job.execute_in_process(instance=ephemeral_instance)

    assert run_result.success
    assert run_result.output_for_node("an_observable_source_asset") is None

    observation_events = run_result.get_asset_observation_events()
    assert len(observation_events) == 1
    recorded_observation = observation_events[0].asset_observation_data.asset_observation
    assert recorded_observation.data_version == "foo"

    # NOTE: a materialization event is currently emitted alongside the observation.
    # This does not make sense; framework changes are needed to allow a
    # materialization event to be omitted.
    materialization_events = run_result.get_asset_materialization_events()
    assert len(materialization_events) == 1

0 comments on commit 0ead82b

Please sign in to comment.