Skip to content

Commit

Permalink
yield MaterializeResult
Browse files Browse the repository at this point in the history
  • Loading branch information
smackesey committed Sep 20, 2023
1 parent 78ebf91 commit 8830b33
Show file tree
Hide file tree
Showing 11 changed files with 202 additions and 78 deletions.
4 changes: 2 additions & 2 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -257,8 +257,8 @@ required-version = "0.0.289"

[tool.ruff.flake8-builtins]

# We use `id` in many places and almost never want to use the python builtin.
builtins-ignorelist = ["id"]
# Id and type are frequently helpful as local variable or parameter names.
builtins-ignorelist = ["id", "type"]

[tool.ruff.flake8-tidy-imports.banned-api]

Expand Down
82 changes: 75 additions & 7 deletions python_modules/dagster-ext/dagster_ext/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,16 +16,19 @@
TYPE_CHECKING,
Any,
ClassVar,
Dict,
Generic,
Iterator,
Literal,
Mapping,
Optional,
Sequence,
Set,
TextIO,
Type,
TypedDict,
TypeVar,
Union,
cast,
get_args,
)
Expand Down Expand Up @@ -98,6 +101,14 @@ class ExtDataProvenance(TypedDict):
is_user_provided: bool


ExtMetadataRawValue = Union[int, float, str, Mapping[str, Any], Sequence[Any], bool, None]


class ExtMetadataValue(TypedDict):
type: Optional["ExtMetadataType"]
raw_value: ExtMetadataRawValue


ExtMetadataType = Literal[
"text",
"url",
Expand Down Expand Up @@ -148,7 +159,10 @@ def _assert_single_asset(data: ExtContextData, key: str) -> None:


def _resolve_optionally_passed_asset_key(
data: ExtContextData, asset_key: Optional[str], method: str
data: ExtContextData,
asset_key: Optional[str],
method: str,
already_materialized_assets: Set[str],
) -> str:
asset_keys = _assert_defined_asset_property(data["asset_keys"], method)
asset_key = _assert_opt_param_type(asset_key, str, method, "asset_key")
Expand All @@ -163,6 +177,11 @@ def _resolve_optionally_passed_asset_key(
" targets multiple assets."
)
asset_key = asset_keys[0]
if asset_key in already_materialized_assets:
raise DagsterExtError(
f"Calling `{method}` with asset key `{asset_key}` is undefined. Asset has already been"
" materialized, so no additional data can be reported for it."
)
return asset_key


Expand Down Expand Up @@ -259,6 +278,30 @@ def _assert_param_json_serializable(value: _T, method: str, param: str) -> _T:
return value


def _normalize_param_metadata(
metadata: Mapping[str, Union[ExtMetadataRawValue, ExtMetadataValue]], method: str, param: str
) -> Mapping[str, Union[ExtMetadataRawValue, ExtMetadataValue]]:
_assert_param_type(metadata, dict, method, param)
new_metadata: Dict[str, ExtMetadataValue] = {}
for key, value in metadata.items():
if not isinstance(key, str):
raise DagsterExtError(
f"Invalid type for parameter `{param}` of `{method}`. Expected a dict with string"
f" keys, got a key `{key}` of type `{type(key)}`."
)
elif isinstance(value, dict):
if not {*value.keys()} == {*ExtMetadataValue.__annotations__.keys()}:
raise DagsterExtError(
f"Invalid type for parameter `{param}` of `{method}`. Expected a dict with"
" string keys and values that are either raw metadata values or dictionaries"
f" with schema `{{raw_value: ..., type: ...}}`. Got a value `{value}`."
)
new_metadata[key] = cast(ExtMetadataValue, value)
else:
new_metadata[key] = {"raw_value": value, "type": None}
return new_metadata


def _param_from_env_var(key: str) -> Any:
raw_value = os.environ.get(_param_name_to_env_var(key))
return decode_env_var(raw_value) if raw_value is not None else None
Expand Down Expand Up @@ -625,6 +668,7 @@ def __init__(
) -> None:
self._data = data
self.message_channel = message_channel
self.materialized_assets: Set[str] = set()

def _write_message(self, method: str, params: Optional[Mapping[str, Any]] = None) -> None:
message = ExtMessage(method=method, params=params)
Expand Down Expand Up @@ -730,26 +774,27 @@ def extras(self) -> Mapping[str, Any]:
def report_asset_metadata(
self,
label: str,
value: Any,
value: ExtMetadataRawValue,
metadata_type: Optional[ExtMetadataType] = None,
asset_key: Optional[str] = None,
) -> None:
asset_key = _resolve_optionally_passed_asset_key(
self._data, asset_key, "report_asset_metadata"
self._data, asset_key, "report_asset_metadata", self.materialized_assets
)
label = _assert_param_type(label, str, "report_asset_metadata", "label")
value = _assert_param_json_serializable(value, "report_asset_metadata", "value")
metadata_type = _assert_opt_param_value(
metadata_type, get_args(ExtMetadataType), "report_asset_metadata", "type"
type = _assert_opt_param_value(
metadata_type, get_args(ExtMetadataType), "report_asset_metadata", "metadata_type"
)

self._write_message(
"report_asset_metadata",
{"asset_key": asset_key, "label": label, "value": value, "type": metadata_type},
{"asset_key": asset_key, "label": label, "value": {"raw_value": value, "type": type}},
)

def report_asset_data_version(self, data_version: str, asset_key: Optional[str] = None) -> None:
asset_key = _resolve_optionally_passed_asset_key(
self._data, asset_key, "report_asset_data_version"
self._data, asset_key, "report_asset_data_version", self.materialized_assets
)
data_version = _assert_param_type(
data_version, str, "report_asset_data_version", "data_version"
Expand All @@ -758,6 +803,29 @@ def report_asset_data_version(self, data_version: str, asset_key: Optional[str]
"report_asset_data_version", {"asset_key": asset_key, "data_version": data_version}
)

def report_asset_materialization(
self,
metadata: Optional[Mapping[str, Union[ExtMetadataRawValue, ExtMetadataValue]]] = None,
data_version: Optional[str] = None,
asset_key: Optional[str] = None,
):
asset_key = _resolve_optionally_passed_asset_key(
self._data, asset_key, "report_asset_materialization", self.materialized_assets
)
metadata = (
_normalize_param_metadata(metadata, "report_asset_check_result", "metadata")
if metadata
else None
)
data_version = _assert_opt_param_type(
data_version, str, "report_asset_data_version", "data_version"
)
self._write_message(
"report_asset_materialization",
{"asset_key": asset_key, "data_version": data_version, "metadata": metadata},
)
self.materialized_assets.add(asset_key)

def log(self, message: str, level: str = "info") -> None:
message = _assert_param_type(message, str, "log", "asset_key")
level = _assert_param_value(level, ["info", "warning", "error"], "log", "level")
Expand Down
7 changes: 7 additions & 0 deletions python_modules/dagster-ext/dagster_ext_tests/test_context.py
Original file line number Diff line number Diff line change
Expand Up @@ -162,3 +162,10 @@ def test_extras_context():
assert context.get_extra("foo") == "bar"
with pytest.raises(DagsterExtError, match="Extra `bar` is undefined"):
context.get_extra("bar")


def test_report_after_materialization():
context = _make_external_execution_context(asset_keys=["foo"])
with pytest.raises(DagsterExtError, match="already been materialized"):
context.report_asset_materialization(asset_key="foo")
context.report_asset_data_version("alpha", asset_key="foo")
Original file line number Diff line number Diff line change
Expand Up @@ -9,11 +9,12 @@

import boto3
import pytest
from dagster._core.definitions.asset_spec import AssetSpec
from dagster._core.definitions.data_version import (
DATA_VERSION_IS_USER_PROVIDED_TAG,
DATA_VERSION_TAG,
)
from dagster._core.definitions.decorators.asset_decorator import asset
from dagster._core.definitions.decorators.asset_decorator import asset, multi_asset
from dagster._core.definitions.events import AssetKey
from dagster._core.definitions.materialize import materialize
from dagster._core.definitions.metadata import (
Expand Down Expand Up @@ -148,7 +149,7 @@ def test_ext_subprocess(
def foo(context: AssetExecutionContext, ext: ExtSubprocess):
extras = {"bar": "baz"}
cmd = [_PYTHON_EXECUTABLE, external_script]
return ext.run(
yield from ext.run(
cmd,
context=context,
extras=extras,
Expand All @@ -161,11 +162,7 @@ def foo(context: AssetExecutionContext, ext: ExtSubprocess):
resource = ExtSubprocess(context_injector=context_injector, message_reader=message_reader)

with instance_for_test() as instance:
materialize(
[foo],
instance=instance,
resources={"ext": resource},
)
materialize([foo], instance=instance, resources={"ext": resource})
mat = instance.get_latest_materialization_event(foo.key)
assert mat and mat.asset_materialization
assert isinstance(mat.asset_materialization.metadata["bar"], MarkdownMetadataValue)
Expand All @@ -178,6 +175,35 @@ def foo(context: AssetExecutionContext, ext: ExtSubprocess):
assert re.search(r"dagster - INFO - [^\n]+ - hello world\n", captured.err, re.MULTILINE)


def test_ext_multi_asset():
def script_fn():
from dagster_ext import init_dagster_ext

context = init_dagster_ext()
context.report_asset_materialization(
{"foo_meta": "ok"}, data_version="alpha", asset_key="foo"
)
context.report_asset_data_version("alpha", asset_key="bar")

@multi_asset(specs=[AssetSpec("foo"), AssetSpec("bar")])
def foo_bar(context: AssetExecutionContext, ext: ExtSubprocess):
with temp_script(script_fn) as script_path:
cmd = [_PYTHON_EXECUTABLE, script_path]
yield from ext.run(cmd, context=context)

with instance_for_test() as instance:
materialize([foo_bar], instance=instance, resources={"ext": ExtSubprocess()})
foo_mat = instance.get_latest_materialization_event(AssetKey(["foo"]))
assert foo_mat and foo_mat.asset_materialization
assert foo_mat.asset_materialization.metadata["foo_meta"].value == "ok"
assert foo_mat.asset_materialization.tags
assert foo_mat.asset_materialization.tags[DATA_VERSION_TAG] == "alpha"
bar_mat = instance.get_latest_materialization_event(AssetKey(["foo"]))
assert bar_mat and bar_mat.asset_materialization
assert bar_mat.asset_materialization.tags
assert bar_mat.asset_materialization.tags[DATA_VERSION_TAG] == "alpha"


def test_ext_typed_metadata():
def script_fn():
from dagster_ext import init_dagster_ext
Expand All @@ -201,7 +227,7 @@ def script_fn():
def foo(context: AssetExecutionContext, ext: ExtSubprocess):
with temp_script(script_fn) as script_path:
cmd = [_PYTHON_EXECUTABLE, script_path]
return ext.run(cmd, context=context)
yield from ext.run(cmd, context=context)

with instance_for_test() as instance:
materialize(
Expand Down Expand Up @@ -248,7 +274,7 @@ def script_fn():
def foo(context: AssetExecutionContext, ext: ExtSubprocess):
with temp_script(script_fn) as script_path:
cmd = [_PYTHON_EXECUTABLE, script_path]
ext.run(cmd, context=context)
yield from ext.run(cmd, context=context)

with pytest.raises(DagsterExternalExecutionError):
materialize([foo], resources={"ext": ExtSubprocess()})
Expand Down Expand Up @@ -314,8 +340,7 @@ def subproc_run(context: AssetExecutionContext):
) as ext_context:
subprocess.run(cmd, env=ext_context.get_external_process_env_vars(), check=False)
_ext_context = ext_context
mat_results = _ext_context.get_materialize_results()
return mat_results[0] if len(mat_results) == 1 else mat_results
yield from _ext_context.get_materialize_results()

with instance_for_test() as instance:
materialize(
Expand All @@ -328,30 +353,3 @@ def subproc_run(context: AssetExecutionContext):
assert mat.asset_materialization.tags
assert mat.asset_materialization.tags[DATA_VERSION_TAG] == "alpha"
assert mat.asset_materialization.tags[DATA_VERSION_IS_USER_PROVIDED_TAG]


def test_ext_no_client_premature_get_results(external_script):
@asset
def subproc_run(context: AssetExecutionContext):
extras = {"bar": "baz"}
cmd = [_PYTHON_EXECUTABLE, external_script]

with ext_protocol(
context,
ExtTempFileContextInjector(),
ExtTempFileMessageReader(),
extras=extras,
) as ext_context:
subprocess.run(cmd, env=ext_context.get_external_process_env_vars(), check=False)
return ext_context.get_materialize_results()

with pytest.raises(
DagsterExternalExecutionError,
match=(
"`get_materialize_results` must be called after the `ext_protocol` context manager has"
" exited."
),
):
materialize(
[subproc_run],
)
Original file line number Diff line number Diff line change
Expand Up @@ -196,7 +196,7 @@ class TableColumn(
def __new__(
cls,
name: str,
type: str = "string", # noqa: A002
type: str = "string",
description: Optional[str] = None,
constraints: Optional["TableColumnConstraints"] = None,
):
Expand Down
Loading

0 comments on commit 8830b33

Please sign in to comment.