Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add Playground Example folder and Observable source asset test #17063

Closed
wants to merge 5 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions examples/experimental/playground/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
# Playground

## Getting started

A place to collect lightweight examples of Dagster usage.
Empty file.
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
from datetime import datetime

from dagster import Definitions, observable_source_asset
from dagster._core.definitions.data_version import DataVersion
from dagster._core.definitions.external_asset import create_external_asset_from_source_asset


@observable_source_asset
def always_return_time() -> DataVersion:
    """Report the current time of day as this source asset's data version.

    The version string is ``datetime.now()`` formatted as ``HH:MM:SS``.
    """
    observed_at = datetime.now()
    version_label = observed_at.strftime("%H:%M:%S")
    return DataVersion(version_label)


# Module-level Definitions object: the observable source asset is first adapted with
# create_external_asset_from_source_asset and then registered as the only asset.
defs = Definitions(assets=[create_external_asset_from_source_asset(always_return_time)])
Empty file.
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
import playground.observable_source_asset_adapter
from dagster import DagsterInstance


def test_observable_source_adapter() -> None:
    """The playground's implicit global asset job executes successfully in-process."""
    playground_defs = playground.observable_source_asset_adapter.defs
    implicit_job = playground_defs.get_implicit_global_asset_job_def()
    ephemeral_instance = DagsterInstance.ephemeral()
    run_result = implicit_job.execute_in_process(instance=ephemeral_instance)
    assert run_result.success
6 changes: 6 additions & 0 deletions examples/experimental/playground/pyproject.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
[build-system]
requires = ["setuptools"]
build-backend = "setuptools.build_meta"

[tool.dagster]
module_name = "playground"
2 changes: 2 additions & 0 deletions examples/experimental/playground/setup.cfg
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
[metadata]
name = playground
17 changes: 17 additions & 0 deletions examples/experimental/playground/setup.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
from setuptools import find_packages, setup

# Package metadata for the `playground` example project.
setup(
    name="playground",
    # Ship the `playground` package itself. The previous exclude=["playground"] removed
    # the project's own package from the distribution; only the test package should be
    # left out.
    # NOTE(review): assumes the tests live in a `playground_tests` package — confirm the
    # directory name.
    packages=find_packages(exclude=["playground_tests"]),
    install_requires=["dagster"],
    license="Apache-2.0",
    description="A collection of examples to demonstrate Dagster features",
    url="https://github.com/dagster-io/dagster/tree/master/examples/experimental/playground",
    classifiers=[
        "Programming Language :: Python :: 3.8",
        "Programming Language :: Python :: 3.9",
        "License :: OSI Approved :: Apache Software License",
        "Operating System :: OS Independent",
    ],
    # Extra tooling needed only for local development and running the tests.
    extras_require={"dev": ["dagster-webserver", "pytest"]},
)
17 changes: 17 additions & 0 deletions examples/experimental/playground/tox.ini
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
[tox]
skipsdist = true

[testenv]
download = True
passenv = CI_* COVERALLS_REPO_TOKEN BUILDKITE*
deps =
-e ../../../python_modules/dagster[test]
-e ../../../python_modules/dagster-pipes
-e ../../../python_modules/dagster-webserver
-e ../../../python_modules/dagster-graphql
-e .
allowlist_externals =
/bin/bash
commands =
!windows: /bin/bash -c '! pip list --exclude-editable | grep -e dagster'
pytest -c ../../../pyproject.toml -vv
6 changes: 5 additions & 1 deletion python_modules/dagster/dagster/_core/definitions/events.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@
import dagster._check as check
import dagster._seven as seven
from dagster._annotations import PublicAttr, deprecated, experimental_param, public
from dagster._core.definitions.data_version import DataVersion
from dagster._core.definitions.data_version import DATA_VERSION_TAG, DataVersion
from dagster._core.storage.tags import MULTIDIMENSIONAL_PARTITION_PREFIX, SYSTEM_TAG_PREFIX
from dagster._serdes import whitelist_for_serdes
from dagster._serdes.serdes import NamedTupleSerializer
Expand Down Expand Up @@ -481,6 +481,10 @@ def __new__(
def label(self) -> str:
return " ".join(self.asset_key.path)

@property
def data_version(self) -> Optional[str]:
    """Look up ``DATA_VERSION_TAG`` in this object's tags and return the result."""
    event_tags = self.tags
    return event_tags.get(DATA_VERSION_TAG)


UNDEFINED_ASSET_KEY_PATH = ["__undefined__"]

Expand Down
39 changes: 27 additions & 12 deletions python_modules/dagster/dagster/_core/definitions/external_asset.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,10 @@
)
from dagster._core.definitions.assets import AssetsDefinition
from dagster._core.definitions.decorators.asset_decorator import asset, multi_asset
from dagster._core.definitions.source_asset import SourceAsset
from dagster._core.definitions.source_asset import (
SourceAsset,
wrap_source_asset_observe_fn_in_op_compute_fn,
)
from dagster._core.errors import DagsterInvariantViolationError
from dagster._core.execution.context.compute import AssetExecutionContext

Expand Down Expand Up @@ -123,20 +126,23 @@ def _external_assets_def(context: AssetExecutionContext) -> None:


def create_external_asset_from_source_asset(source_asset: SourceAsset) -> AssetsDefinition:
check.invariant(
source_asset.observe_fn is None,
"Observable source assets not supported yet: observe_fn should be None",
)
check.invariant(
source_asset.auto_observe_interval_minutes is None,
"Observable source assets not supported yet: auto_observe_interval_minutes should be None",
"Automatically observed external assets not supported yet: auto_observe_interval_minutes"
" should be None",
)

injected_metadata = (
{SYSTEM_METADATA_KEY_ASSET_EXECUTION_TYPE: AssetExecutionType.UNEXECUTABLE.value}
if source_asset.observe_fn is None
else {}
)

kwargs = {
"key": source_asset.key,
"metadata": {
**source_asset.metadata,
**{SYSTEM_METADATA_KEY_ASSET_EXECUTION_TYPE: AssetExecutionType.UNEXECUTABLE.value},
**injected_metadata,
},
"group_name": source_asset.group_name,
"description": source_asset.description,
Expand All @@ -149,10 +155,19 @@ def create_external_asset_from_source_asset(source_asset: SourceAsset) -> Assets
kwargs["io_manager_key"] = source_asset.io_manager_key

@asset(**kwargs)
def _external_assets_def() -> None:
raise NotImplementedError(f"Asset {source_asset.key} is not executable")
def _shim_assets_def(context: AssetExecutionContext):
    """Compute body of the shim asset: run the source asset's observe function.

    Raises:
        NotImplementedError: if the wrapped source asset has no ``observe_fn``.
    """
    if not source_asset.observe_fn:
        raise NotImplementedError(f"Asset {source_asset.key} is not executable")

    op_function = wrap_source_asset_observe_fn_in_op_compute_fn(source_asset)
    return_value = op_function.decorated_fn(context)
    # This shim does not forward anything returned by the wrapped function, so a
    # non-None result is rejected rather than silently dropped.
    check.invariant(
        return_value is None,
        "The wrapped decorated_fn should not return a value. If this changes, this code path"
        " must be changed to process the events appropriately.",
    )

check.invariant(isinstance(_external_assets_def, AssetsDefinition))
assert isinstance(_external_assets_def, AssetsDefinition) # appese pyright
check.invariant(isinstance(_shim_assets_def, AssetsDefinition))
assert isinstance(_shim_assets_def, AssetsDefinition) # appease pyright

return _external_assets_def
return _shim_assets_def
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,7 @@ def wrap_source_asset_observe_fn_in_op_compute_fn(

observe_fn_has_context = is_context_provided(get_function_params(observe_fn))

def fn(context: OpExecutionContext):
def fn(context: OpExecutionContext) -> None:
resource_kwarg_keys = [param.name for param in get_resource_args(observe_fn)]
resource_kwargs = {key: getattr(context.resources, key) for key in resource_kwarg_keys}
observe_fn_return_value = (
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -173,6 +173,9 @@ def asset_observations_for_node(self, node_name: str) -> Sequence[AssetObservati
def get_asset_materialization_events(self) -> Sequence[DagsterEvent]:
    """Return the events from ``all_events`` flagged ``is_step_materialization``, in order."""
    return list(filter(lambda evt: evt.is_step_materialization, self.all_events))

def get_asset_observation_events(self) -> Sequence[DagsterEvent]:
    """Return the events from ``all_events`` flagged ``is_asset_observation``, in order."""
    return list(filter(lambda evt: evt.is_asset_observation, self.all_events))

def get_asset_check_evaluations(self) -> Sequence[AssetCheckEvaluation]:
return [
cast(AssetCheckEvaluation, event.event_specific_data)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,12 +7,14 @@
AssetsDefinition,
AutoMaterializePolicy,
DagsterInstance,
DataVersion,
Definitions,
IOManager,
JobDefinition,
SourceAsset,
_check as check,
asset,
observable_source_asset,
)
from dagster._core.definitions.asset_spec import AssetSpec
from dagster._core.definitions.external_asset import (
Expand Down Expand Up @@ -202,3 +204,27 @@ def an_asset(context: AssetExecutionContext, source_asset: str) -> str:

assert result_two.success
assert result_two.output_for_node("an_asset") == "hardcoded-computed-2021-01-03"


def test_observable_source_asset_decorator() -> None:
    """An observable source asset adapted to an external asset emits one observation."""

    @observable_source_asset
    def an_observable_source_asset() -> DataVersion:
        return DataVersion("foo")

    adapted_asset = create_external_asset_from_source_asset(an_observable_source_asset)
    defs = Definitions(assets=[adapted_asset])

    instance = DagsterInstance.ephemeral()
    global_job = defs.get_implicit_global_asset_job_def()
    result = global_job.execute_in_process(instance=instance)

    assert result.success
    assert result.output_for_node("an_observable_source_asset") is None

    # Exactly one observation is expected, carrying the data version from the observe fn.
    observation_events = result.get_asset_observation_events()
    assert len(observation_events) == 1
    observation = observation_events[0].asset_observation_data.asset_observation
    assert observation.data_version == "foo"

    materialization_events = result.get_asset_materialization_events()
    # Note: this does not make sense. Framework changes are needed to allow the
    # materialization event to be omitted.
    assert len(materialization_events) == 1